Core concepts

Data Source

A data source describes any form of data, such as a database table, key-value store, file, or structured folder. Consequently, the system can support existing infrastructure such as application databases and data warehouses, as well as more complex infrastructure like data lakes.

Data sources also vary in how much metadata they need. Therefore, each data source type is identified by a unique key, which tells the system which metadata to decode. For instance, a CSV data source can contain a separator symbol and a compression format, whereas a PostgreSQL source needs a table name and potentially a schema name. This lets the user be flexible, yet detailed.
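The idea of a unique key per source type can be sketched in plain Python. This is not aligned's actual implementation: the class names, the `type_name` key, and the `decode_source` helper are hypothetical and only illustrate how a key could select which metadata fields to decode.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CsvSourceSketch:
    # Hypothetical stand-in for a CSV source: path, separator and compression.
    path: str
    separator: str = ","
    compression: Optional[str] = None


@dataclass
class PostgresSourceSketch:
    # Hypothetical stand-in for a PostgreSQL source: table and optional schema.
    table: str
    schema: Optional[str] = None


# Registry mapping each unique key to the class that knows its metadata.
SOURCE_TYPES = {"csv": CsvSourceSketch, "psql": PostgresSourceSketch}


def decode_source(payload: dict):
    """Pick the source class from the key, then decode the remaining metadata."""
    metadata = dict(payload)  # copy so the caller's dict is left untouched
    type_name = metadata.pop("type_name")
    return SOURCE_TYPES[type_name](**metadata)


csv_source = decode_source({"type_name": "csv", "path": "taxi.csv", "separator": ";"})
psql_source = decode_source({"type_name": "psql", "table": "taxi", "schema": "public"})
```

Each source type only declares the fields it needs, so a CSV source never has to carry a table name and a PostgreSQL source never has to carry a separator.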

The following data sources are currently supported:

  • PostgreSQL
  • AWS Redshift
  • AWS S3
  • Parquet
  • CSV
  • Feature View
  • Custom Source

PostgreSQL

from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table("taxi")

PostgreSQL Renaming

There is often not a one-to-one mapping between the data and the "wanted" structure. Therefore, below is a way to rename columns.

from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table(
    name="taxi", 
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
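To make the effect of such a mapping concrete, here is a library-independent sketch. It assumes, based on the example above, that the keys are the column names in the source and the values are the names used downstream; `rename_columns` is a hypothetical helper and not part of aligned.

```python
def rename_columns(rows, mapping):
    """Return new rows where each key found in `mapping` is renamed; others are kept."""
    return [
        {mapping.get(column, column): value for column, value in row.items()}
        for row in rows
    ]


raw_rows = [{"ID": 1, "Duration": 420, "fare": 12.5}]
renamed = rename_columns(raw_rows, {"ID": "taxi_id", "Duration": "duration"})
```

Columns that are not mentioned in the mapping, such as `fare` here, keep their original name.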

AWS Redshift

from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
taxi_data = database.table("taxi")

Redshift Renaming

There is often not a one-to-one mapping between the data and the "wanted" structure. Therefore, below is a way to rename columns.

from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
with_renames = database.table(
    name="taxi", 
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)

CSV File

from aligned import FileSource

taxi_file = FileSource.csv_at("taxi_data.csv")

CSV Config

There are many different variations of CSV files. That is why it is possible to define a CsvConfig containing which separator and compression to use.

from aligned import FileSource
from aligned.sources.local import CsvConfig

taxi_file_config = CsvConfig(
    seperator=";",
    compression="gzip"
)
compressed_taxi_data = FileSource.csv_at(
    "gzip_taxi_data.csv",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    csv_config=taxi_file_config
)
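For readers unsure what these two settings correspond to, the following standard-library sketch reads a gzip-compressed, semicolon-separated CSV. It does not use aligned and makes no claim about how aligned reads files internally; `read_compressed_csv` and the sample data are made up for illustration.

```python
import csv
import gzip
import io

# Build a small gzip-compressed, semicolon-separated CSV in memory.
raw_csv = "ID;Duration\n1;420\n2;615\n"
compressed = gzip.compress(raw_csv.encode("utf-8"))


def read_compressed_csv(data: bytes, separator: str) -> list:
    """Decompress gzip bytes and parse them as CSV with the given separator."""
    text = gzip.decompress(data).decode("utf-8")
    return list(csv.DictReader(io.StringIO(text), delimiter=separator))


rows = read_compressed_csv(compressed, separator=";")
```

Reading the same bytes with the default comma separator would yield a single column named `ID;Duration`, which is why the separator has to be part of the source's metadata.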

Parquet File

There are different variations of Parquet files. That is why it is possible to define a ParquetConfig containing which compression to use.

from aligned import FileSource

taxi_file = FileSource.parquet_at("taxi_data.parquet")

Parquet Config

from aligned import FileSource
from aligned.sources.local import ParquetConfig

taxi_file_config = ParquetConfig(
    compression="gzip"
)
compressed_taxi_data = FileSource.parquet_at(
    "gzip_taxi_data.parquet",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    config=taxi_file_config
)

AWS S3

from aligned import AwsS3Config

bucket = AwsS3Config(
    access_token_env="AWS_ACCESS_TOKEN",
    secret_token_env="AWS_SECRET_TOKEN",
    bucket_env="AWS_BUCKET",
    region_env="AWS_REGION"
)
taxi_parquet = bucket.parquet_at("taxi.parquet")
taxi_csv = bucket.csv_at("taxi.csv")

Feature View

You can also use a feature view as a data source.

Feature View Lineage

Using a feature view as the source for another view collects data lineage information. Therefore, aligned knows which sources depend on each other, which can be useful for keeping materialized sources up to date.

from aligned import EventTimestamp, FileSource, Int64, String, Timestamp, feature_view

@feature_view(
    name="zipcode",
    source=FileSource.parquet_at("...")
)
class Zipcode:
    zipcode = Int64().as_entity()

    event_timestamp = EventTimestamp()
    created_timestamp = Timestamp()

    location_type = String()
    population = Int64()



@feature_view(
    name="other",
    source=Zipcode
)
class Other:
    location_type = String().as_entity()

    population = Int64()
    event_timestamp = EventTimestamp()
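How lineage information can help keep materialized sources up to date can be sketched without the library. The dictionary below is a hand-written stand-in for the lineage aligned would collect; the `report` view and the `downstream_of` helper are hypothetical additions for the sake of the example.

```python
from collections import deque

# Hypothetical lineage: each view maps to the view it reads from (None = raw source).
depends_on = {"zipcode": None, "other": "zipcode", "report": "other"}


def downstream_of(view: str, lineage: dict) -> list:
    """Return every view that directly or indirectly reads from `view`, nearest first."""
    found = []
    queue = deque([view])
    while queue:
        current = queue.popleft()
        for name, parent in lineage.items():
            if parent == current:
                found.append(name)
                queue.append(name)
    return found


to_refresh = downstream_of("zipcode", depends_on)
```

When the `zipcode` source changes, `to_refresh` lists the views that would need to be recomputed, in an order where each view comes after the one it reads from.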

Schema Structure

When using a feature view as a source, make sure that the schema matches in the downstream view. There is currently poor support for making sure you uphold the schema. However, this will be improved, as it is possible to check that the schema is upheld.
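Until such checks exist, a manual comparison is one option. The sketch below compares two schemas written out by hand as plain dictionaries of column name to type name; it does not inspect real feature views, and `schema_problems` is a hypothetical helper rather than an aligned function.

```python
def schema_problems(upstream: dict, downstream: dict) -> dict:
    """Return downstream columns that are absent upstream or have a different type."""
    problems = {}
    for name, dtype in downstream.items():
        if name not in upstream:
            problems[name] = "missing in upstream"
        elif upstream[name] != dtype:
            problems[name] = f"expected {dtype}, upstream has {upstream[name]}"
    return problems


# Schemas copied by hand from the Zipcode and Other views.
zipcode_schema = {
    "zipcode": "Int64",
    "event_timestamp": "EventTimestamp",
    "created_timestamp": "Timestamp",
    "location_type": "String",
    "population": "Int64",
}
other_schema = {
    "location_type": "String",
    "population": "Int64",
    "event_timestamp": "EventTimestamp",
}

problems = schema_problems(zipcode_schema, other_schema)
```

An empty result means every column the downstream view expects is present upstream with the same type.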

Custom Source

In some cases it can be nice to integrate with custom logic. Maybe you want to add a way to load data from an API, or to simulate data.

In this scenario, aligned supports a custom data source where you can provide the functionality.

Here you will always get a RetrivalRequest structure, which contains the expected entities and features, along with other information such as event timestamps.

from aligned import CustomMethodDataSource
from aligned.request.retrival_request import RetrivalRequest
import polars as pl

def load_all_data(request: RetrivalRequest, limit: int | None) -> pl.LazyFrame:
    # My custom method
    return pl.DataFrame(...).lazy()

custom_source = CustomMethodDataSource.from_methods(
    all_data=load_all_data,
    all_between_dates=None,
    features_for=None
)
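As an illustration of the kind of logic such a method could contain, the sketch below simulates taxi rows in plain Python and respects a `limit` argument. The function name, the columns, the default of 100 rows, and the value ranges are all made up for this example. In a real custom source the rows would be wrapped in a polars LazyFrame, as in the snippet above, and the expected columns would come from the request.

```python
import random
from typing import Optional


def simulate_taxi_rows(limit: Optional[int], seed: int = 0) -> list:
    """Generate fake taxi rows; `limit` caps the row count, None falls back to 100."""
    rng = random.Random(seed)  # seeded so repeated loads return the same data
    row_count = 100 if limit is None else limit
    return [
        {"taxi_id": taxi_id, "duration": rng.randint(60, 3600)}
        for taxi_id in range(row_count)
    ]


sample = simulate_taxi_rows(limit=3)
```

Seeding the generator keeps the simulated data stable between runs, which makes downstream results reproducible while testing.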