Data Reviews

When we don't believe that automated extraction will be sufficiently accurate, we can use Data Reviews to have human reviewers fix extraction issues before an extraction result is accepted.

Context

We want the following properties from reviews:

  • We want to be notified when there are new reviews that need attention.
  • Data removed from the source should also drop out of the dataset.
  • If the source data changes in a way that changes the automated extraction result, e.g. for a correction, we want to update and re-review the data.
  • If we change the data model, e.g. to extract additional fields, we want to be able to decide whether existing reviews should be redone or can be treated as backward compatible.

    • We want incompatible data model changes to fail early and loudly.
  • We want user-edited data changes to be validated early (ideally in the UI) to prevent a painfully slow review/editing turnaround.

Implementation

The basic workflow is:

  1. Define a pydantic model for the data
  2. Perform the automated extraction
  3. Call review = review_extraction()
  4. If review.accepted is true, use review.extracted_data
  5. Assert that all the reviews related to a given crawler are accepted using assert_all_accepted, opting to either emit a warning or raise an exception. An exception is useful to prevent publishing a partial dataset, e.g. if we would prefer to hold off on publishing a new release until all data has been accepted.
  6. After the crawler has run, reviewers can review the data in Zavod UI and correct/accept extraction results. The crawler can then use the accepted data in its next run.

For example, imagine a crawler crawling web pages with regulatory notices.

from typing import List, Literal

from lxml.etree import _Element
from pydantic import BaseModel, Field

from zavod import Context
from zavod.shed import gpt
from zavod.shed.gpt import run_typed_text_prompt
from zavod.stateful.review import review_extraction, assert_all_accepted, HtmlSourceValue


Schema = Literal["Person", "Company", "LegalEntity"]
schema_field = Field(
    description=(
        "Use LegalEntity if it isn't clear whether the entity is a person or a company."
    )
)


class Defendant(BaseModel):
    entity_schema: Schema = schema_field
    name: str


class Defendants(BaseModel):
    defendants: List[Defendant]


PROMPT = f"""
Extract the defendants in the attached article. ONLY include names mentioned
in the article text.

Instructions for specific fields:

  - entity_schema: {schema_field.description}
"""

def crawl_page(context: Context, url: str, page: _Element) -> None:
    article = page.xpath(".//article")[0]
    source_value = HtmlSourceValue(
        key_parts=notice_id(url),
        label="Notice of regulatory action taken",
        element=article,
        url=url,
    )
    prompt_result = run_typed_text_prompt(
        context,
        prompt=PROMPT,
        string=source_value.value_string,
        response_type=Defendants,
    )
    # If a review has previously been requested for the same source_value.key_parts,
    # it'll be found here.
    review = review_extraction(
        context,
        source_value=source_value,
        original_extraction=prompt_result,
        origin=gpt.DEFAULT_MODEL,
    )
    # Once it's been accepted by a reviewer, we can use it
    if not review.accepted:
        return

    for item in review.extracted_data.defendants:
        entity = context.make(item.entity_schema)
        entity.id = context.make_id(item.name)
        entity.add("name", item.name, origin=review.origin)
        context.emit(entity)

def crawl(context: Context) -> None:
    ...
    for url in urls:
        ...
        crawl_page(context, url, page)

    # This will raise an exception unless all the reviews fetched
    # during a given crawl have `accepted == True`.
    assert_all_accepted(context)

zavod.stateful.review.review_extraction(context, source_value, original_extraction, origin, crawler_version=1, default_accepted=False)

Ensures a Review exists for the given key to allow human review of automated data extraction from a source value.

  • If it's new, extracted_data will default to original_extraction and accepted to default_accepted.
  • last_seen_version is always updated to the current crawl version.
  • If it's not accepted yet, original_extraction and extracted_data will be updated.
  • If both source_value and original_extraction have changed or crawler_version has been bumped, all values are reset as if it's new.
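
For example, when we add a new required field to the data model (a backward incompatible change), bumping crawler_version resets existing reviews instead of failing validation against the new model. A minimal sketch, reusing the Defendant model from the example above (the added country field is hypothetical):

class Defendant(BaseModel):
    entity_schema: Schema = schema_field
    name: str
    country: str  # hypothetical new required field


review = review_extraction(
    context,
    source_value=source_value,
    original_extraction=prompt_result,
    origin=gpt.DEFAULT_MODEL,
    crawler_version=2,  # bumped: reviews stored with crawler_version 1 are reset
)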

Parameters:

  • context (Context, required): The runner context with dataset metadata.
  • source_value (SourceValue, required): The source value for the extracted data.
  • original_extraction (ModelType, required): An instance of a pydantic model of data extracted from the source. Any reviewer changes to the data will be validated against this model. Initially Review.extracted_data will be set to this value. Reviewer edits will then be stored in Review.extracted_data with Review.original_extraction remaining as the original.
  • origin (str, required): A short string indicating the origin of the extraction, e.g. the model name, or "lookups" if it's backfilled from datapatches lookups.
  • crawler_version (int, default 1): A version number a crawler can use as a checkpoint for changes requiring re-extraction and/or re-review. Don't bump this for every crawler change, only for backward incompatible data model changes or to force re-review for some other reason.
  • default_accepted (bool, default False): Whether the data should be marked as accepted on creation or reset.
Source code in zavod/stateful/review.py
def review_extraction(
    context: Context,
    source_value: SourceValue,
    original_extraction: ModelType,
    origin: str,
    crawler_version: int = 1,
    default_accepted: bool = False,
) -> Review[ModelType]:
    """
    Ensures a Review exists for the given key to allow human review of automated
    data extraction from a source value.

    - If it's new, `extracted_data` will default to `original_extraction` and
      `accepted` to `default_accepted`.
    - `last_seen_version` is always updated to the current crawl version.
    - If it's not accepted yet, `original_extraction` and `extracted_data` will be updated.
    - If both `source_value` and `original_extraction` have changed or `crawler_version` has been bumped, all values are reset as if it's new.

    Args:
        context: The runner context with dataset metadata.
        source_value: The source value for the extracted data.
        original_extraction: An instance of a pydantic model of data extracted
            from the source. Any reviewer changes to the data will be validated against
            this model. Initially `Review.extracted_data` will be set to this value.
            Reviewer edits will then be stored in `Review.extracted_data` with
            `Review.original_extraction` remaining as the original.
        origin: A short string indicating the origin of the extraction, e.g. the
            model name or "lookups" if it's backfilled from datapatches lookups.
        crawler_version: A version number a crawler can use as a checkpoint for changes
            requiring re-extraction and/or re-review. Don't bump this for every crawler
            change, only for backward incompatible data model changes or to force re-review
            for some other reason.
        default_accepted: Whether the data should be marked as accepted on creation or reset.
    """
    key_slug = review_key(source_value.key_parts)
    assert key_slug is not None

    data_model = type(original_extraction)
    schema = data_model.model_json_schema(schema_generator=SchemaGenerator)
    now = datetime.now(timezone.utc)

    review = Review[ModelType].by_key(
        context.conn, data_model, dataset=context.dataset.name, key=key_slug
    )
    save_new_revision = False
    if review is None:
        context.log.debug("Creating new review", key=key_slug)
        review = Review[ModelType](
            dataset=context.dataset.name,
            key=key_slug,
            source_mime_type=source_value.mime_type,
            source_label=source_value.label,
            source_url=source_value.url,
            accepted=default_accepted,
            extraction_schema=schema,
            source_value=source_value.value_string,
            data_model=data_model,
            original_extraction=original_extraction,
            origin=origin,
            extracted_data=original_extraction,
            crawler_version=crawler_version,
            last_seen_version=context.version.id,
            modified_at=now,
            modified_by=MODIFIED_BY_CRAWLER,
        )
        save_new_revision = True
    else:
        review.last_seen_version = context.version.id

        crawler_version_changed = review.crawler_version < crawler_version
        # Don't try to read (and thus validate) the extracted data if the crawler
        # version changed. We bump that when the model isn't backward compatible.
        if crawler_version_changed or (
            not source_value.matches(review)
            and not review.matches_original(original_extraction)
        ):
            if crawler_version_changed:
                context.log.debug(
                    "Crawler version changed. Resetting review.",
                    key=key_slug,
                    old=review.crawler_version,
                    new=crawler_version,
                )
            else:
                context.log.debug(
                    "Source value changed. Resetting review.", key=key_slug
                )
            review.crawler_version = crawler_version
            review.data_model = data_model
            review.extraction_schema = schema
            review.original_extraction = original_extraction
            review.origin = origin
            review.extracted_data = original_extraction
            review.accepted = default_accepted
            save_new_revision = True
        elif not review.accepted and not review.matches_original(original_extraction):
            context.log.debug("Extraction changed for unaccepted review.", key=key_slug)
            # If we haven't accepted this yet and the extraction changed, we want the
            # change regardless of whether the source changed since the prompt or the
            # model might be better.
            review.original_extraction = original_extraction
            # Resetting extracted_data to original_extraction here loses unaccepted edits
            # but prompt improvements happen more often than unaccepted edits.
            review.extracted_data = original_extraction
            # Saving a new revision on every crawl for an unaccepted item would
            # create a revision each run when two source items map to the same
            # key, e.g. "American Express Inc" and "American Express Inc.".
            # Since the item is unaccepted, we're likely not losing a ton of
            # reviewer work. Once accepted, the one extraction shouldn't be
            # overwriting the other.
            save_new_revision = False
            review.modified_at = now
            review.modified_by = MODIFIED_BY_CRAWLER

        if save_new_revision:
            review.modified_at = now
            review.modified_by = MODIFIED_BY_CRAWLER
    review.save(context.conn, new_revision=save_new_revision)
    return review
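
When data has already been verified by some other means, e.g. backfilled from datapatches lookups, a review can start out accepted. A hedged sketch, where known_defendants is a hypothetical list built from lookups:

review = review_extraction(
    context,
    source_value=source_value,
    original_extraction=Defendants(defendants=known_defendants),
    origin="lookups",
    default_accepted=True,  # already verified, no human review required
)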

zavod.stateful.review.assert_all_accepted(context, *, raise_on_unaccepted=True)

Raise an exception or warning with the number of unaccepted items if any extraction entries for the current dataset and version are not accepted yet.

Parameters:

  • context (Context, required): The runner context with dataset metadata.
  • raise_on_unaccepted (bool, default True): Whether to raise an exception if there are unaccepted items. If False, a warning will be logged instead.
Source code in zavod/stateful/review.py
def assert_all_accepted(context: Context, *, raise_on_unaccepted: bool = True) -> None:
    """
    Raise an exception or warning with the number of unaccepted items if any extraction
    entries for the current dataset and version are not accepted yet.

    Args:
        context: The runner context with dataset metadata.
        raise_on_unaccepted: Whether to raise an exception if there are unaccepted items.
            If False, a warning will be logged instead.
    """
    # Make sure everything is saved to the database in case we raise:
    context.flush()

    count = Review.count_unaccepted(
        context.conn, context.dataset.name, context.version.id
    )
    if count > 0:
        message = (
            f"There are {count} unaccepted items for dataset "
            f"{context.dataset.name} and version {context.version.id}"
        )
        if raise_on_unaccepted:
            raise Exception(message)
        else:
            context.log.warning(message)
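
If publishing a partial dataset is acceptable, e.g. while a backlog of reviews is being worked through, a minimal sketch of logging a warning instead of failing the crawl:

def crawl(context: Context) -> None:
    ...
    # Logs a warning with the unaccepted count instead of raising.
    assert_all_accepted(context, raise_on_unaccepted=False)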

zavod.stateful.review.HtmlSourceValue

Bases: SourceValue

Source code in zavod/stateful/review.py
class HtmlSourceValue(SourceValue):
    element: HtmlElement

    def __init__(
        self,
        key_parts: str | List[str],
        label: str,
        element: HtmlElement,
        url: str,
    ):
        """
        Sets `value_string` as the serialized HTML of the element.

        Args:
            key_parts: Information from the source that uniquely and
                consistently identifies the review within the dataset. For an
                enforcement action, that might be an action reference number or
                publication url (for lack of a consistent identifier).
        """
        self.key_parts = key_parts
        self.mime_type = HTML
        self.label = label
        self.url = url
        self.value_string = tostring(element, pretty_print=True, encoding="unicode")
        self.element = element

    def matches(self, review: Review[ModelType]) -> bool:
        assert review.source_mime_type == HTML, review.source_mime_type
        seen_element = fromstring(review.source_value)
        return h.html.element_text_hash(seen_element) == h.html.element_text_hash(
            self.element
        )

__init__(key_parts, label, element, url)

Sets value_string as the serialized HTML of the element.

Parameters:

  • key_parts (str | List[str], required): Information from the source that uniquely and consistently identifies the review within the dataset. For an enforcement action, that might be an action reference number or publication url (for lack of a consistent identifier).
Source code in zavod/stateful/review.py
def __init__(
    self,
    key_parts: str | List[str],
    label: str,
    element: HtmlElement,
    url: str,
):
    """
    Sets `value_string` as the serialized HTML of the element.

    Args:
        key_parts: Information from the source that uniquely and
            consistently identifies the review within the dataset. For an
            enforcement action, that might be an action reference number or
            publication url (for lack of a consistent identifier).
    """
    self.key_parts = key_parts
    self.mime_type = HTML
    self.label = label
    self.url = url
    self.value_string = tostring(element, pretty_print=True, encoding="unicode")
    self.element = element

zavod.stateful.review.TextSourceValue

Bases: SourceValue

Source code in zavod/stateful/review.py
class TextSourceValue(SourceValue):
    def __init__(
        self,
        key_parts: str | List[str],
        label: str,
        text: str,
        url: Optional[str] = None,
    ):
        """
        Args:
            key_parts: Information from the source that uniquely and
                consistently identifies the review within the dataset. For a string
                of names that rarely changes, that string itself might work.
        """
        self.key_parts = key_parts
        self.mime_type = PLAIN
        self.label = label
        self.url = url
        self.value_string = text

    def matches(self, review: Review[ModelType]) -> bool:
        """
        Performs the same normalisation as `review_key` so that we consider
        multiple source values normalising to the same key a match.
        """
        assert review.source_mime_type == PLAIN, review.source_mime_type
        return slugify(self.value_string) == slugify(review.source_value)

__init__(key_parts, label, text, url=None)

Parameters:

  • key_parts (str | List[str], required): Information from the source that uniquely and consistently identifies the review within the dataset. For a string of names that rarely changes, that string itself might work.
Source code in zavod/stateful/review.py
def __init__(
    self,
    key_parts: str | List[str],
    label: str,
    text: str,
    url: Optional[str] = None,
):
    """
    Args:
        key_parts: Information from the source that uniquely and
            consistently identifies the review within the dataset. For a string
            of names that rarely changes, that string itself might work.
    """
    self.key_parts = key_parts
    self.mime_type = PLAIN
    self.label = label
    self.url = url
    self.value_string = text

matches(review)

Performs the same normalisation as review_key so that we consider multiple source values normalising to the same key a match.

Source code in zavod/stateful/review.py
def matches(self, review: Review[ModelType]) -> bool:
    """
    Performs the same normalisation as `review_key` so that we consider
    multiple source values normalising to the same key a match.
    """
    assert review.source_mime_type == PLAIN, review.source_mime_type
    return slugify(self.value_string) == slugify(review.source_value)
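
For example, a sketch of wrapping a free-text names cell from a CSV row in a TextSourceValue (the "defendants" column name is hypothetical):

source_value = TextSourceValue(
    key_parts=row["defendants"],  # the rarely-changing string itself as the key
    label="Defendants",
    text=row["defendants"],
)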

Best practices

Review keys

The key should uniquely identify a given piece of data extraction/review content. Ideally it should be consistent in spite of changes to the content, but this isn't always possible. Key input gets slugified by review_extraction.

For free text in a CSV cell that doesn't have a consistent identifier, e.g. "John, Sally, and partners", just use the string as the key.

For web pages, e.g. of regulatory notices, try to use the notice ID if one can be extracted reliably, rather than the web page URL, because the URL can change if the publisher reorganises the website, while the notice ID can likely still be extracted consistently despite such a change.
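
For example, a hypothetical sketch that keys the review on an extracted notice ID rather than the URL (the XPath selector is illustrative):

notice_ref = page.xpath(".//span[@class='notice-id']/text()")[0]
source_value = HtmlSourceValue(
    key_parts=notice_ref,  # stable even if the site's URL structure changes
    label="Notice of regulatory action taken",
    element=article,
    url=url,
)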

Model Documentation

Use model documentation (e.g. fieldname: MyEnum = Field(description="...")) to explain how fields should be extracted. This gets included in the JSON schema so it's made available to the human reviewer in Zavod UI.

OpenAI's structured output API doesn't seem to support JSON schema description properties yet, so also include the descriptions explicitly in the prompt, as in the example above.
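
With several documented fields, the field instructions in the prompt can also be built from the model itself rather than repeated by hand; a sketch using pydantic v2's model_fields:

field_docs = "\n".join(
    f"  - {name}: {field.description}"
    for name, field in Defendant.model_fields.items()
    if field.description
)

PROMPT = f"""
Extract the defendants in the attached article. ONLY include names mentioned
in the article text.

Instructions for specific fields:

{field_docs}
"""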