Helpers

zavod.helpers

Data cleaning and entity generation helpers.

This module contains a number of functions that are useful for parsing real-world data (like XML, CSV, date formats) and converting it into FollowTheMoney entity structures. Factory methods are provided for handling common entity patterns as a way to reduce boilerplate code and improve consistency across datasets.

A typical use might look like this:

from zavod import Context
from zavod import helpers as h

def crawl(context: Context) -> None:
    # ... fetch some data
    for row in data:
        entity = context.make("Person")
        entity.id = context.make_id(row.get("id"))
        # Using the helper guarantees a consistent handling of the
        # attributes, and in this case will also automatically
        # generate a full name for the entity:
        h.apply_name(
            entity,
            first_name=row.get("first_name"),
            patronymic=row.get("patronymic"),
            last_name=row.get("last_name"),
            title=row.get("title"),
        )
        context.emit(entity)

Any data wrangling code that is repeated in three or more crawlers should be considered for inclusion in the helper library.

apply_address(context, entity, address)

Link the given entity to the given address and emit the address.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The runner context used for emitting entities. | required |
| entity | Entity | The thing located at the given address. | required |
| address | Optional[Entity] | The address entity, usually constructed with make_address. | required |

Source code in zavod/helpers/addresses.py
def apply_address(context: Context, entity: Entity, address: Optional[Entity]) -> None:
    """Link the given entity to the given address and emit the address.

    Args:
        context: The runner context used for emitting entities.
        entity: The thing located at the given address.
        address: The address entity, usually constructed with `make_address`.
    """
    if address is None:
        return
    assert address.schema.is_a("Address"), "address must be an Address"
    assert (
        entity.schema.get("addressEntity") is not None
    ), "Entity must have addressEntity"
    entity.add("country", address.get("country"))
    if address.has("full"):
        entity.add("addressEntity", address)
        context.emit(address)

apply_date(entity, prop, text)

Apply a date value to an entity, parsing it if necessary and cleaning it up.

Uses the dates configuration of the dataset to parse the date.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity | Entity | The entity to which the date will be applied. | required |
| prop | str | The property to which the date will be applied. | required |
| text | DateValue | The date value to be applied. | required |

Source code in zavod/helpers/dates.py
def apply_date(entity: Entity, prop: str, text: DateValue) -> None:
    """Apply a date value to an entity, parsing it if necessary and cleaning it up.

    Uses the `dates` configuration of the dataset to parse the date.

    Args:
        entity: The entity to which the date will be applied.
        prop: The property to which the date will be applied.
        text: The date value to be applied.
    """
    prop_ = entity.schema.get(prop)
    if prop_ is None or prop_.type != registry.date:
        log.warning("Property is not a date: %s" % prop, text=text)
        return

    if text is None:
        return None
    if isinstance(text, datetime) or isinstance(text, date):
        original = str(text)
    else:
        original = text

    dates = extract_date(entity.dataset, text)
    return entity.add(prop_, dates, original_value=original)

apply_dates(entity, prop, texts)

Apply a list of date values to an entity, parsing them if necessary and cleaning them up.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity | Entity | The entity to which the dates will be applied. | required |
| prop | str | The property to which the dates will be applied. | required |
| texts | Iterable[DateValue] | The iterable of date values to be applied. | required |

Source code in zavod/helpers/dates.py
def apply_dates(entity: Entity, prop: str, texts: Iterable[DateValue]) -> None:
    """Apply a list of date values to an entity, parsing them if necessary and cleaning them up.

    Args:
        entity: The entity to which the date will be applied.
        prop: The property to which the date will be applied.
        texts: The iterable of date values to be applied.
    """
    for text in texts:
        apply_date(entity, prop, text)

apply_name(entity, full=None, name1=None, first_name=None, given_name=None, name2=None, second_name=None, middle_name=None, name3=None, patronymic=None, matronymic=None, name4=None, name5=None, tail_name=None, last_name=None, maiden_name=None, prefix=None, suffix=None, alias=False, name_prop='name', is_weak=False, quiet=False, lang=None)

A standardised way to set a name for a person or other entity, which handles normalising the categories of names found in source data to the correct properties (e.g. "family name" becomes "lastName").

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity | Entity | The entity to set the name on. | required |
| full | Optional[str] | The full name if available (this will otherwise be generated). | None |
| name1 | Optional[str] | The first name if numeric parts are used. | None |
| first_name | Optional[str] | The first name. | None |
| given_name | Optional[str] | The given name (also first name). | None |
| name2 | Optional[str] | The second name if numeric parts are used. | None |
| second_name | Optional[str] | The second name. | None |
| middle_name | Optional[str] | The middle name. | None |
| name3 | Optional[str] | The third name if numeric parts are used. | None |
| patronymic | Optional[str] | The patronymic (father-derived) name. | None |
| matronymic | Optional[str] | The matronymic (mother-derived) name. | None |
| name4 | Optional[str] | The fourth name if numeric parts are used. | None |
| name5 | Optional[str] | The fifth name if numeric parts are used. | None |
| tail_name | Optional[str] | A secondary last name. | None |
| last_name | Optional[str] | The last/family name. | None |
| maiden_name | Optional[str] | The maiden name (before marriage). | None |
| prefix | Optional[str] | A prefix to the name (e.g. "Mr"). | None |
| suffix | Optional[str] | A suffix to the name (e.g. "Jr"). | None |
| alias | bool | If this is an alias name. | False |
| name_prop | str | The property to set the full name on. | 'name' |
| is_weak | bool | If this is a weak alias name. | False |
| quiet | bool | If this should not raise errors on invalid properties. | False |
| lang | Optional[str] | The language of the name. | None |

Source code in zavod/helpers/names.py
def apply_name(
    entity: Entity,
    full: Optional[str] = None,
    name1: Optional[str] = None,
    first_name: Optional[str] = None,
    given_name: Optional[str] = None,
    name2: Optional[str] = None,
    second_name: Optional[str] = None,
    middle_name: Optional[str] = None,
    name3: Optional[str] = None,
    patronymic: Optional[str] = None,
    matronymic: Optional[str] = None,
    name4: Optional[str] = None,
    name5: Optional[str] = None,
    tail_name: Optional[str] = None,
    last_name: Optional[str] = None,
    maiden_name: Optional[str] = None,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    alias: bool = False,
    name_prop: str = "name",
    is_weak: bool = False,
    quiet: bool = False,
    lang: Optional[str] = None,
) -> None:
    """A standardised way to set a name for a person or other entity, which handles
    normalising the categories of names found in source data to the correct properties
    (e.g. "family name" becomes "lastName").

    Args:
        entity: The entity to set the name on.
        full: The full name if available (this will otherwise be generated).
        name1: The first name if numeric parts are used.
        first_name: The first name.
        given_name: The given name (also first name).
        name2: The second name if numeric parts are used.
        second_name: The second name.
        middle_name: The middle name.
        name3: The third name if numeric parts are used.
        patronymic: The patronymic (father-derived) name.
        matronymic: The matronymic (mother-derived) name.
        name4: The fourth name if numeric parts are used.
        name5: The fifth name if numeric parts are used.
        tail_name: A secondary last name.
        last_name: The last/family name.
        maiden_name: The maiden name (before marriage).
        prefix: A prefix to the name (e.g. "Mr").
        suffix: A suffix to the name (e.g. "Jr").
        alias: If this is an alias name.
        name_prop: The property to set the full name on.
        is_weak: If this is a weak alias name.
        quiet: If this should not raise errors on invalid properties.
        lang: The language of the name.
    """
    if not is_weak:
        set_name_part(entity, "firstName", given_name, quiet, lang)
        set_name_part(entity, "firstName", first_name, quiet, lang)
        set_name_part(entity, "secondName", second_name, quiet, lang)
        set_name_part(entity, "middleName", middle_name, quiet, lang)
        set_name_part(entity, "fatherName", patronymic, quiet, lang)
        set_name_part(entity, "motherName", matronymic, quiet, lang)
        set_name_part(entity, "lastName", last_name, quiet, lang)
        set_name_part(entity, "lastName", maiden_name, quiet, lang)
        set_name_part(entity, "firstName", name1, quiet, lang)
        set_name_part(entity, "secondName", name2, quiet, lang)
        set_name_part(entity, "middleName", name3, quiet, lang)
        set_name_part(entity, "middleName", name4, quiet, lang)
        set_name_part(entity, "middleName", name5, quiet, lang)
        set_name_part(entity, "lastName", tail_name, quiet, lang)
    if alias:
        name_prop = "alias"
    if is_weak:
        name_prop = "weakAlias"
    full = make_name(
        full=full,
        name1=name1,
        first_name=first_name,
        given_name=given_name,
        name2=name2,
        second_name=second_name,
        middle_name=middle_name,
        name3=name3,
        patronymic=patronymic,
        matronymic=matronymic,
        name4=name4,
        name5=name5,
        tail_name=tail_name,
        last_name=last_name,
        prefix=prefix,
        suffix=suffix,
    )
    if full is not None and len(full):
        entity.add(name_prop, full, quiet=quiet, lang=lang)
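
The mapping from keyword arguments to name properties can be read directly off the calls to set_name_part in the source above. The sketch below restates it as a plain dictionary so the many-to-one grouping is easier to see. NAME_PART_PROPS and group_name_parts are hypothetical names used only for illustration; they are not part of zavod.

```python
from typing import Dict, List, Optional

# Keyword argument -> property name, as listed in the apply_name source above.
NAME_PART_PROPS: Dict[str, str] = {
    "given_name": "firstName",
    "first_name": "firstName",
    "second_name": "secondName",
    "middle_name": "middleName",
    "patronymic": "fatherName",
    "matronymic": "motherName",
    "last_name": "lastName",
    "maiden_name": "lastName",
    "name1": "firstName",
    "name2": "secondName",
    "name3": "middleName",
    "name4": "middleName",
    "name5": "middleName",
    "tail_name": "lastName",
}


def group_name_parts(**parts: Optional[str]) -> Dict[str, List[str]]:
    """Group non-empty name parts by the property they would be stored on.

    Hypothetical helper: arguments without a mapping (e.g. prefix) are skipped.
    """
    grouped: Dict[str, List[str]] = {}
    for arg, value in parts.items():
        prop = NAME_PART_PROPS.get(arg)
        if prop is None or value is None:
            continue
        grouped.setdefault(prop, []).append(value)
    return grouped


print(group_name_parts(first_name="Anna", patronymic="Ivanovna", last_name="Petrova"))
```

Per the source above, maiden_name is stored on lastName but is not passed on to make_name, and no name parts are set at all when is_weak is true.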

assert_dom_hash(node, hash, raise_exc=False, text_only=False)

Assert that a DOM node has a given SHA1 hash.

Source code in zavod/helpers/change.py
def assert_dom_hash(
    node: Optional[ElementOrTree],
    hash: str,
    raise_exc: bool = False,
    text_only: bool = False,
) -> bool:
    """Assert that a DOM node has a given SHA1 hash."""
    actual = _compute_node_hash(node, text_only=text_only)
    if actual != hash:
        if raise_exc:
            msg = f"Expected hash {hash}, got {actual} for {node!r}"
            raise AssertionError(msg)
        else:
            log.warning(
                "DOM hash changed: %s" % node,
                expected=hash,
                actual=actual,
                node=repr(node),
            )
        return False
    return True

assert_html_url_hash(context, url, hash, path=None, raise_exc=False, text_only=False)

Assert that an HTML document located at the URL has a given SHA1 hash.

Source code in zavod/helpers/change.py
def assert_html_url_hash(
    context: Context,
    url: str,
    hash: str,
    path: Optional[str] = None,
    raise_exc: bool = False,
    text_only: bool = False,
) -> bool:
    """Assert that an HTML document located at the URL has a given SHA1 hash."""
    doc = context.fetch_html(url)
    node = doc.find(path) if path is not None else doc
    return assert_dom_hash(node, hash, raise_exc=raise_exc, text_only=text_only)

assert_url_hash(context, url, hash, raise_exc=False, auth=None, headers=None)

Assert that a document located at the URL has a given SHA1 hash.

Source code in zavod/helpers/change.py
def assert_url_hash(
    context: Context,
    url: str,
    hash: str,
    raise_exc: bool = False,
    auth: Optional[Any] = None,
    headers: Optional[Any] = None,
) -> bool:
    """Assert that a document located at the URL has a given SHA1 hash."""
    digest = sha1()
    with context.http.get(url, auth=auth, headers=headers, stream=True) as res:
        res.raise_for_status()
        for chunk in res.iter_content(chunk_size=8192 * 10):
            digest.update(chunk)
    actual = digest.hexdigest()
    if actual != hash:
        if raise_exc:
            msg = f"Expected hash {hash}, got {actual} for {url}"
            raise AssertionError(msg)
        else:
            log.warning(
                "URL hash changed: %s" % url,
                expected=hash,
                actual=actual,
                url=url,
            )
        return False
    return True
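
The hashing step in assert_url_hash reads the response in chunks, so the whole document never has to be held in memory. A minimal standalone sketch of that step, using only hashlib, is shown below. sha1_of_chunks is a hypothetical helper name, and the byte string merely stands in for an HTTP response body.

```python
from hashlib import sha1
from typing import Iterable


def sha1_of_chunks(chunks: Iterable[bytes]) -> str:
    """Feed chunks into a SHA1 digest one at a time and return the hex digest."""
    digest = sha1()
    for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()


# Stand-in for a streamed response body, split into 8-byte chunks.
body = b"<html><body>example</body></html>"
chunks = [body[i : i + 8] for i in range(0, len(body), 8)]

# Chunked hashing gives the same digest as hashing the body in one go.
print(sha1_of_chunks(chunks) == sha1(body).hexdigest())
```

The resulting hex digest is the kind of value that would be passed as the hash argument.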

cells_to_str(row)

Return the string value of each HtmlElement value in the passed dictionary.

Useful when all you need is the string value of each cell in a table row.

Source code in zavod/helpers/html.py
def cells_to_str(row: Dict[str, HtmlElement]) -> Dict[str, str | None]:
    """
    Return the string value of each HtmlElement value in the passed dictionary

    Useful when all you need is the string value of each cell in a table row.
    """
    return {k: collapse_spaces(v.text_content()) for k, v in row.items()}

check_no_year(text)

Check for a few formats in which dates are given as day/month, with no year specified.

Source code in zavod/helpers/dates.py
def check_no_year(text: Optional[str]) -> bool:
    """Check for a few formats in which dates are given as day/month, with no year
    specified."""
    if text is None:
        return True
    return len(extract_years(text)) == 0

clean_note(text)

Remove a set of specific text sections from notes supplied by sanctions data publishers. These include cross-references to the Security Council web site and the Interpol web site.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | Union[Optional[str], List[Optional[str]]] | The note text from the source. | required |

Returns:

| Type | Description |
| --- | --- |
| List[str] | A cleaned version of the text. |

Source code in zavod/helpers/text.py
def clean_note(text: Union[Optional[str], List[Optional[str]]]) -> List[str]:
    """Remove a set of specific text sections from notes supplied by sanctions data
    publishers. These include cross-references to the Security Council web site and
    the Interpol web site.

    Args:
        text: The note text from source

    Returns:
        A cleaned version of the text.
    """
    out: List[str] = []
    if text is None:
        return out
    if is_listish(text):
        for t in text:
            out.extend(clean_note(t))
        return out
    if isinstance(text, str):
        text = PREFIX.sub(" ", text)
        text = INTERPOL_URL.sub(" ", text)
        text = collapse_spaces(text)
        if text is None:
            return out
        return [text]
    return out
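
The control flow of clean_note (None gives an empty list, lists are flattened recursively, strings are scrubbed and dropped if nothing remains) can be tried out without zavod. In the sketch below, the two regular expressions are hypothetical stand-ins, since the real PREFIX and INTERPOL_URL patterns are not shown on this page, and collapse is a simplified replacement for collapse_spaces.

```python
import re
from typing import List, Optional, Union

# Hypothetical patterns for illustration only; the real ones are not shown above.
PREFIX = re.compile(r"INTERPOL-UN Security Council Special Notice web link:?", re.I)
INTERPOL_URL = re.compile(r"https?://www\.interpol\.int/\S*")


def collapse(text: str) -> Optional[str]:
    """Collapse runs of whitespace; return None if nothing is left."""
    out = " ".join(text.split())
    return out or None


def clean_note_sketch(text: Union[Optional[str], List[Optional[str]]]) -> List[str]:
    if text is None:
        return []
    if isinstance(text, (list, tuple, set)):
        out: List[str] = []
        for item in text:
            out.extend(clean_note_sketch(item))
        return out
    if isinstance(text, str):
        text = PREFIX.sub(" ", text)
        text = INTERPOL_URL.sub(" ", text)
        cleaned = collapse(text)
        return [] if cleaned is None else [cleaned]
    return []


notes = [
    "Listed in 2011.  INTERPOL-UN Security Council Special Notice web link: "
    "https://www.interpol.int/en/notice/123",
    None,
    "   ",
]
print(clean_note_sketch(notes))
```

Because the return value is always a list, the result can be passed to entity.add without a None check.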

convert_excel_cell(book, cell)

Convert an Excel cell to a string, handling different types.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| book | Book | The Excel workbook. | required |
| cell | Cell | The Excel cell. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | The cell value as a string, or None if the cell is empty. |

Source code in zavod/helpers/excel.py
def convert_excel_cell(book: Book, cell: Cell) -> Optional[str]:
    """Convert an Excel cell to a string, handling different types.

    Args:
        book: The Excel workbook.
        cell: The Excel cell.

    Returns:
        The cell value as a string, or `None` if the cell is empty.
    """
    if cell.ctype == 2:
        return str(int(cell.value))
    elif cell.ctype in (0, 5, 6):
        return None
    if cell.ctype == 3:
        dt: datetime = xldate_as_datetime(cell.value, book.datemode)
        return datetime_iso(dt)
    else:
        if cell.value is None:
            return None
        return str(cell.value)
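
The integer ctype values in the source correspond to xlrd's XL_CELL_* cell type constants. The sketch below restates the dispatch with those names spelled out, using a stand-in FakeCell class instead of a real xlrd cell. FakeCell and cell_to_str are hypothetical, and the date branch is a simplification that assumes the 1900 date system and serial numbers of 60 or more, rather than calling xldate_as_datetime.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional

# Numeric values of xlrd's XL_CELL_* constants, restated to avoid a third-party import.
XL_CELL_EMPTY, XL_CELL_TEXT, XL_CELL_NUMBER, XL_CELL_DATE = 0, 1, 2, 3
XL_CELL_BOOLEAN, XL_CELL_ERROR, XL_CELL_BLANK = 4, 5, 6


@dataclass
class FakeCell:
    """Stand-in for an xlrd cell, carrying only the two attributes used here."""

    ctype: int
    value: Any


def cell_to_str(cell: FakeCell) -> Optional[str]:
    if cell.ctype == XL_CELL_NUMBER:
        # Numbers are converted via int(), so any fractional part is dropped.
        return str(int(cell.value))
    if cell.ctype in (XL_CELL_EMPTY, XL_CELL_ERROR, XL_CELL_BLANK):
        return None
    if cell.ctype == XL_CELL_DATE:
        # Simplified: 1900 date system, serial >= 60 (assumption of this sketch).
        return (datetime(1899, 12, 30) + timedelta(days=cell.value)).isoformat()
    if cell.value is None:
        return None
    return str(cell.value)


print(cell_to_str(FakeCell(XL_CELL_NUMBER, 12.7)))
print(cell_to_str(FakeCell(XL_CELL_TEXT, "abc")))
```

Note the consequence of the numeric branch: a cell holding 12.7 comes back as "12", which suits identifiers stored as numbers but not decimal amounts.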

convert_excel_date(value)

Convert an Excel date to a string.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | Optional[Union[str, int, float]] | The Excel date value (e.g. 44876). | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | The date value as a string, or None if the value is empty. |

Source code in zavod/helpers/excel.py
def convert_excel_date(value: Optional[Union[str, int, float]]) -> Optional[str]:
    """Convert an Excel date to a string.

    Args:
        value: The Excel date value (e.g. 44876).

    Returns:
        The date value as a string, or `None` if the value is empty.
    """
    if value is None:
        return None
    if isinstance(value, str):
        try:
            value = float(value)
        except ValueError:
            return None
    if isinstance(value, float):
        value = int(value)
    if value < 4_000 or value > 100_000:
        return None
    dt = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2)
    return datetime_iso(dt)
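
The "- 2" in the ordinal arithmetic accounts for two things: serial number 1 is 1 January 1900 (not day zero), and Excel counts a 29 February 1900 that never existed. The standalone sketch below reproduces the same arithmetic with the standard library. excel_serial_to_date is a hypothetical name, and it returns a plain ISO date because the output format of datetime_iso is not shown on this page.

```python
from datetime import datetime
from typing import Optional, Union


def excel_serial_to_date(value: Optional[Union[str, int, float]]) -> Optional[str]:
    """Convert an Excel serial day number to an ISO date string (sketch)."""
    if value is None:
        return None
    if isinstance(value, str):
        try:
            value = float(value)
        except ValueError:
            return None
    if isinstance(value, float):
        value = int(value)
    # Same plausibility window as the source: roughly the years 1910 to 2173.
    if value < 4_000 or value > 100_000:
        return None
    base = datetime(1900, 1, 1).toordinal()
    return datetime.fromordinal(base + value - 2).date().isoformat()


print(excel_serial_to_date(44876))   # 2022-11-11
print(excel_serial_to_date("n/a"))   # None
```

The range check also acts as a cheap guard against values that are not dates at all, such as small counters or large identifiers that happen to sit in a date column.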

copy_address(entity, address)

Assign the full address text and country directly to the given entity.

This is an alternative to using apply_address when the address should be inlined into the entity, instead of emitting a separate address object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity | Entity | The entity to be assigned the address. | required |
| address | Optional[Entity] | The address entity to be copied into the entity. | required |

Source code in zavod/helpers/addresses.py
def copy_address(entity: Entity, address: Optional[Entity]) -> None:
    """Assign the full address text and country directly to the given entity.

    This is an alternative to using `apply_address` when the address should
    be inlined into the entity, instead of emitting a separate address object.

    Args:
        entity: The entity to be assigned the address.
        address: The address entity to be copied into the entity.
    """
    if address is not None:
        entity.add("address", address.get("full"))
        for country in address.get("country"):
            if country not in entity.countries:
                entity.add("country", country)

extract_cryptos(text)

Extract cryptocurrency addresses from text.

Parameters:

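| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | Optional[str] | The text to extract from. | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, str] | A dictionary mapping each cryptocurrency address found to its currency code. |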

Source code in zavod/helpers/crypto.py
def extract_cryptos(text: Optional[str]) -> Dict[str, str]:
    """Extract cryptocurrency addresses from text.

    Args:
        text: The text to extract from.

    Returns:
        A set of cryptocurrency IDs, with currency code.
    """
    out: Dict[str, str] = {}
    if text is None:
        return out
    for currency, v in CRYPTOS_RE.items():
        for key in v.findall(text):
            out[key] = currency
    return out
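
The shape of the return value is easiest to see with a small example. The sketch below mirrors the loop in the source, but the pattern table is a hypothetical stand-in: the contents of the real CRYPTOS_RE are not shown on this page, and the two simplified regular expressions here do not validate checksums.

```python
import re
from typing import Dict, Optional, Pattern

# Hypothetical, simplified patterns for illustration only.
CRYPTOS_RE: Dict[str, Pattern[str]] = {
    "BTC": re.compile(r"\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b"),
    "ETH": re.compile(r"\b0x[a-fA-F0-9]{40}\b"),
}


def extract_cryptos_sketch(text: Optional[str]) -> Dict[str, str]:
    """Return a mapping of wallet address -> currency code."""
    out: Dict[str, str] = {}
    if text is None:
        return out
    for currency, pattern in CRYPTOS_RE.items():
        for address in pattern.findall(text):
            out[address] = currency
    return out


BTC_ADDR = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"
ETH_ADDR = "0x52908400098527886E0F7030069857D2E4169EE7"
print(extract_cryptos_sketch(f"Wallets: {BTC_ADDR} and {ETH_ADDR}"))
```

Keying the dictionary by address means a wallet mentioned several times in the text appears only once in the result.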

extract_date(dataset, text) cached

Extract a date from the provided text using predefined formats in the metadata. If the text doesn't match any format, returns the original text.

Source code in zavod/helpers/dates.py
@lru_cache(maxsize=5000)
def extract_date(dataset: Dataset, text: DateValue) -> List[str]:
    """
    Extract a date from the provided text using predefined `formats` in the metadata.
    If the text doesn't match any format, returns the original text.
    """
    if text is None:
        return []
    if isinstance(text, date):
        return [text.isoformat()]
    elif isinstance(text, datetime):
        if text.tzinfo is not None:
            text = text.astimezone(timezone.utc)
        iso = text.date().isoformat()
        return [iso]

    replaced_text = replace_months(dataset, text)
    formats = dataset.dates.formats + ALWAYS_FORMATS
    parsed = parse_formats(replaced_text, formats)
    if parsed.text is not None:
        return [parsed.text]
    if dataset.dates.year_only:
        years = extract_years(text)
        if len(years):
            return years
    return [text]
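
The fallback order in extract_date (configured formats first, then an optional year-only match, then the untouched input) can be illustrated with the standard library alone. The sketch below is a simplification under stated assumptions: it uses datetime.strptime in place of parse_formats, skips the replace_months step, always produces full-precision dates, and takes the formats and year_only setting as plain arguments instead of reading them from a dataset. extract_date_sketch and YEAR are hypothetical names.

```python
import re
from datetime import date, datetime, timezone
from typing import List, Sequence, Union

YEAR = re.compile(r"\b\d{4}\b")  # assumption: a simple four-digit matcher


def extract_date_sketch(
    text: Union[str, date, datetime, None],
    formats: Sequence[str],
    year_only: bool = False,
) -> List[str]:
    if text is None:
        return []
    # datetime is a subclass of date, so it is tested first in this sketch.
    if isinstance(text, datetime):
        if text.tzinfo is not None:
            text = text.astimezone(timezone.utc)
        return [text.date().isoformat()]
    if isinstance(text, date):
        return [text.isoformat()]
    # 1. Try each configured format in turn.
    for fmt in formats:
        try:
            return [datetime.strptime(text, fmt).date().isoformat()]
        except ValueError:
            continue
    # 2. Optionally fall back to bare years within the 1800-2100 window.
    if year_only:
        years = sorted({y for y in YEAR.findall(text) if 1800 <= int(y) <= 2100})
        if years:
            return years
    # 3. Give up and hand back the original text.
    return [text]


FORMATS = ["%d.%m.%Y", "%Y-%m-%d"]
print(extract_date_sketch("03.05.1987", FORMATS))
print(extract_date_sketch("circa 1990", FORMATS, year_only=True))
print(extract_date_sketch("unknown", FORMATS))
```

Returning the original text in the last step means an unparseable value is not silently lost; it is left for the caller, or for property validation further down the line, to deal with.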

extract_years(text)

Try to locate year numbers in a string such as 'circa 1990'. This will fail if any numbers that don't look like years are found in the string, a strong indicator that a more precise date is encoded (e.g. '1990 Mar 03').

This is bounded to years between 1800 and 2100.

Parameters:

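| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | A string to extract years from. | required |

Returns:

| Type | Description |
| --- | --- |
| List[str] | A list of unique year strings, in no particular order. |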

Source code in zavod/helpers/dates.py
def extract_years(text: str) -> List[str]:
    """Try to locate year numbers in a string such as 'circa 1990'. This will fail if
    any numbers that don't look like years are found in the string, a strong indicator
    that a more precise date is encoded (e.g. '1990 Mar 03').

    This is bounded to years between 1800 and 2100.

    Args:
        text: a string to extract years from.

    Returns:
        a set of year strings.
    """
    years: Set[str] = set()
    for match in NUMBERS.finditer(text):
        year = match.group()
        number = int(year)
        if number < 1800 or number > 2100:
            continue
        years.add(year)
    return list(years)
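
Since the years are collected in a set before being returned, the result is de-duplicated but its order is not guaranteed. The sketch below shows this together with the check_no_year test built on top of it. The NUMBERS pattern here is an assumption (a plain run of digits), as the real pattern is not shown on this page, and both function names are hypothetical.

```python
import re
from typing import List, Optional, Set

NUMBERS = re.compile(r"\d+")  # assumption: the real pattern is not shown above


def extract_years_sketch(text: str) -> List[str]:
    """Collect unique numbers between 1800 and 2100 found in the text."""
    years: Set[str] = set()
    for match in NUMBERS.finditer(text):
        year = match.group()
        if 1800 <= int(year) <= 2100:
            years.add(year)
    return list(years)


def check_no_year_sketch(text: Optional[str]) -> bool:
    """True if the text is None or contains no plausible year."""
    if text is None:
        return True
    return len(extract_years_sketch(text)) == 0


# Sort before comparing or printing, since set order is arbitrary.
print(sorted(extract_years_sketch("between 1987 and 1990, maybe 1987")))
print(check_no_year_sketch("3 March"))
```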

format_address(summary=None, po_box=None, street=None, house=None, house_number=None, postal_code=None, city=None, county=None, state=None, state_district=None, state_code=None, country=None, country_code=None) cached

Given the components of a postal address, format it into a single line using some country-specific templating logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| summary | Optional[str] | A short description of the address. | None |
| po_box | Optional[str] | The PO box/mailbox number. | None |
| street | Optional[str] | The street or road name. | None |
| house | Optional[str] | The descriptive name of the house. | None |
| house_number | Optional[str] | The number of the house on the street. | None |
| postal_code | Optional[str] | The postal code or ZIP code. | None |
| city | Optional[str] | The city or town name. | None |
| county | Optional[str] | The county or district name. | None |
| state | Optional[str] | The state or province name. | None |
| state_district | Optional[str] | The state or province district name. | None |
| state_code | Optional[str] | The state or province code. | None |
| country | Optional[str] | The name of the country (words, not ISO code). | None |
| country_code | Optional[str] | A pre-normalized country code. | None |

Returns:

| Type | Description |
| --- | --- |
| str | A single-line string with the formatted address. |

Source code in zavod/helpers/addresses.py
@lru_cache(maxsize=5000)
def format_address(
    summary: Optional[str] = None,
    po_box: Optional[str] = None,
    street: Optional[str] = None,
    house: Optional[str] = None,
    house_number: Optional[str] = None,
    postal_code: Optional[str] = None,
    city: Optional[str] = None,
    county: Optional[str] = None,
    state: Optional[str] = None,
    state_district: Optional[str] = None,
    state_code: Optional[str] = None,
    country: Optional[str] = None,
    country_code: Optional[str] = None,
) -> str:
    """Given the components of a postal address, format it into a single line
    using some country-specific templating logic.

    Args:
        summary: A short description of the address.
        po_box: The PO box/mailbox number.
        street: The street or road name.
        house: The descriptive name of the house.
        house_number: The number of the house on the street.
        postal_code: The postal code or ZIP code.
        city: The city or town name.
        county: The county or district name.
        state: The state or province name.
        state_district: The state or province district name.
        state_code: The state or province code.
        country: The name of the country (words, not ISO code).
        country_code: A pre-normalized country code.

    Returns:
        A single-line string with the formatted address."""
    if country_code is None and country is not None:
        country_code = registry.country.clean_text(country)
    data = {
        "attention": summary,
        "road": street,
        "house": po_box or house,
        "house_number": house_number,
        "postcode": postal_code,
        "city": city,
        "county": county,
        "state": state,
        "state_district": state_district,
        "state_code": state_code,
        "country": country,
    }
    return _get_formatter().one_line(data, country=country_code)

is_empty(text)

Check whether the given text is empty, that is, either None or a string that has zero length once surrounding whitespace is stripped.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | Optional[str] | The text to be checked. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | Whether the text is empty or not. |

Source code in zavod/helpers/text.py
def is_empty(text: Optional[str]) -> bool:
    """Check if the given text is empty: it can either be null, or
    the stripped version of the string could have 0 length.

    Args:
        text: Text to be checked

    Returns:
        Whether the text is empty or not.
    """
    if text is None:
        return True
    if isinstance(text, str):
        text = text.strip()
        return len(text) == 0
    return False

links_to_dict(el)

Return a dictionary of the text content and href of each anchor element in the passed HtmlElement.

Useful for when the link labels are consistent and can be used as keys.

Source code in zavod/helpers/html.py
def links_to_dict(el: HtmlElement) -> Dict[str | None, str | None]:
    """
    Return a dictionary of the text content and href of each anchor element in the
    passed HtmlElement

    Useful for when the link labels are consistent and can be used as keys
    """
    return {
        slugify(a.text_content(), sep="_"): a.get("href") for a in el.findall(".//a")
    }

make_address(context, full=None, remarks=None, summary=None, po_box=None, street=None, street2=None, street3=None, city=None, place=None, postal_code=None, state=None, region=None, country=None, country_code=None, key=None, lang=None)

Generate an address schema object adjacent to the main entity.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The runner context used for making and emitting entities. | required |
| full | Optional[str] | The full address as a single string. | None |
| remarks | Optional[str] | Delivery remarks for the address. | None |
| summary | Optional[str] | A short description of the address. | None |
| po_box | Optional[str] | The PO box/mailbox number. | None |
| street | Optional[str] | The street or road name. | None |
| street2 | Optional[str] | The street or road name, line 2. | None |
| street3 | Optional[str] | The street or road name, line 3. | None |
| city | Optional[str] | The city or town name. | None |
| place | Optional[str] | The name of a smaller locality (same as city). | None |
| postal_code | Optional[str] | The postal code or ZIP code. | None |
| state | Optional[str] | The state or province name. | None |
| region | Optional[str] | The region or district name. | None |
| country | Optional[str] | The country name (words, not ISO code). | None |
| country_code | Optional[str] | A pre-normalized country code. | None |
| key | Optional[str] | An optional key to be included in the ID of the address. | None |
| lang | Optional[str] | The language of the address details. | None |

Returns:

| Type | Description |
| --- | --- |
| Optional[Entity] | A new entity of type Address, or None if no ID could be generated for it. |

Source code in zavod/helpers/addresses.py
def make_address(
    context: Context,
    full: Optional[str] = None,
    remarks: Optional[str] = None,
    summary: Optional[str] = None,
    po_box: Optional[str] = None,
    street: Optional[str] = None,
    street2: Optional[str] = None,
    street3: Optional[str] = None,
    city: Optional[str] = None,
    place: Optional[str] = None,
    postal_code: Optional[str] = None,
    state: Optional[str] = None,
    region: Optional[str] = None,
    country: Optional[str] = None,
    country_code: Optional[str] = None,
    key: Optional[str] = None,
    lang: Optional[str] = None,
) -> Optional[Entity]:
    """Generate an address schema object adjacent to the main entity.

    Args:
        context: The runner context used for making and emitting entities.
        full: The full address as a single string.
        remarks: Delivery remarks for the address.
        summary: A short description of the address.
        po_box: The PO box/mailbox number.
        street: The street or road name.
        street2: The street or road name, line 2.
        street3: The street or road name, line 3.
        city: The city or town name.
        place: The name of a smaller locality (same as city).
        postal_code: The postal code or ZIP code.
        state: The state or province name.
        region: The region or district name.
        country: The country name (words, not ISO code).
        country_code: A pre-normalized country code.
        key: An optional key to be included in the ID of the address.
        lang: The language of the address details.

    Returns:
        A new entity of type `Address`."""
    city = join_text(place, city, sep=", ")
    street = join_text(street, street2, street3, sep=", ")

    # This is meant to handle cases where the country field contains a country code
    # in a subset of the given records:
    if country is not None and len(country.strip()) == 2:
        context.log.warn(
            "Country name looks like a country code",
            country=country,
            country_code=country_code,
        )
        if country_code is None:
            country_code = country
            country = None

    if country is not None:
        parsed_code = registry.country.clean(country)
        if parsed_code is not None:
            if country_code is not None and country_code != parsed_code:
                context.log.warn(
                    "Country code mismatch",
                    country=country,
                    country_code=country_code,
                )
            country_code = parsed_code

    if country_code is None:
        country_code = registry.country.clean(full)

    if not full:
        full = format_address(
            summary=summary,
            po_box=po_box,
            street=street,
            postal_code=postal_code,
            city=city,
            state=state,
            state_district=join_text(region, state, sep=", "),
            country=country,
            country_code=country_code,
        )

    if full == country:
        full = None

    address = context.make("Address")
    address.id = _make_id(address, full, country_code, key=key)
    if address.id is None:
        return None

    address.add("full", full, lang=lang)
    address.add("remarks", remarks, lang=lang)
    address.add("summary", summary, lang=lang)
    address.add("postOfficeBox", po_box, lang=lang)
    address.add("street", street, lang=lang)
    address.add("city", city, lang=lang)
    address.add("postalCode", postal_code, lang=lang)
    address.add("region", region, lang=lang)
    address.add("state", state, quiet=True, lang=lang)
    address.add("country", country_code, lang=lang, original_value=country)
    return address
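
The way make_address reconciles country and country_code is the least obvious part of the function. The sketch below isolates that logic: a two-letter country is treated as a code when no code was given, and a code parsed from the country name takes precedence over a conflicting country_code. COUNTRY_NAMES and clean_country are hypothetical stand-ins for registry.country.clean, which is not reproduced here, and the warnings logged by the real function are omitted.

```python
from typing import Dict, Optional, Tuple

# Hypothetical lookup standing in for registry.country.clean.
COUNTRY_NAMES: Dict[str, str] = {"germany": "de", "france": "fr"}


def clean_country(name: Optional[str]) -> Optional[str]:
    if name is None:
        return None
    return COUNTRY_NAMES.get(name.strip().lower())


def resolve_country(
    country: Optional[str], country_code: Optional[str]
) -> Tuple[Optional[str], Optional[str]]:
    """Return the (country, country_code) pair after reconciliation."""
    # A two-letter "name" is probably a code that landed in the wrong field.
    if country is not None and len(country.strip()) == 2:
        if country_code is None:
            country_code, country = country, None
    # A code parsed from the name overrides any code passed in.
    if country is not None:
        parsed = clean_country(country)
        if parsed is not None:
            country_code = parsed
    return country, country_code


print(resolve_country("DE", None))
print(resolve_country("Germany", "fr"))
```

In the real function, if no code has been found by this point, the full address string is also run through the country cleaner as a last resort.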

make_identification(context, entity, number, doc_type=None, country=None, summary=None, start_date=None, end_date=None, authority=None, key=None, passport=False)

Create an Identification or Passport object linked to a passport holder.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The context used for making entities. | required |
| entity | Entity | The entity that holds the passport. | required |
| number | Optional[str] | The passport number. | required |
| doc_type | Optional[str] | The type of document (e.g. "passport", "national id"). | None |
| country | Optional[str] | The country that issued the passport. | None |
| summary | Optional[str] | A summary of the passport details. | None |
| start_date | Optional[str] | The date the passport was issued. | None |
| end_date | Optional[str] | The date the passport expires. | None |
| authority | Optional[str] | The issuing authority. | None |
| key | Optional[str] | An optional key to be included in the ID of the identification. | None |
| passport | bool | Whether the identification is a passport or not. | False |

Returns:

| Type | Description |
| --- | --- |
| Optional[Entity] | A new entity of type Identification or Passport, or None if the holder has an unsuitable schema or no number is given. |

Source code in zavod/helpers/identification.py
def make_identification(
    context: Context,
    entity: Entity,
    number: Optional[str],
    doc_type: Optional[str] = None,
    country: Optional[str] = None,
    summary: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    authority: Optional[str] = None,
    key: Optional[str] = None,
    passport: bool = False,
) -> Optional[Entity]:
    """Create an `Identification` or `Passport` object linked to a passport holder.

    Args:
        context: The context used for making entities.
        entity: The entity that holds the passport.
        number: The passport number.
        doc_type: The type of document (e.g. "passport", "national id").
        country: The country that issued the passport.
        summary: A summary of the passport details.
        start_date: The date the passport was issued.
        end_date: The date the passport expires.
        authority: The issuing authority.
        key: An optional key to be included in the ID of the identification.
        passport: Whether the identification is a passport or not.

    Returns:
        A new entity of type `Identification` or `Passport`.
    """
    schema = "Passport" if passport else "Identification"
    proxy = context.make(schema)
    holder_prop = proxy.schema.get("holder")
    assert holder_prop is not None
    assert holder_prop.range is not None
    if not entity.schema.is_a(holder_prop.range):
        log.warning(
            "Holder is not a valid type for %s" % schema,
            entity_schema=entity.schema,
            entity_id=entity.id,
            number=number,
        )
        return None

    if number is None:
        return None
    proxy.id = context.make_id(entity.id, number, doc_type, key)
    proxy.add("holder", entity.id)
    proxy.add("number", number)
    proxy.add("type", doc_type)
    proxy.add("country", country)
    proxy.add("authority", authority)
    proxy.add("summary", summary)
    proxy.add("startDate", start_date)
    proxy.add("endDate", end_date)
    # context.inspect(proxy.to_dict())
    if passport:
        entity.add("passportNumber", number)
    else:
        entity.add("idNumber", number)
    return proxy

make_name(full=None, name1=None, first_name=None, given_name=None, name2=None, second_name=None, middle_name=None, name3=None, patronymic=None, matronymic=None, name4=None, name5=None, tail_name=None, last_name=None, prefix=None, suffix=None)

Provides a standardised way of assembling the components of a human name. This does a whole lot of cultural ignorance work, so YMMV.

Parameters:

Name Type Description Default
full Optional[str]

The full name if available (this will otherwise be generated).

None
name1 Optional[str]

The first name if numeric parts are used.

None
first_name Optional[str]

The first name.

None
given_name Optional[str]

The given name (also first name).

None
name2 Optional[str]

The second name if numeric parts are used.

None
second_name Optional[str]

The second name.

None
middle_name Optional[str]

The middle name.

None
name3 Optional[str]

The third name if numeric parts are used.

None
patronymic Optional[str]

The patronymic (father-derived) name.

None
matronymic Optional[str]

The matronymic (mother-derived) name.

None
name4 Optional[str]

The fourth name if numeric parts are used.

None
name5 Optional[str]

The fifth name if numeric parts are used.

None
tail_name Optional[str]

A secondary last name.

None
last_name Optional[str]

The last/family name.

None
prefix Optional[str]

A prefix to the name (e.g. "Mr").

None
suffix Optional[str]

A suffix to the name (e.g. "Jr").

None

Returns:

Type Description
Optional[str]

The full name.

Source code in zavod/helpers/names.py
def make_name(
    full: Optional[str] = None,
    name1: Optional[str] = None,
    first_name: Optional[str] = None,
    given_name: Optional[str] = None,
    name2: Optional[str] = None,
    second_name: Optional[str] = None,
    middle_name: Optional[str] = None,
    name3: Optional[str] = None,
    patronymic: Optional[str] = None,
    matronymic: Optional[str] = None,
    name4: Optional[str] = None,
    name5: Optional[str] = None,
    tail_name: Optional[str] = None,
    last_name: Optional[str] = None,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
) -> Optional[str]:
    """Provides a standardised way of assembling the components of a human name.
    This does a whole lot of cultural ignorance work, so YMMV.

    Args:
        full: The full name if available (this will otherwise be generated).
        name1: The first name if numeric parts are used.
        first_name: The first name.
        given_name: The given name (also first name).
        name2: The second name if numeric parts are used.
        second_name: The second name.
        middle_name: The middle name.
        name3: The third name if numeric parts are used.
        patronymic: The patronymic (father-derived) name.
        matronymic: The matronymic (mother-derived) name.
        name4: The fourth name if numeric parts are used.
        name5: The fifth name if numeric parts are used.
        tail_name: A secondary last name.
        last_name: The last/family name.
        prefix: A prefix to the name (e.g. "Mr").
        suffix: A suffix to the name (e.g. "Jr").

    Returns:
        The full name.
    """
    full = collapse_spaces(full)
    if full is not None and len(full) > 1:
        return full
    return join_text(
        prefix,
        name1,
        first_name,
        given_name,
        name2,
        second_name,
        middle_name,
        name3,
        patronymic,
        matronymic,
        name4,
        name5,
        tail_name,
        last_name,
        suffix,
    )
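
The listing above returns a usable full name unchanged and otherwise joins the parts in a fixed order. A minimal sketch of that behaviour, assuming a simplified stand-in for `join_text` (here called `join_parts`, a hypothetical name) and only a subset of the parameters:

```python
from typing import Optional


def join_parts(*parts: Optional[str], sep: str = " ") -> Optional[str]:
    """Hypothetical stand-in for join_text: skip empty parts, join the rest."""
    cleaned = [p.strip() for p in parts if p is not None and p.strip()]
    return sep.join(cleaned) if cleaned else None


def make_name_sketch(
    full: Optional[str] = None,
    first_name: Optional[str] = None,
    patronymic: Optional[str] = None,
    last_name: Optional[str] = None,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
) -> Optional[str]:
    # A usable full name wins over any generated one.
    if full is not None:
        full = " ".join(full.split())
        if len(full) > 1:
            return full
    # Otherwise the parts are joined in the same relative order as the listing.
    return join_parts(prefix, first_name, patronymic, last_name, suffix)


print(make_name_sketch(first_name="Ivan", patronymic="Ivanovich", last_name="Petrov"))
```

The order of the arguments in the call does not matter; only the fixed join order inside the function decides how the name is assembled.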

make_occupancy(context, person, position, no_end_implies_current=True, current_time=settings.RUN_TIME, start_date=None, end_date=None, birth_date=None, death_date=None, categorisation=None, status=None, propagate_country=True)

Creates and returns an Occupancy entity if the arguments meet our criteria for PEP position occupancy, otherwise returns None. Also adds the position countries and the role.pep topic to the person if an Occupancy is returned. Emit the person after calling this to include these changes.

Unless status is overridden, Occupancies are only returned if end_date is None or lies less than the after-office period before current_time.

current_time defaults to the process start date and time.

The after-office threshold is determined based on the position topics.

Occupancy.status is set to

  • current if end_date is None and no_end_implies_current is True, otherwise status will be unknown
  • current if end_date is some date in the future, unless the dataset coverage.end is a date in the past, in which case status will be unknown
  • ended if end_date is some date in the past.
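
The three status rules above can be read as a small decision function. This is a simplified sketch, not the helper's implementation: `status_sketch` and its parameters are hypothetical, dates are assumed to be full ISO strings, and the after-office filtering that can make the helper return None is left out.

```python
from typing import Optional


def status_sketch(
    end_date: Optional[str],
    today: str,
    no_end_implies_current: bool = True,
    coverage_end: Optional[str] = None,
) -> str:
    """Simplified reading of the status rules; dates are ISO strings."""
    if end_date is None:
        return "current" if no_end_implies_current else "unknown"
    if end_date > today:
        # A future end date only counts while the dataset is still covered.
        if coverage_end is not None and coverage_end < today:
            return "unknown"
        return "current"
    # Assumption: an end date equal to today is treated as ended here.
    return "ended"


print(status_sketch(None, "2024-01-01"))
print(status_sketch("2030-01-01", "2024-01-01", coverage_end="2020-01-01"))
```

Full ISO date strings compare correctly as plain strings, which is why no date parsing is needed in the sketch.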

Parameters:

Name Type Description Default
context Context

The context to create the entity in.

required
person Entity

The person holding the position. They will be added to the holder property.

required
position Entity

The position held by the person. This will be added to the post property.

required
no_end_implies_current bool

Set this to True if a dataset is regularly maintained and it can be assumed that no end date implies the person is currently occupying this position. In this case, status will be set to current. Otherwise, status will be set to unknown.

True
current_time datetime

Defaults to the run time of the current crawl.

RUN_TIME
start_date Optional[str]

Set if the date the person started occupying the position is known.

None
end_date Optional[str]

Set if the date the person left the position is known.

None
status Optional[OccupancyStatus]

Overrides determining PEP occupancy status

None
Source code in zavod/helpers/positions.py
def make_occupancy(
    context: Context,
    person: Entity,
    position: Entity,
    no_end_implies_current: bool = True,
    current_time: datetime = settings.RUN_TIME,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    birth_date: Optional[str] = None,
    death_date: Optional[str] = None,
    categorisation: Optional[PositionCategorisation] = None,
    status: Optional[OccupancyStatus] = None,
    propagate_country: bool = True,
) -> Optional[Entity]:
    """Creates and returns an Occupancy entity if the arguments meet our criteria
    for PEP position occupancy, otherwise returns None. Also adds the position countries
    and the `role.pep` topic to the person if an Occupancy is returned.
    **Emit the person after calling this to include these changes.**

    Unless `status` is overridden, Occupancies are only returned if end_date is None or
    lies less than the after-office period before current_time.

    current_time defaults to the process start date and time.

    The after-office threshold is determined based on the position topics.

    Occupancy.status is set to

    - `current` if `end_date` is `None` and `no_end_implies_current` is `True`,
      otherwise `status` will be `unknown`
    - `current` if `end_date` is some date in the future, unless the dataset
      `coverage.end` is a date in the past, in which case `status` will be `unknown`
    - `ended` if `end_date` is some date in the past.

    Args:
        context: The context to create the entity in.
        person: The person holding the position. They will be added to the
            `holder` property.
        position: The position held by the person. This will be added to the
            `post` property.
        no_end_implies_current: Set this to True if a dataset is regularly maintained
            and it can be assumed that no end date implies the person is currently
            occupying this position. In this case, `status` will be set to `current`.
            Otherwise, `status` will be set to `unknown`.
        current_time: Defaults to the run time of the current crawl.
        start_date: Set if the date the person started occupying the position is known.
        end_date: Set if the date the person left the position is known.
        status: Overrides determining PEP occupancy status
    """
    assert person.schema.is_a("Person")
    assert position.schema.is_a("Position")

    occupancy = context.make("Occupancy")
    # Include started and ended strings so that two occupancies, one missing start
    # and one missing end, don't get normalised to the same ID
    parts = [
        person.id,
        position.id,
        "started",
        start_date or "unknown",
        "ended",
        end_date or "unknown",
    ]
    occupancy.id = context.make_id(*parts)
    occupancy.add("holder", person)
    occupancy.add("post", position)

    h.apply_date(occupancy, "startDate", start_date)
    h.apply_date(occupancy, "endDate", end_date)

    if birth_date not in person.get("birthDate"):
        h.apply_date(person, "birthDate", birth_date)
    if death_date not in person.get("deathDate"):
        h.apply_date(person, "deathDate", death_date)

    if categorisation is not None:
        assert categorisation.is_pep, person

    if status is None:
        status = occupancy_status(
            context,
            person,
            position,
            no_end_implies_current,
            current_time,
            max(occupancy.get("startDate"), default=None),
            max(occupancy.get("endDate"), default=None),
            max(person.get("birthDate"), default=None),
            max(person.get("deathDate"), default=None),
            categorisation,
        )
    if status is None:
        return None

    occupancy.add("status", status.value)

    person.add("topics", "role.pep")
    if propagate_country:
        person.add("country", position.get("country"))

    return occupancy

make_pdf_page_images(pdf_path)

Split a PDF file into PNG images of its pages.

This requires pdftoppm to be installed on the system, which is part of the poppler-utils package on Debian-based systems.

Source code in zavod/helpers/pdf.py
def make_pdf_page_images(pdf_path: Path) -> List[Path]:
    """Split a PDF file into PNG images of its pages.

    This requires `pdftoppm` to be installed on the system, which is
    part of the `poppler-utils` package on Debian-based systems.
    """
    output_path = Path(mkdtemp())
    output_prefix = output_path / pdf_path.stem
    command = [
        "pdftoppm",
        "-png",
        "-r",
        "150",
        pdf_path.as_posix(),
        output_prefix.as_posix(),
    ]
    subprocess.run(command, check=True)
    return sorted(output_path.glob("*.png"))

make_position(context, name, summary=None, description=None, country=None, topics=None, subnational_area=None, organization=None, inception_date=None, dissolution_date=None, number_of_seats=None, wikidata_id=None, source_url=None, lang=None, id_hash_prefix=None)

Creates a Position entity.

Position categorisation should then be fetched using zavod.logic.pep.categorise and the result's is_pep checked.

Parameters:

Name Type Description Default
context Context

The context to create the entity in.

required
name str

The name of the position.

required
summary Optional[str]

A short summary of the position.

None
description Optional[str]

A longer description of the position.

None
country Optional[str | Iterable[str]]

The country or countries the position is in.

None
subnational_area Optional[str]

The state or district the position is in.

None
organization Optional[Entity]

The organization the position is a part of.

None
inception_date Optional[Iterable[str]]

The date the position was created.

None
dissolution_date Optional[Iterable[str]]

The date the position was dissolved.

None
number_of_seats Optional[str]

The number of seats that can hold the position.

None
wikidata_id Optional[str]

The Wikidata QID of the position.

None
source_url Optional[str]

The URL of the source the position was found in.

None
lang Optional[str]

The language of the position details.

None

Returns:

Type Description
Entity

A new entity of type Position.

Source code in zavod/helpers/positions.py
def make_position(
    context: Context,
    name: str,
    summary: Optional[str] = None,
    description: Optional[str] = None,
    country: Optional[str | Iterable[str]] = None,
    topics: Optional[List[str]] = None,
    subnational_area: Optional[str] = None,
    organization: Optional[Entity] = None,
    inception_date: Optional[Iterable[str]] = None,
    dissolution_date: Optional[Iterable[str]] = None,
    number_of_seats: Optional[str] = None,
    wikidata_id: Optional[str] = None,
    source_url: Optional[str] = None,
    lang: Optional[str] = None,
    id_hash_prefix: Optional[str] = None,
) -> Entity:
    """Creates a Position entity.

    Position categorisation should then be fetched using zavod.logic.pep.categorise
    and the result's is_pep checked.

    Args:
        context: The context to create the entity in.
        name: The name of the position.
        summary: A short summary of the position.
        description: A longer description of the position.
        country: The country or countries the position is in.
        subnational_area: The state or district the position is in.
        organization: The organization the position is a part of.
        inception_date: The date the position was created.
        dissolution_date: The date the position was dissolved.
        number_of_seats: The number of seats that can hold the position.
        wikidata_id: The Wikidata QID of the position.
        source_url: The URL of the source the position was found in.
        lang: The language of the position details.

    Returns:
        A new entity of type `Position`."""

    position = context.make("Position")

    parts: List[str] = [name]
    if country is not None:
        parts.extend(ensure_list(country))
    if inception_date is not None:
        parts.extend(ensure_list(inception_date))
    if dissolution_date is not None:
        parts.extend(ensure_list(dissolution_date))
    if subnational_area is not None:
        parts.extend(ensure_list(subnational_area))

    if wikidata_id is not None:
        position.id = wikidata_id
    else:
        position.id = context.make_id(*parts, hash_prefix=id_hash_prefix)

    position.add("name", name, lang=lang)
    position.add("summary", summary, lang=lang)
    position.add("description", description, lang=lang)
    position.add("country", country)
    position.add("topics", topics)
    position.add("organization", organization, lang=lang)
    position.add("subnationalArea", subnational_area, lang=lang)
    position.add("inceptionDate", inception_date)
    position.add("dissolutionDate", dissolution_date)
    position.add("numberOfSeats", number_of_seats)
    position.add("wikidataId", wikidata_id)
    position.add("sourceUrl", source_url)

    return position
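
In the listing above, only the name, country, dates and subnational area feed into the generated ID, and a Wikidata QID replaces the generated ID entirely. The sketch below reproduces just the assembly of those ID parts; `as_list` is a hypothetical stand-in for `ensure_list`, and the hashing done by `context.make_id` is not modelled.

```python
from typing import Iterable, List, Optional, Union


def as_list(value: Union[str, Iterable[str]]) -> List[str]:
    """Hypothetical stand-in for ensure_list."""
    return [value] if isinstance(value, str) else list(value)


def position_id_parts(
    name: str,
    country: Optional[Union[str, Iterable[str]]] = None,
    inception_date: Optional[Iterable[str]] = None,
    dissolution_date: Optional[Iterable[str]] = None,
    subnational_area: Optional[str] = None,
) -> List[str]:
    parts = [name]
    # Same order as the listing: country, inception, dissolution, area.
    for value in (country, inception_date, dissolution_date, subnational_area):
        if value is not None:
            parts.extend(as_list(value))
    return parts


print(position_id_parts("Mayor", country="de", subnational_area="Berlin"))
```

Because summary, description and topics are not among the parts, changing them should leave the ID of a position stable.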

make_sanction(context, entity, key=None)

Create and return a sanctions object derived from the dataset metadata.

The country, authority, sourceUrl, and subject entity properties are automatically set.

Parameters:

Name Type Description Default
context Context

The runner context with dataset metadata.

required
entity Entity

The entity to which the sanctions object will be linked.

required
key Optional[str]

An optional key to be included in the ID of the sanction.

None

Returns:

Type Description
Entity

A new entity of type Sanction.

Source code in zavod/helpers/sanctions.py
def make_sanction(
    context: Context, entity: Entity, key: Optional[str] = None
) -> Entity:
    """Create and return a sanctions object derived from the dataset metadata.

    The country, authority, sourceUrl, and subject entity properties
    are automatically set.

    Args:
        context: The runner context with dataset metadata.
        entity: The entity to which the sanctions object will be linked.
        key: An optional key to be included in the ID of the sanction.

    Returns:
        A new entity of type Sanction.
    """
    assert entity.schema.is_a("Thing"), entity.schema
    assert entity.id is not None, entity.id
    dataset = context.dataset
    assert dataset.publisher is not None
    sanction = context.make("Sanction")
    sanction.id = context.make_id("Sanction", entity.id, key)
    sanction.add("entity", entity)
    if dataset.publisher.country != "zz":
        sanction.add("country", dataset.publisher.country)
    sanction.add("authority", dataset.publisher.name)
    sanction.add("sourceUrl", dataset.url)
    return sanction

make_security(context, isin)

Make a security entity.

Source code in zavod/helpers/securities.py
def make_security(context: Context, isin: str) -> Entity:
    """Make a security entity."""
    isin = isin.upper()
    entity = context.make("Security")
    entity.id = f"isin-{isin}"
    entity.add("isin", isin)
    cc = isin[:2]
    if cc not in ("XS", "CS"):
        entity.add("country", cc)
    return entity
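
The country handling above relies on the first two characters of the ISIN. A minimal sketch with a plain dict in place of the entity (`security_sketch` is a hypothetical name, and any normalisation the entity applies to property values is not modelled):

```python
from typing import Dict, Optional


def security_sketch(isin: str) -> Dict[str, Optional[str]]:
    isin = isin.upper()
    prefix = isin[:2]
    # XS and CS are excluded by the listing, so no country is set for them.
    country = None if prefix in ("XS", "CS") else prefix
    return {"id": f"isin-{isin}", "isin": isin, "country": country}


print(security_sketch("us0378331005"))
print(security_sketch("XS0000000000"))
```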

multi_split(text, splitters)

Sequentially attempt to split a text based on an array of splitting criteria. This is useful for strings where multiple separators are used to separate values, e.g.: test,other/misc. A special case of this is itemised lists like a) test b) other c) misc which sanction-makers seem to love.

Parameters:

Name Type Description Default
text Optional[Union[str, Iterable[Optional[str]]]]

A text or list of texts to be split up further.

required
splitters Iterable[str]

A sequence of text splitting criteria to be applied to the text.

required

Returns:

Type Description
List[str]

Fully subdivided text snippets.

Source code in zavod/helpers/text.py
def multi_split(
    text: Optional[Union[str, Iterable[Optional[str]]]], splitters: Iterable[str]
) -> List[str]:
    """Sequentially attempt to split a text based on an array of splitting criteria.
    This is useful for strings where multiple separators are used to separate values,
    e.g.: `test,other/misc`. A special case of this is itemised lists like `a) test
    b) other c) misc` which sanction-makers seem to love.

    Args:
        text: A text or list of texts to be split up further.
        splitters: A sequence of text splitting criteria to be applied to the text.

    Returns:
        Fully subdivided text snippets.
    """
    if text is None:
        return []
    fragments = ensure_list(text)
    for splitter in splitters:
        out: List[Optional[str]] = []
        for fragment in fragments:
            if fragment is None:
                continue
            for frag in fragment.split(splitter):
                frag = frag.strip()
                if len(frag):
                    out.append(frag)
        fragments = out
    return [f for f in fragments if f is not None]
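
To illustrate the two cases named in the description, here is a standalone copy of the splitting logic. It is a sketch that swaps `ensure_list` for an inline check so that it runs without zavod installed; `multi_split_sketch` is a hypothetical name.

```python
from typing import Iterable, List, Optional, Union


def multi_split_sketch(
    text: Optional[Union[str, Iterable[Optional[str]]]], splitters: Iterable[str]
) -> List[str]:
    if text is None:
        return []
    fragments = [text] if isinstance(text, str) else list(text)
    for splitter in splitters:
        out: List[str] = []
        for fragment in fragments:
            if fragment is None:
                continue
            # Each pass splits every fragment produced by the previous pass.
            for frag in fragment.split(splitter):
                frag = frag.strip()
                if frag:
                    out.append(frag)
        fragments = out
    return [f for f in fragments if f is not None]


print(multi_split_sketch("test,other/misc", [",", "/"]))
print(multi_split_sketch("a) test b) other c) misc", ["a)", "b)", "c)"]))
```

Both calls yield the three snippets `test`, `other` and `misc`; empty fragments, such as the one before `a)`, are dropped.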

parse_date(text, formats, default=None)

Parse a date two ways: first, try and apply a set of structured formats and return a partial date if any of them parse correctly. Otherwise, apply extract_years on the remaining string.

Source code in zavod/helpers/dates.py
def parse_date(
    text: Optional[str], formats: Iterable[str], default: Optional[str] = None
) -> List[str]:
    """Parse a date two ways: first, try and apply a set of structured formats and
    return a partial date if any of them parse correctly. Otherwise, apply
    `extract_years` on the remaining string."""
    if text is None:
        return [default] if default is not None else []
    parsed = parse_formats(text, formats)
    if parsed.text is not None:
        return [parsed.text]
    years = extract_years(text)
    if len(years):
        return years
    return [default or text]
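
The fallback chain above (structured formats, then years, then the default or the raw text) can be sketched with the standard library alone. The helpers `parse_formats` and `extract_years` are not shown in this section, so the sketch substitutes `datetime.strptime` and an assumed four-digit year pattern; unlike the real helper, it always emits a full ISO date when a format matches.

```python
import re
from datetime import datetime
from typing import Iterable, List, Optional

YEAR = re.compile(r"\b(\d{4})\b")  # assumed pattern, not the library's


def parse_date_sketch(
    text: Optional[str], formats: Iterable[str], default: Optional[str] = None
) -> List[str]:
    if text is None:
        return [default] if default is not None else []
    for fmt in formats:
        try:
            # Simplification: always emit a full ISO date on success.
            return [datetime.strptime(text, fmt).date().isoformat()]
        except ValueError:
            continue
    years = YEAR.findall(text)
    if years:
        return years
    # Nothing recognisable: fall back to the default, else the raw text.
    return [default or text]


print(parse_date_sketch("04.03.2021", ["%d.%m.%Y"]))
print(parse_date_sketch("circa 1977", ["%d.%m.%Y"]))
```

Note the last branch: when nothing parses and no default is given, the original text is returned as-is, so callers still see unparseable values.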

parse_html_table(table, header_tag='th', skiprows=0)

Parse an HTML table into a generator yielding a dict for each row.

Returns:

Type Description
Generator[Dict[str, HtmlElement], None, None]

Generator of dict per row, where the keys are the _-slugified table headings and the values are the HtmlElement of the cell.

See also
  • zavod.helpers.cells_to_str
  • zavod.helpers.links_to_dict
Source code in zavod/helpers/html.py
def parse_html_table(
    table: HtmlElement,
    header_tag: str = "th",
    skiprows: int = 0,
) -> Generator[Dict[str, HtmlElement], None, None]:
    """
    Parse an HTML table into a generator yielding a dict for each row.

    Returns:
        Generator of dict per row, where the keys are the _-slugified table headings
            and the values are the HtmlElement of the cell.

    See also:
      - `zavod.helpers.cells_to_str`
      - `zavod.helpers.links_to_dict`
    """
    headers = None
    for rownum, row in enumerate(table.findall(".//tr")):
        if rownum < skiprows:
            continue

        if headers is None:
            headers = []
            for el in row.findall(f"./{header_tag}"):
                # Workaround because lxml-stubs doesn't yet support HtmlElement
                # https://github.com/lxml/lxml-stubs/pull/71
                eltree = cast(HtmlElement, el)
                headers.append(slugify(eltree.text_content(), sep="_"))
            continue

        cells = row.findall("./td")
        assert len(headers) == len(cells), (headers, cells)
        yield {hdr: c for hdr, c in zip(headers, cells)}
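
The header-then-rows pattern above can be tried out without lxml. The sketch below is an analogue built on the standard library's `xml.etree.ElementTree`, so it only handles well-formed markup; `slug` is a rough, hypothetical stand-in for `slugify(..., sep="_")`, and `skiprows` is omitted.

```python
import re
import xml.etree.ElementTree as ET
from typing import Dict, Iterator, List, Optional


def slug(text: str) -> str:
    """Rough stand-in for slugify(..., sep="_")."""
    return re.sub(r"[^a-z0-9]+", "_", text.lower()).strip("_")


def table_rows(
    table: ET.Element, header_tag: str = "th"
) -> Iterator[Dict[str, ET.Element]]:
    headers: Optional[List[str]] = None
    for row in table.findall(".//tr"):
        if headers is None:
            # The first row supplies the keys for all later rows.
            cells = row.findall(f"./{header_tag}")
            headers = [slug("".join(el.itertext())) for el in cells]
            continue
        cells = row.findall("./td")
        assert len(headers) == len(cells), (headers, cells)
        yield dict(zip(headers, cells))


html = (
    "<table>"
    "<tr><th>Full Name</th><th>Date of Birth</th></tr>"
    "<tr><td>Jane Doe</td><td>1970-01-01</td></tr>"
    "</table>"
)
rows = list(table_rows(ET.fromstring(html)))
print(rows[0]["full_name"].text)
```

As in the listing, the values are elements rather than strings, which leaves it to the caller to extract text or links from each cell.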

parse_pdf_table(context, path, headers_per_page=False, preserve_header_newlines=False, start_page=None, end_page=None, skiprows=0, page_settings=None, save_debug_images=False)

Parse the largest table on each page of a PDF file and yield their rows as dictionaries.

Parameters:

Name Type Description Default
path Path

Path to the PDF file.

required
headers_per_page bool

Set to true if the headers are repeated on each page.

False
preserve_header_newlines bool

Don't slugify newlines in headers - e.g. for when the line breaks are meaningful.

False
start_page Optional[int]

The first page to process. 1-indexed.

None
end_page Optional[int]

The last page to process. 1-indexed.

None
skiprows int

The number of rows to skip before processing table headers.

0
page_settings Optional[Callable[[Page], Tuple[Page, Dict[str, Any]]]]

A function that takes a pdfplumber.page.Page object and returns a tuple of a Page that will be used to extract a table, and a dictionary of settings for extract_table. The page could be e.g. a cropped version of the original.

None
Pro tip

Save debug images in the page settings function to help with debugging.

  • https://github.com/jsvine/pdfplumber?tab=readme-ov-file#drawing-methods
  • https://github.com/jsvine/pdfplumber?tab=readme-ov-file#visually-debugging-the-table-finder

def settings_func(page):
    cropped = page.crop((0, 93, page.width, page.height))
    im = cropped.to_image()
    im.save(f"page-{cropped.page_number}.png")
    return (cropped, PAGE_SETTINGS)

Source code in zavod/helpers/pdf.py
def parse_pdf_table(
    context: Context,
    path: Path,
    headers_per_page: bool = False,
    preserve_header_newlines: bool = False,
    start_page: Optional[int] = None,
    end_page: Optional[int] = None,
    skiprows: int = 0,
    page_settings: Optional[Callable[[Page], Tuple[Page, Dict[str, Any]]]] = None,
    save_debug_images: bool = False,
) -> Generator[Dict[str, str], None, None]:
    """
    Parse the largest table on each page of a PDF file and yield their rows as dictionaries.

    Arguments:
        path: Path to the PDF file.
        headers_per_page: Set to true if the headers are repeated on each page.
        preserve_header_newlines: Don't slugify newlines in headers -
            e.g. for when the line breaks are meaningful.
        start_page: The first page to process. 1-indexed.
        end_page: The last page to process. 1-indexed.
        skiprows: The number of rows to skip before processing table headers.
        page_settings: A function that takes a `pdfplumber.page.Page` object and returns
            a tuple of a Page that will be used to extract a table, and a dictionary of
            settings for `extract_table`. The page could be e.g. a cropped version of the
            original.

    Pro tip:
        Save debug images in the page settings function to help with debugging.

        - https://github.com/jsvine/pdfplumber?tab=readme-ov-file#drawing-methods
        - https://github.com/jsvine/pdfplumber?tab=readme-ov-file#visually-debugging-the-table-finder

        ```
        def settings_func(page):
            cropped = page.crop((0, 93, page.width, page.height))
            im = cropped.to_image()
            im.save(f"page-{cropped.page_number}.png")
            return (cropped, PAGE_SETTINGS)
        ```
    """
    start_page_idx = start_page - 1 if isinstance(start_page, int) else None
    end_page_idx = end_page if isinstance(end_page, int) else None
    pdf = pdfplumber.open(path)
    headers = None
    for page in pdf.pages[start_page_idx:end_page_idx]:
        if page.page_number % 100 == 0:
            context.log.info(f"Processing page {page.page_number}...")

        if headers_per_page:
            headers = None

        if page_settings is not None:
            page, settings = page_settings(page)
        else:
            settings = {}

        rows = page.extract_table(settings)
        if rows is None:
            raise Exception(f"No table found on page {page.page_number} of {path}")
        for row_num, row in enumerate(rows):
            if headers is None:
                if row_num < skiprows:
                    continue
                headers = [
                    header_slug(cell or "", preserve_header_newlines) for cell in row
                ]
                continue
            assert len(headers) == len(row), (headers, row)
            yield dict(zip(headers, row))

        page.close()
    pdf.close()
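
The conversion from 1-indexed, inclusive page numbers to a Python slice at the top of the listing is easy to get wrong, so here it is in isolation. `page_window` is a hypothetical name, and a plain list of page numbers stands in for `pdf.pages`.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def page_window(
    pages: Sequence[T],
    start_page: Optional[int] = None,
    end_page: Optional[int] = None,
) -> List[T]:
    # start_page is 1-indexed, so shift it down by one to get a slice index.
    start_idx = start_page - 1 if isinstance(start_page, int) else None
    # end_page is inclusive and 1-indexed, which equals the exclusive slice end.
    end_idx = end_page if isinstance(end_page, int) else None
    return list(pages[start_idx:end_idx])


print(page_window(list(range(1, 11)), start_page=2, end_page=4))
```

With ten pages, `start_page=2` and `end_page=4` select pages 2, 3 and 4; leaving either bound as None keeps that side of the document open.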

parse_xlsx_sheet(context, sheet, skiprows=0, header_lookup=None)

Parse an Excel sheet into a sequence of dictionaries.

Parameters:

Name Type Description Default
context Context

Crawler context.

required
sheet Worksheet

The Excel sheet.

required
skiprows int

The number of rows to skip.

0
header_lookup Optional[str]

The lookup key for translating headers.

None
Source code in zavod/helpers/excel.py
def parse_xlsx_sheet(
    context: Context,
    sheet: Worksheet,
    skiprows: int = 0,
    header_lookup: Optional[str] = None,
) -> Generator[Dict[str, str | None], None, None]:
    """
    Parse an Excel sheet into a sequence of dictionaries.

    Args:
        context: Crawler context.
        sheet: The Excel sheet.
        skiprows: The number of rows to skip.
        header_lookup: The lookup key for translating headers.
    """
    headers = None
    row_counter = 0

    for row in sheet.iter_rows():
        # Increment row counter
        row_counter += 1

        # Skip the desired number of rows
        if row_counter <= skiprows:
            continue
        cells = [c.value for c in row]
        if headers is None:
            headers = []
            for idx, header in enumerate(cells):
                if header is None:
                    header = f"column_{idx}"
                if header_lookup:
                    header = context.lookup_value(
                        header_lookup,
                        stringify(header),
                        stringify(header),
                    )
                headers.append(slugify(header, sep="_"))
            continue

        record = {}
        for header, value in zip(headers, cells):
            if isinstance(value, datetime):
                value = value.date()
            record[header] = stringify(value)
        if len(record) == 0:
            continue
        if all(v is None for v in record.values()):
            continue
        yield record
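
The row handling above (skip leading rows, derive headers, drop empty rows) does not depend on Excel itself. A minimal sketch over plain lists of cell values, with a simplified header normalisation in place of `slugify` and `str()` in place of `stringify`; `rows_to_records` is a hypothetical name, and header lookups and datetime handling are left out.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence


def rows_to_records(
    rows: Iterable[Sequence[Any]], skiprows: int = 0
) -> Iterator[Dict[str, Optional[str]]]:
    headers: Optional[List[str]] = None
    for row_number, cells in enumerate(rows, start=1):
        if row_number <= skiprows:
            continue
        if headers is None:
            # Unnamed columns get a positional placeholder, as in the listing.
            headers = [
                f"column_{idx}"
                if cell is None
                else str(cell).strip().lower().replace(" ", "_")
                for idx, cell in enumerate(cells)
            ]
            continue
        record = {h: (None if v is None else str(v)) for h, v in zip(headers, cells)}
        # Rows without any values are dropped.
        if not record or all(v is None for v in record.values()):
            continue
        yield record


data = [["Report"], ["Name", None], ["Jane", 42], [None, None]]
print(list(rows_to_records(data, skiprows=1)))
```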

postcode_pobox(text)

Handles values where a PO Box has been stuffed into the postcode field.

Returns:

Type Description
Tuple[Optional[str], Optional[str]]

Tuple of (postcode, po_box)

Source code in zavod/helpers/addresses.py
def postcode_pobox(text: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """
    Handles values where a PO Box has been stuffed into the postcode field.

    Returns:
        Tuple of (postcode, po_box)
    """
    if text is None:
        return None, None
    if match := REGEX_POBOX.match(text):
        return None, match.group(0)
    return text, None
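
`REGEX_POBOX` is not shown in this section, so the following sketch uses an assumed pattern purely to illustrate the shape of the return value. Both `POBOX_PATTERN` and `postcode_pobox_sketch` are hypothetical; the real pattern may accept more or fewer spellings.

```python
import re
from typing import Optional, Tuple

# Assumed pattern for illustration only; the library's REGEX_POBOX may differ.
POBOX_PATTERN = re.compile(r"P\.?\s*O\.?\s*Box\s*\d+", re.IGNORECASE)


def postcode_pobox_sketch(text: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    if text is None:
        return None, None
    if match := POBOX_PATTERN.match(text):
        # The value is really a PO Box, so the postcode slot stays empty.
        return None, match.group(0)
    return text, None


print(postcode_pobox_sketch("PO Box 123"))
print(postcode_pobox_sketch("10115"))
```

Exactly one of the two slots is filled for any non-None input, so the result can be unpacked straight into the `postal_code` and `po_box` arguments of an address helper.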

remove_bracketed(text)

Helps to deal with property values where additional info has been supplied in brackets that makes it harder to parse the value. Examples:

  • Russia (former USSR)
  • 1977 (as Muhammad Da'ud Salman)

It's probably not useful in all of these cases to try and parse and derive meaning from the bracketed bit, so we'll just discard it.

Parameters:

Name Type Description Default
text Optional[str]

Text with sub-text in brackets

required

Returns:

Type Description
Optional[str]

Text that was not in brackets.

Source code in zavod/helpers/text.py
def remove_bracketed(text: Optional[str]) -> Optional[str]:
    """Helps to deal with property values where additional info has been supplied in
    brackets that makes it harder to parse the value. Examples:

    - Russia (former USSR)
    - 1977 (as Muhammad Da'ud Salman)

    It's probably not useful in all of these cases to try and parse and derive meaning
    from the bracketed bit, so we'll just discard it.

    Args:
        text: Text with sub-text in brackets

    Returns:
        Text that was not in brackets.
    """
    if text is None:
        return None
    return BRACKETED.sub(" ", text)
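
The `BRACKETED` pattern is defined elsewhere, so this sketch assumes a simple non-greedy match on round brackets (`BRACKETED_PATTERN` is a hypothetical name). It shows one detail of the listing worth knowing: the bracketed part is replaced with a space, so the result can carry extra whitespace.

```python
import re
from typing import Optional

# Assumed pattern for illustration; the library's BRACKETED may differ.
BRACKETED_PATTERN = re.compile(r"\(.*?\)")


def remove_bracketed_sketch(text: Optional[str]) -> Optional[str]:
    if text is None:
        return None
    # Substituting a space keeps neighbouring words from being glued together.
    return BRACKETED_PATTERN.sub(" ", text)


cleaned = remove_bracketed_sketch("Russia (former USSR)")
print(repr(cleaned))
print(repr(cleaned.strip()))
```

Under these assumptions, callers would usually strip or collapse whitespace on the result before adding it to an entity.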

remove_namespace(el)

Remove namespace in the passed XML/HTML document in place and return an updated element tree.

If the namespaces in a document define multiple tags with the same local tag name, this will create ambiguity and lead to errors. Most XML documents, however, only actively use one namespace.

Parameters:

Name Type Description Default
el ElementOrTree

The root element or tree to remove namespaces from.

required

Returns:

Type Description
ElementOrTree

An updated element tree with the namespaces removed.

Source code in zavod/helpers/xml.py
def remove_namespace(el: ElementOrTree) -> ElementOrTree:
    """Remove namespace in the passed XML/HTML document in place and
    return an updated element tree.

    If the namespaces in a document define multiple tags with the same
    local tag name, this will create ambiguity and lead to errors. Most
    XML documents, however, only actively use one namespace.

    Args:
        el: The root element or tree to remove namespaces from.

    Returns:
        An updated element tree with the namespaces removed.
    """
    for elem in el.iter():
        # https://stackoverflow.com/a/47233934
        if elem.tag is etree.Comment:  # type: ignore
            # Can't make a QName from a comment
            continue
        elem.tag = etree.QName(elem).localname
        for key, value in list(elem.attrib.items()):
            local_key = etree.QName(key).localname
            if key != local_key:
                elem.attrib[local_key] = value
    etree.cleanup_namespaces(el)
    return el
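
To show the idea without requiring lxml, here is a sketch of the same technique using only the standard library's `xml.etree.ElementTree`. It is not the helper itself: `strip_namespaces` is a hypothetical name, local names are obtained by splitting on `}` instead of using `etree.QName`, and, unlike the listing above, the namespaced attribute key is removed once the local key has been set:

```python
import xml.etree.ElementTree as ET


def strip_namespaces(root: ET.Element) -> ET.Element:
    """Rewrite tags and attribute keys to their local names, in place."""
    for elem in root.iter():
        # Comments and processing instructions have non-string tags.
        if not isinstance(elem.tag, str):
            continue
        # ElementTree stores qualified names as "{uri}local".
        elem.tag = elem.tag.rsplit("}", 1)[-1]
        for key in list(elem.attrib):
            local_key = key.rsplit("}", 1)[-1]
            if key != local_key:
                elem.attrib[local_key] = elem.attrib.pop(key)
    return root


doc = ET.fromstring(
    '<a:root xmlns:a="http://example.com/a" a:id="1">'
    "<a:item>x</a:item>"
    "</a:root>"
)
strip_namespaces(doc)
# Plain, unqualified paths now work:
print(doc.tag, doc.attrib, doc.find("item").text)
```

After stripping, XPath expressions and `find` calls no longer need a namespace map, which is the main convenience in crawler code. The caveat from the description still applies: two tags with the same local name in different namespaces become indistinguishable.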

replace_months(dataset, text)

Rewrite month names to the Latin form to get a date string ready for parsing.

Parameters:

Name Type Description Default
dataset Dataset

The dataset which contains a date format specification.

required
text str

The string inside of which month names will be replaced.

required

Returns:

Type Description
str

A string in which month names are normalized.

Source code in zavod/helpers/dates.py
def replace_months(dataset: Dataset, text: str) -> str:
    """Rewrite month names to the Latin form to get a date string ready for parsing.

    Args:
        dataset: The dataset which contains a date format specification.
        text: The string inside of which month names will be replaced.

    Returns:
        A string in which month names are normalized.
    """
    spec = dataset.dates
    if spec.months_re is None:
        return text
    return spec.months_re.sub(lambda m: spec.mappings[m.group().lower()], text)
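
The substitution step can be sketched in isolation. The listing only shows that the date specification exposes `months_re` and `mappings`, so `MonthSpec` below is a hypothetical stand-in for `dataset.dates`, and the way it builds the regular expression is an assumption:

```python
import re
from typing import Dict, Optional, Pattern


class MonthSpec:
    """Hypothetical stand-in for the `dataset.dates` specification."""

    def __init__(self, mappings: Dict[str, str]) -> None:
        # Keys are lower-cased so that lookups are case-insensitive.
        self.mappings = {name.lower(): target for name, target in mappings.items()}
        self.months_re: Optional[Pattern[str]] = None
        if self.mappings:
            # Longest names first, so longer alternatives win over prefixes.
            names = sorted(self.mappings, key=len, reverse=True)
            pattern = "|".join(re.escape(name) for name in names)
            self.months_re = re.compile(rf"\b({pattern})\b", re.IGNORECASE)


def replace_months(spec: MonthSpec, text: str) -> str:
    if spec.months_re is None:
        return text
    return spec.months_re.sub(lambda m: spec.mappings[m.group().lower()], text)


spec = MonthSpec({"März": "Mar", "janvier": "Jan"})
print(replace_months(spec, "12 März 2021"))
print(replace_months(spec, "3 JANVIER 2020"))
```

The rewritten string can then be passed to a parser with a single format such as `%d %b %Y`, instead of maintaining one format per language.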

split_comma_names(context, text)

Split a string of multiple names that may contain company and individual names, some including commas, into individual names without breaking partnership names like "A, B and C Inc" or individuals like "Smith, Jane".

To make life easier, commas are stripped from company type suffixes like "Blue, LLC".

If the string can't be split into whole names reliably, a datapatch is looked up under the comma_names key, which should contain a list of names in the names attribute. If no match is found, the name is returned as a single-item list, and a warning is emitted.

Source code in zavod/helpers/names.py
def split_comma_names(context: Context, text: str) -> List[str]:
    """Split a string of multiple names that may contain company and individual names,
    some including commas, into individual names without breaking partnership names
    like "A, B and C Inc" or individuals like "Smith, Jane".

    To make life easier, commas are stripped from company type suffixes like "Blue, LLC".

    If the string can't be split into whole names reliably, a datapatch is looked up
    under the `comma_names` key, which should contain a list of names in the `names`
    attribute. If no match is found, the name is returned as a single-item list,
    and a warning is emitted.
    """
    text = collapse_spaces(text) or ""
    if not text:
        return []

    text = REGEX_CLEAN_COMMA.sub(r" \1", text)
    # If the string ends in a comma, the last comma is unnecessary (e.g. Goldman Sachs & Co. LLC,)
    if text.endswith(","):
        text = text[:-1]

    if not REGEX_AND.search(text) and not REGEX_LNAME_FNAME.match(text):
        names = [n.strip() for n in text.split(",")]
        return names
    else:
        if ("," in text) or (" and " in text):
            res = context.lookup("comma_names", text)
            if res:
                return cast("List[str]", res.names)
            else:
                context.log.warning(
                    "Not sure how to split on comma or and.", text=text.lower()
                )
                return [text]
        else:
            return [text]
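
The decision logic can be followed with a simplified, context-free sketch. The three regular expressions are not shown in the listing, so the patterns below are hypothetical, and `split_simple` is an illustrative name: where the real helper consults the `comma_names` lookup and logs a warning, this version simply returns the text unsplit:

```python
import re
from typing import List

# Hypothetical patterns standing in for the module-level constants.
CLEAN_COMMA = re.compile(r",\s*\b(LLC|Inc|Ltd|LLP|Corp)\b", re.IGNORECASE)
AND = re.compile(r"\band\b|&", re.IGNORECASE)
LNAME_FNAME = re.compile(r"^\w+, \w+$")


def split_simple(text: str) -> List[str]:
    text = " ".join(text.split())  # collapse runs of whitespace
    if not text:
        return []
    # "Blue, LLC" -> "Blue LLC", so the suffix comma is not a separator.
    text = CLEAN_COMMA.sub(r" \1", text)
    if text.endswith(","):
        text = text[:-1]
    if not AND.search(text) and not LNAME_FNAME.match(text):
        # No "and"/"&" and not a "Last, First" pair: safe to split on commas.
        return [name.strip() for name in text.split(",")]
    # Ambiguous: the real helper would look up a datapatch here.
    return [text]


print(split_simple("Blue, LLC, Red Inc, Green Ltd"))
print(split_simple("Smith, Jane"))
print(split_simple("A, B and C Inc"))
```

Only the first input is split. The other two are returned whole, which is the case the `comma_names` lookup is meant to resolve by hand.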