Skip to content

Download

cxg.download

FileType

Bases: StrEnum

Supported asset file types for `cxg dataset download`.

Source code in src/cxg/download.py
class FileType(StrEnum):
    """Supported asset file types for `cxg dataset download`."""

    h5ad = "h5ad"
    rds = "rds"

DownloadFailure dataclass

Record of a failed download attempt.

Source code in src/cxg/download.py
@dataclass(slots=True)
class DownloadFailure:
    """Record of a failed download attempt."""

    dataset_id: str
    reason: str

DownloadPlan dataclass

Plan for downloading a single dataset asset file.

Source code in src/cxg/download.py
@dataclass(slots=True)
class DownloadPlan:
    """Plan for downloading a single dataset asset file."""

    dataset_id: str
    title: str
    filename: str
    filesize: int | None
    url: str

select_asset(dataset, filetype)

Return the first asset dict whose file type matches the requested type, or None if there is no match.

Source code in src/cxg/download.py
def select_asset(dataset: dict[str, Any], filetype: FileType | str) -> dict[str, Any] | None:
    """Find the asset dict matching the requested file type, or None.

    The comparison is case-insensitive. Entries of ``dataset["assets"]``
    that are not dicts are skipped, and a missing or null ``assets`` field
    is treated as an empty list.

    Args:
        dataset: Dataset record; only its ``assets`` entry is read.
        filetype: Requested file type, as a ``FileType`` or a plain string.

    Returns:
        The first matching asset dict, or None if no asset matches.
    """
    # Normalise the target once instead of on every iteration.
    wanted = str(filetype).lower()
    # `or []` also covers an explicit `"assets": None` in the payload, which
    # `.get("assets", [])` would return unchanged and then fail to iterate.
    for asset in dataset.get("assets") or []:
        if not isinstance(asset, dict):
            continue
        if str(asset.get("filetype") or "").lower() == wanted:
            return asset
    return None

build_plan(dataset, filetype)

Build a DownloadPlan from a dataset and file type.

Returns:

| Type | Description |
| --- | --- |
| DownloadPlan \| None | A DownloadPlan if a matching asset exists, or None. |

Source code in src/cxg/download.py
def build_plan(dataset: dict[str, Any], filetype: FileType | str) -> DownloadPlan | None:
    """Build a DownloadPlan from a dataset and file type.

    The dataset identifier is taken from ``dataset_id``, falling back to
    ``id``. That single resolved identifier is used both for the plan and
    for the fallback filename passed to ``_safe_filename``.

    Args:
        dataset: Dataset record containing an ``assets`` list.
        filetype: Requested asset file type.

    Returns:
        A DownloadPlan if a matching asset exists and a filename can be
        derived, or None.
    """
    asset = select_asset(dataset, filetype)
    if not asset:
        return None

    dataset_id = str(dataset.get("dataset_id") or dataset.get("id") or "")
    url = str(asset.get("url") or "")
    filename = _safe_filename(
        asset.get("file_name"),
        url,
        # Derive the fallback from the same resolved id as the plan, so records
        # keyed only by `id` do not produce "None.<ext>".
        fallback=f"{dataset_id or 'dataset'}.{filetype}",
    )
    if not filename:
        return None

    filesize = asset.get("filesize")
    try:
        size_value = int(filesize) if filesize is not None else None
    except (TypeError, ValueError):
        size_value = None
    # A negative size is meaningless; treat it as "unknown" rather than
    # feeding it to disk-space checks and progress totals.
    if size_value is not None and size_value < 0:
        size_value = None

    return DownloadPlan(
        dataset_id=dataset_id,
        title=str(dataset.get("title") or ""),
        filename=filename,
        filesize=size_value,
        url=url,
    )

ensure_disk_space(output_dir, required_bytes)

Raise OSError if the output directory lacks sufficient disk space.

Source code in src/cxg/download.py
def ensure_disk_space(output_dir: Path, required_bytes: int) -> None:
    """Verify that ``output_dir`` has at least ``required_bytes`` free.

    Args:
        output_dir: Directory whose filesystem is checked.
        required_bytes: Number of bytes about to be written.

    Raises:
        OSError: If the free space reported for ``output_dir`` is smaller
            than ``required_bytes``.
    """
    free_bytes = shutil.disk_usage(output_dir).free
    if required_bytes <= free_bytes:
        return
    raise OSError(f"Not enough disk space in {output_dir}. Need {required_bytes} bytes.")

download_file(client, plan, output_dir, *, overwrite=False, retries=3, progress=None, task_id=None, cancel_event=None)

Download a file with a progress bar and retry logic.

Uses a `.part` temporary file that is atomically renamed on completion. Skips the download if the destination already exists and `overwrite` is False.

If `progress` and `task_id` are provided, advances that external task; otherwise creates a transient single-file progress bar. Both must be provided together or both omitted.

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If all retry attempts fail. |
| ValueError | If only one of `progress` or `task_id` is provided. |

Source code in src/cxg/download.py
def download_file(
    client: httpx.Client,
    plan: DownloadPlan,
    output_dir: Path,
    *,
    overwrite: bool = False,
    retries: int = 3,
    progress: Progress | None = None,
    task_id: TaskID | None = None,
    cancel_event: threading.Event | None = None,
) -> None:
    """Download a file with a progress bar and retry logic.

    Uses a `.part` temp file with atomic rename on completion. Skips the
    download if the destination already exists and overwrite is False.

    If `progress` and `task_id` are provided, advances that external task;
    otherwise creates a transient single-file progress bar. Both must be
    provided together or both omitted.

    Args:
        client: HTTP client used for the transfer.
        plan: What to download and where to name it.
        output_dir: Directory to write into; created if missing.
        overwrite: Re-download even if the destination already exists.
        retries: Number of attempts passed to the streaming helper.
        progress: Optional shared progress display.
        task_id: Task in `progress` to advance for this file.
        cancel_event: Optional event forwarded to the streaming helper so
            the caller can request an early stop.

    Raises:
        RuntimeError: If all retry attempts fail.
        ValueError: If only one of `progress` or `task_id` is provided.
        OSError: If `plan.filesize` is known and exceeds the free space in
            `output_dir`.
    """
    if (progress is None) != (task_id is None):
        raise ValueError("progress and task_id must be provided together or both omitted")

    output_dir.mkdir(parents=True, exist_ok=True)
    destination = output_dir / plan.filename
    temp_path = destination.with_name(f"{destination.name}.part")

    if destination.exists() and not overwrite:
        if progress is not None and task_id is not None:
            # Report the size actually on disk so the bar shows a complete,
            # meaningful total even when the asset did not advertise one
            # (previously this showed 0/0 for unknown sizes).
            existing_size = destination.stat().st_size
            progress.update(task_id, total=existing_size, completed=existing_size)
            progress.start_task(task_id)
        return

    if plan.filesize:
        ensure_disk_space(output_dir, plan.filesize)

    owns_progress = progress is None
    if owns_progress:
        progress = Progress(*_progress_columns(), console=console, transient=True)
        progress.start()

    # Everything after start() sits inside try/finally so the live display
    # is always stopped, even if task creation itself fails.
    try:
        if owns_progress:
            task_id = progress.add_task("download", name=plan.filename, total=plan.filesize)
        _stream_with_retries(
            client,
            plan,
            destination,
            temp_path,
            retries=retries,
            progress=progress,
            task_id=task_id,
            cancel_event=cancel_event,
        )
    finally:
        if owns_progress:
            progress.stop()

download_many(client, plans, output_dir, *, overwrite=False, parallel=3, retries=3)

Download multiple datasets concurrently with stacked progress bars.

Returns a list of DownloadFailure records for any plans that failed. On KeyboardInterrupt, signals all in-flight workers to stop, cleans up `.part` files, and re-raises.

Source code in src/cxg/download.py
def download_many(
    client: httpx.Client,
    plans: list[DownloadPlan],
    output_dir: Path,
    *,
    overwrite: bool = False,
    parallel: int = 3,
    retries: int = 3,
) -> list[DownloadFailure]:
    """Download multiple datasets concurrently with stacked progress bars.

    Returns a list of DownloadFailure for any plans that failed. On
    KeyboardInterrupt, signals all in-flight workers to stop, cleans up
    `.part` files, and re-raises.

    Args:
        client: HTTP client shared by all worker threads.
        plans: Files to download; one progress task is created per plan.
        output_dir: Directory to write into.
        overwrite: Re-download files that already exist.
        parallel: Maximum number of concurrent downloads (at least 1).
        retries: Per-file retry count passed to `download_file`.

    Returns:
        One DownloadFailure per plan that raised, in completion order.
        Cancelled plans are recorded with reason "cancelled".
    """
    if not plans:
        return []

    failures: list[DownloadFailure] = []
    cancel_event = threading.Event()
    workers = max(1, min(parallel, len(plans)))

    with Progress(*_progress_columns(), console=console, transient=False) as progress:
        # One task per plan, paired by position. A dict keyed by dataset_id
        # would merge plans that share an id (including the empty id that
        # build_plan can produce), leaving one task orphaned and making both
        # downloads advance the same bar.
        task_ids: list[TaskID] = [
            progress.add_task(
                "download",
                name=plan.filename,
                total=plan.filesize,
                start=False,
            )
            for plan in plans
        ]

        # NOTE(review): plans that share a filename would race on the same
        # destination/.part path; callers presumably keep filenames unique.
        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_to_plan = {
                executor.submit(
                    download_file,
                    client,
                    plan,
                    output_dir,
                    overwrite=overwrite,
                    retries=retries,
                    progress=progress,
                    task_id=task_id,
                    cancel_event=cancel_event,
                ): plan
                for plan, task_id in zip(plans, task_ids)
            }
            try:
                for future in as_completed(future_to_plan):
                    plan = future_to_plan[future]
                    try:
                        future.result()
                    except _Cancelled:
                        failures.append(DownloadFailure(plan.dataset_id, "cancelled"))
                    except Exception as exc:  # noqa: BLE001 - collect, don't abort the batch
                        failures.append(DownloadFailure(plan.dataset_id, str(exc)))
            except KeyboardInterrupt:
                # Ask running workers to stop (they receive cancel_event),
                # drop queued ones, and wait for in-flight threads to exit
                # before propagating the interrupt.
                cancel_event.set()
                executor.shutdown(wait=True, cancel_futures=True)
                raise

    return failures