Core concepts

Data Contract

A data contract defines the expectations we have for our data, which makes it easier to find issues and reduces cognitive load.

In that sense, a data contract is very similar to a schema definition.

A simple @data_contract

Defining a data_contract is very similar to how we define a dataclass.

For example, describing an Article could look something like the following.

from aligned import data_contract, String, ImageUrl, Timestamp

@data_contract()
class Article:
    title = String()
    author = String().is_optional()
    content = String()

    published_at = Timestamp()
    image_url = ImageUrl().is_optional()
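
For comparison, a plain dataclass describing the same shape might look like the sketch below. ArticleRecord is a hypothetical name used only for illustration. Note that a dataclass only describes the fields and does not validate them, which is what the contract adds.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class ArticleRecord:
    # Required fields, matching the columns declared without .is_optional()
    title: str
    content: str
    published_at: datetime
    # Optional fields, matching the columns declared with .is_optional()
    author: Optional[str] = None
    image_url: Optional[str] = None


article = ArticleRecord(
    title="Data Contracts",
    content="A short text",
    published_at=datetime(2025, 6, 27),
)
print(article.author)
```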

Data Validation

This makes it possible to start validating our data with something like the following.

from datetime import datetime

import polars as pl

data = pl.DataFrame({
    "title": ["Intro to Aligned", "Data Contracts", "Is this valid?"],
    "author": [None, "this is", "optional"],
    "content": ["This column", "is requered", None],
    "published_at": [datetime.utcnow(), datetime.utcnow(), None],
    "image_url": ["https://aligned.codes/image.png", None, None],
    "an_additional_column": ["This", "could be", "anything"]
})

validated_data = Article.drop_invalid(data)

print(validated_data)

This leads to the following output. The third row is dropped because its required content and published_at values are missing.

shape: (2, 6)
┌──────────────────┬─────────┬─────────────┬────────────────────────────┬─────────────────────────────────┬──────────────────────┐
│ title            ┆ author  ┆ content     ┆ published_at               ┆ image_url                       ┆ an_additional_column │
│ ---              ┆ ---     ┆ ---         ┆ ---                        ┆ ---                             ┆ ---                  │
│ str              ┆ str     ┆ str         ┆ datetime[μs]               ┆ str                             ┆ str                  │
╞══════════════════╪═════════╪═════════════╪════════════════════════════╪═════════════════════════════════╪══════════════════════╡
│ Intro to Aligned ┆ null    ┆ This column ┆ 2025-06-27 06:19:50.701922 ┆ https://aligned.codes/image.pn… ┆ This                 │
│ Data Contracts   ┆ this is ┆ is requered ┆ 2025-06-27 06:19:50.702039 ┆ null                            ┆ could be             │
└──────────────────┴─────────┴─────────────┴────────────────────────────┴─────────────────────────────────┴──────────────────────┘
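
The rule behind the output above is that a row is kept only when every non-optional column has a value. A minimal plain-Python sketch of that rule follows. It is not how aligned implements drop_invalid. REQUIRED_COLUMNS and drop_rows_missing_required are hypothetical names, and the timestamps are simplified to strings.

```python
# Hypothetical illustration, not aligned's implementation.
# Columns declared without .is_optional() in the Article contract.
REQUIRED_COLUMNS = ["title", "content", "published_at"]


def drop_rows_missing_required(rows, required=REQUIRED_COLUMNS):
    """Keep only rows where every required column is present and not None."""
    return [
        row for row in rows
        if all(row.get(column) is not None for column in required)
    ]


rows = [
    {"title": "Intro to Aligned", "author": None,
     "content": "This column", "published_at": "2025-06-27"},
    {"title": "Data Contracts", "author": "this is",
     "content": "is required", "published_at": "2025-06-27"},
    {"title": "Is this valid?", "author": "optional",
     "content": None, "published_at": None},
]

valid_rows = drop_rows_missing_required(rows)
print([row["title"] for row in valid_rows])
```

A missing author does not remove the first row, because that column is optional in the contract.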

However, this is basic validation. We can also validate columns with more complicated checks.

So let's modify the contract to the following.

@data_contract()
class Article:
    title = String().min_length(3).max_length(15)
    ...


validated = Article.drop_invalid({
    "title": ["Intro to Aligned", "Data Contracts", "Is this valid?"],
    ...
})

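Assuming we have fixed the other data issues, validating the data now gives the following output. The row titled "Intro to Aligned" is dropped because its title is 16 characters long, which exceeds max_length(15).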

shape: (2, 6)
┌────────────────┬──────────┬─────────────┬────────────────────────────┬───────────┬──────────────────────┐
│ title          ┆ author   ┆ content     ┆ published_at               ┆ image_url ┆ an_additional_column │
│ ---            ┆ ---      ┆ ---         ┆ ---                        ┆ ---       ┆ ---                  │
│ str            ┆ str      ┆ str         ┆ datetime[μs]               ┆ str       ┆ str                  │
╞════════════════╪══════════╪═════════════╪════════════════════════════╪═══════════╪══════════════════════╡
│ Data Contracts ┆ this is  ┆ is requered ┆ 2025-06-27 06:39:14.576722 ┆ null      ┆ could be             │
│ Is this valid? ┆ optional ┆             ┆ 2025-06-27 06:39:14.576722 ┆ null      ┆ anything             │
└────────────────┴──────────┴─────────────┴────────────────────────────┴───────────┴──────────────────────┘
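
The length check can be sketched in plain Python as follows. This is an illustration only, not aligned's implementation. has_valid_length is a hypothetical helper, and treating both bounds as inclusive is an assumption.

```python
# Hypothetical illustration of min_length(3).max_length(15).
# Assumption: both bounds are inclusive.
def has_valid_length(value, min_length=3, max_length=15):
    """Return True when the string length is within the bounds."""
    return value is not None and min_length <= len(value) <= max_length


titles = ["Intro to Aligned", "Data Contracts", "Is this valid?"]

for title in titles:
    # Show each title with its length and whether it passes the check
    print(title, len(title), has_valid_length(title))

kept_titles = [title for title in titles if has_valid_length(title)]
```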

Validation

It is okay for a source or data frame to contain more columns than what is defined in the contract. However, the columns defined in the contract are a minimum requirement.

It is also possible to define renaming logic, so a 1:1 match is not needed.
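
The two rules above can be sketched in plain Python: extra columns are ignored, contract columns must be present, and a renaming map can bridge differing names. This is an illustration under assumptions, not aligned's API. missing_columns, the renames mapping, and the headline column are hypothetical.

```python
# Hypothetical illustration; not aligned's implementation.
CONTRACT_COLUMNS = {"title", "author", "content", "published_at", "image_url"}


def missing_columns(data_columns, renames=None):
    """Return the contract columns that the data does not provide.

    `renames` maps a column name in the data to its name in the contract.
    """
    renames = renames or {}
    provided = {renames.get(column, column) for column in data_columns}
    return CONTRACT_COLUMNS - provided


source_columns = [
    "headline", "author", "content",
    "published_at", "image_url", "an_additional_column",
]

# Without renaming, the contract's title column is missing.
print(missing_columns(source_columns))

# With renaming, the source satisfies the contract despite the extra column.
print(missing_columns(source_columns, renames={"headline": "title"}))
```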

Sources

It is also possible to add different sources to each data contract. This enables you to read from and write to them, and also to validate that the underlying sources follow the contract.

from aligned import data_contract, String, ImageUrl, Timestamp, FileSource

@data_contract(
    source=FileSource.csv_at("articles.csv")
)
class Article:
    title = String()
    author = String().is_optional()
    content = String()

    published_at = Timestamp()
    image_url = ImageUrl().is_optional()

Load data

Finally, we can load data with the following code.

polars_df = await Article.query().all().to_polars()

pandas_df = await Article.query().all().to_pandas()

Or, if we have a loaded feature store, we can query it directly.

df = await store.feature_view(Article).all().to_polars()
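
The snippets above use await at the top level, which works in a notebook or in the asyncio REPL. In a regular script, the call has to run inside a coroutine. A minimal sketch follows, where load_articles is a hypothetical stand-in for the query call so that the example runs without aligned installed.

```python
import asyncio


async def load_articles():
    # Hypothetical stand-in for `await Article.query().all().to_polars()`.
    await asyncio.sleep(0)
    return [{"title": "Data Contracts"}]


async def main():
    # Inside a coroutine, `await` can be used just like in the snippets above.
    return await load_articles()


# asyncio.run starts an event loop, runs main() to completion, and closes the loop.
rows = asyncio.run(main())
print(rows)
```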

Filtering

Furthermore, we can define type-safe filters with the following code.

df = await Article.query().filter(
    Article().author == "NY Times"
).to_polars()
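
Filters of this shape are commonly built with operator overloading: comparing a column object with a value returns an expression object instead of a bool, and the expression is evaluated later against the data. The sketch below shows that general technique in plain Python. Column and Equals are hypothetical classes and do not reflect aligned's internals.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Equals:
    """An expression that is evaluated later, once per row."""
    column: str
    value: object

    def matches(self, row):
        return row.get(self.column) == self.value


class Column:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        # Return an expression object rather than a bool.
        return Equals(self.name, other)


author = Column("author")
expression = author == "NY Times"

rows = [
    {"title": "A", "author": "NY Times"},
    {"title": "B", "author": "The Guardian"},
]
filtered = [row for row in rows if expression.matches(row)]
print(filtered)
```

Because the column is referenced through an attribute rather than a string, a misspelled column name fails early instead of silently matching nothing.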

Materialized Sources

You can even add a materialized_source, which makes it possible to set up ETL pipelines from one system to another.

If defined, the materialized source will be the default source for the contract, but you can change it with the using_source method.

from aligned import data_contract, String, ImageUrl, Timestamp, FileSource

@data_contract(
    source=FileSource.csv_at("articles.csv"),
    materialized_source=FileSource.parquet_at("articles.parquet"),
)
class Article:
    ...


read_source = Article.metadata.source

await Article.query().overwrite(
    Article.query().using_source(read_source).all()
)
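
The source-selection rule and the overwrite step above can be sketched in plain Python. This is an illustration only, not aligned's implementation. The storage dict stands in for two storage systems, and default_source and overwrite are hypothetical helpers.

```python
# Hypothetical stand-ins for two storage systems, keyed by file name.
storage = {
    "articles.csv": [{"title": "Data Contracts"}, {"title": "Is this valid?"}],
    "articles.parquet": [],
}


def default_source(source, materialized_source=None):
    """Prefer the materialized source when one is defined."""
    return source if materialized_source is None else materialized_source


def overwrite(target, rows):
    """Replace everything stored in the target with the given rows."""
    storage[target] = list(rows)


source = "articles.csv"
materialized_source = "articles.parquet"

# Read from the original source, write to the default (materialized) source.
overwrite(default_source(source, materialized_source), storage[source])
print(storage["articles.parquet"])
```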

Metadata

You can also add a description, tags, and a list of contacts.

@data_contract(
    source=FileSource.csv_at("article.csv"),
    description="Some articles from different newspapers.",
    contacts=[
        Contact(name="Mats", email="mats@something.com", slack_member_id="...")
    ],
    tags=["eta-team"]
)
class Article:
    ...