Skip to content

Filters

cxg.filters

DatasetFilters dataclass

Filter criteria for dataset searches.

Repeated values within a single field are combined with OR logic. Different fields are combined with AND logic.

Source code in src/cxg/filters.py
@dataclass(slots=True)
class DatasetFilters:
    """Filter criteria for dataset searches.

    Repeated values within a single field are combined with OR logic.
    Different fields are combined with AND logic. A field left at its
    default (empty list, ``None`` or ``False``) is not applied by
    ``dataset_matches``.
    """

    # Ontology-backed fields. ``dataset_matches`` compares these against the
    # dataset's labels with ``matches_any_substring``.
    organism: list[str] = field(default_factory=list)
    tissue: list[str] = field(default_factory=list)
    assay: list[str] = field(default_factory=list)
    cell_type: list[str] = field(default_factory=list)
    disease: list[str] = field(default_factory=list)
    self_reported_ethnicity: list[str] = field(default_factory=list)
    # Compared with ``matches_any_exact`` (whole value, not substring).
    suspension_type: list[str] = field(default_factory=list)
    tissue_type: list[str] = field(default_factory=list)
    # Case-folded substring tests against the dataset title / collection name.
    title: str | None = None
    collection: str | None = None
    # Compared for plain equality with the dataset's collection ID.
    collection_id: str | None = None
    # Inclusive bounds on the dataset's cell count.
    min_cells: int | None = None
    max_cells: int | None = None
    # Compared for plain equality with the dataset's schema version string.
    schema_version: str | None = None
    # When True, datasets whose "spatial" value is missing or None are rejected.
    has_spatial: bool = False

dataset_id(dataset)

Extract the dataset ID from a dataset dict.

Source code in src/cxg/filters.py
def dataset_id(dataset: dict[str, Any]) -> str:
    """Return the dataset's identifier as a string.

    ``"dataset_id"`` is consulted first, then ``"id"``; the first truthy
    value wins. An empty string is returned when neither key holds one.
    """
    for key in ("dataset_id", "id"):
        found = dataset.get(key)
        if found:
            return str(found)
    return ""

collection_id(dataset)

Extract the collection ID from a dataset dict.

Source code in src/cxg/filters.py
def collection_id(dataset: dict[str, Any]) -> str:
    """Extract the collection ID from a dataset dict.

    The first truthy value among the following is used:

    1. the top-level ``"collection_id"`` key,
    2. ``"id"`` inside the nested ``"collection"`` dict,
    3. ``"collection_id"`` inside the nested ``"collection"`` dict.

    Returns:
        The ID converted to ``str``, or ``""`` when none is present.
    """
    collection = dataset.get("collection")
    # A non-dict "collection" (e.g. a bare name string) has no ``.get``;
    # treat it as absent instead of raising AttributeError.
    if not isinstance(collection, dict):
        collection = {}
    return str(
        dataset.get("collection_id")
        or collection.get("id")
        or collection.get("collection_id")
        or ""
    )

collection_name(dataset)

Extract the collection name from a dataset dict.

Source code in src/cxg/filters.py
def collection_name(dataset: dict[str, Any]) -> str:
    """Extract the collection name from a dataset dict.

    The top-level ``"collection_name"`` key is preferred; otherwise the
    ``"name"`` key of the nested ``"collection"`` dict is used.

    Returns:
        The name converted to ``str``, or ``""`` when none is present.
    """
    collection = dataset.get("collection")
    # A non-dict "collection" value has no ``.get``; treat it as absent
    # instead of raising AttributeError.
    if not isinstance(collection, dict):
        collection = {}
    return str(dataset.get("collection_name") or collection.get("name") or "")

dataset_title(dataset)

Extract the title from a dataset dict.

Source code in src/cxg/filters.py
def dataset_title(dataset: dict[str, Any]) -> str:
    """Return the dataset's title as a string.

    Uses the first truthy value of ``"title"`` and ``"name"`` (in that
    order), or an empty string when both are missing or falsy.
    """
    candidates = map(dataset.get, ("title", "name"))
    return str(next(filter(None, candidates), ""))

get_ontology_entries(dataset, field_name)

Extract ontology entries for a field as a list of dicts.

Normalizes scalar and dict values into a consistent list-of-dicts format.

Source code in src/cxg/filters.py
def get_ontology_entries(dataset: dict[str, Any], field_name: str) -> list[dict[str, Any]]:
    """Extract ontology entries for a field as a list of dicts.

    Normalizes the stored value into a consistent list-of-dicts format:

    * missing or ``None`` -> ``[]``
    * a dict -> a one-element list containing that dict
    * a list -> its dict items unchanged, with ``str``/``int``/``float``
      items wrapped as ``{"label": item}``; any other item (``None``,
      nested containers) is dropped
    * any other value -> ``[{"label": value}]``

    Args:
        dataset: The dataset dict to read from.
        field_name: Key of the ontology field inside ``dataset``.

    Returns:
        A new list of entry dicts; dict entries are the same objects as
        stored in ``dataset`` (not copies).
    """
    value = dataset.get(field_name)
    if value is None:
        return []
    if isinstance(value, dict):
        return [value]
    if not isinstance(value, list):
        return [{"label": value}]

    entries: list[dict[str, Any]] = []
    for entry in value:
        if isinstance(entry, dict):
            entries.append(entry)
        elif isinstance(entry, (str, int, float)):
            # Treat scalars inside a list the same way as a lone scalar so
            # ["lung"] and "lung" normalize to the same entries.
            entries.append({"label": entry})
    return entries

get_labels(dataset, field_name)

Extract human-readable labels for an ontology field.

Source code in src/cxg/filters.py
def get_labels(dataset: dict[str, Any], field_name: str) -> list[str]:
    """Collect the display labels of an ontology field.

    For every entry produced by ``get_ontology_entries`` the first truthy
    value among ``"label"``, ``"name"`` and ``"ontology_term_id"`` is taken
    and converted to ``str``. Entries offering none of these are skipped.
    """
    return [
        str(text)
        for entry in get_ontology_entries(dataset, field_name)
        if (text := entry.get("label") or entry.get("name") or entry.get("ontology_term_id"))
    ]

get_cell_count(dataset)

Extract the cell count from a dataset dict.

Source code in src/cxg/filters.py
def get_cell_count(dataset: dict[str, Any]) -> int:
    """Extract the cell count from a dataset dict.

    Reads ``"cell_count"`` and falls back to ``"cellCount"``; the first
    truthy value is converted with ``int()``.

    Returns:
        The count as an ``int``, or ``0`` when the value is missing, falsy,
        or cannot be converted (wrong type, non-numeric text, NaN, infinity).
    """
    value = dataset.get("cell_count") or dataset.get("cellCount") or 0
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")); TypeError/ValueError cover
        # non-numeric types, unparseable strings and NaN.
        return 0

get_schema_version(dataset)

Extract the schema version string from a dataset dict.

Source code in src/cxg/filters.py
def get_schema_version(dataset: dict[str, Any]) -> str:
    """Return the dataset's schema version as a string.

    ``"schema_version"`` takes precedence over ``"schemaVersion"``; an
    empty string is returned when neither holds a truthy value.
    """
    version = dataset.get("schema_version")
    if not version:
        version = dataset.get("schemaVersion")
    return str(version) if version else ""

get_suspension_types(dataset)

Extract suspension types from a dataset dict as a list of strings.

Source code in src/cxg/filters.py
def get_suspension_types(dataset: dict[str, Any]) -> list[str]:
    """Return the dataset's ``"suspension_type"`` values as strings.

    A list value is used as-is and any other value is treated as a
    one-element list; falsy items are dropped and the rest are converted
    with ``str``.
    """
    raw = dataset.get("suspension_type")
    items = raw if isinstance(raw, list) else [raw]
    return [str(item) for item in items if item]

get_tissue_types(dataset)

Extract tissue_type values nested inside tissue entries.

`tissue_type` is part of each `TissueOntologyTermId` entry per CELLxGene schema v7.0.0, not a top-level dataset field. Returns one value for every tissue entry that carries a non-empty `tissue_type`, so duplicates are possible; `unique_field_counts` deduplicates per dataset.

Source code in src/cxg/filters.py
def get_tissue_types(dataset: dict[str, Any]) -> list[str]:
    """Collect the ``tissue_type`` values carried by the tissue entries.

    ``tissue_type`` is part of each ``TissueOntologyTermId`` entry per
    CELLxGene schema v7.0.0, not a top-level dataset field, so it is read
    from the entries of the ``"tissue"`` field. One string is returned for
    every entry with a truthy ``tissue_type``; duplicates are kept, and
    ``unique_field_counts`` deduplicates per dataset.
    """
    raw_types = (entry.get("tissue_type") for entry in get_ontology_entries(dataset, "tissue"))
    return [str(tissue_type) for tissue_type in raw_types if tissue_type]

matches_any_substring(values, needles)

Check if any needle is a case-insensitive substring of any value.

Returns True if needles is empty.

Source code in src/cxg/filters.py
def matches_any_substring(values: list[str], needles: list[str]) -> bool:
    """Report whether at least one needle occurs inside at least one value.

    Needles are case-folded and values are passed through
    ``_normalize_text`` before the substring test, making the comparison
    case-insensitive.

    Returns:
        ``True`` if ``needles`` is empty or any needle is contained in any
        value; ``False`` otherwise (including when ``values`` is empty).
    """
    if not needles:
        return True
    haystack = [_normalize_text(value) for value in values]
    if not haystack:
        return False
    for needle in needles:
        folded = needle.casefold()
        if any(folded in candidate for candidate in haystack):
            return True
    return False

matches_any_exact(values, expected)

Check if any expected value exactly matches any value (case-insensitive).

Returns True if expected is empty.

Source code in src/cxg/filters.py
def matches_any_exact(values: list[str], expected: list[str]) -> bool:
    """Report whether at least one expected item equals one of the values.

    Expected items are case-folded and values are passed through
    ``_normalize_text`` before comparing, making the match
    case-insensitive but whole-value (no substring matching).

    Returns:
        ``True`` if ``expected`` is empty or any expected item equals any
        value; ``False`` otherwise.
    """
    if not expected:
        return True
    normalized = {_normalize_text(value) for value in values}
    for item in expected:
        if item.casefold() in normalized:
            return True
    return False

dataset_matches(dataset, filters)

Test whether a dataset matches all filter criteria.

Returns True if the dataset satisfies every non-empty filter.

Source code in src/cxg/filters.py
def dataset_matches(dataset: dict[str, Any], filters: DatasetFilters) -> bool:
    """Decide whether ``dataset`` passes every criterion set on ``filters``.

    Criteria that are empty or ``None`` (or ``has_spatial=False``) are
    skipped. Checks run in a fixed order and stop at the first failure.

    Returns:
        ``True`` when no active criterion rejects the dataset, otherwise
        ``False``.
    """
    # Ontology fields: a requested value may appear anywhere inside a label.
    ontology_fields = (
        "organism",
        "tissue",
        "assay",
        "cell_type",
        "disease",
        "self_reported_ethnicity",
    )
    for field_name in ontology_fields:
        wanted = getattr(filters, field_name)
        if wanted and not matches_any_substring(get_labels(dataset, field_name), wanted):
            return False

    # Fields compared as whole values rather than substrings.
    exact_checks = (
        (filters.suspension_type, get_suspension_types),
        (filters.tissue_type, get_tissue_types),
    )
    for wanted, extract in exact_checks:
        if wanted and not matches_any_exact(extract(dataset), wanted):
            return False

    # Free-text criteria: case-folded substring tests.
    title_query = filters.title
    if title_query and title_query.casefold() not in dataset_title(dataset).casefold():
        return False
    collection_query = filters.collection
    if collection_query and collection_query.casefold() not in collection_name(dataset).casefold():
        return False

    wanted_collection_id = filters.collection_id
    if wanted_collection_id and wanted_collection_id != collection_id(dataset):
        return False

    # Cell-count bounds are inclusive on both ends.
    cells = get_cell_count(dataset)
    lower, upper = filters.min_cells, filters.max_cells
    if lower is not None and cells < lower:
        return False
    if upper is not None and cells > upper:
        return False

    wanted_version = filters.schema_version
    if wanted_version and wanted_version != get_schema_version(dataset):
        return False

    # With has_spatial set, a missing/None "spatial" value rejects the dataset.
    return not (filters.has_spatial and dataset.get("spatial") is None)

apply_filters(datasets, filters)

Filter a list of datasets by the given criteria.

Source code in src/cxg/filters.py
def apply_filters(datasets: list[dict[str, Any]], filters: DatasetFilters) -> list[dict[str, Any]]:
    """Return the datasets accepted by ``dataset_matches``, in input order."""
    return list(filter(lambda candidate: dataset_matches(candidate, filters), datasets))

unique_field_counts(datasets, field_name)

Count unique values for a field across datasets.

Returns:

Type | Description
dict[str, int] | A dict mapping each unique value to the number of datasets containing it.

Source code in src/cxg/filters.py
def unique_field_counts(datasets: list[dict[str, Any]], field_name: str) -> dict[str, int]:
    """Tally how many datasets contain each distinct value of a field.

    ``"suspension_type"``, ``"tissue_type"`` and ``"schema_version"`` are
    read with their dedicated extractors. Any other name is translated
    through ``ONTOLOGY_FIELD_MAP`` (falling back to the name itself) and
    read with ``get_labels``. Empty values are ignored, and a value that
    occurs several times in one dataset is counted once for that dataset.

    Returns:
        A dict mapping each unique value to the number of datasets
        containing it.
    """
    if field_name == "suspension_type":
        per_dataset = map(get_suspension_types, datasets)
    elif field_name == "tissue_type":
        per_dataset = map(get_tissue_types, datasets)
    elif field_name == "schema_version":
        per_dataset = ([get_schema_version(entry)] for entry in datasets)
    else:
        api_field = ONTOLOGY_FIELD_MAP.get(field_name, field_name)
        per_dataset = (get_labels(entry, api_field) for entry in datasets)

    tally: dict[str, int] = {}
    for found in per_dataset:
        # Deduplicate within one dataset so it contributes at most 1 per value.
        distinct = {value for value in found if value}
        for value in distinct:
            tally[value] = tally.get(value, 0) + 1
    return tally