Core concepts
Data Source
A data source describes any form of data: a database table, a key-value store, a file, or a structured folder. Consequently, the system can support existing infrastructure such as application databases and data warehouses, as well as more complex setups like data lakes.
Furthermore, data sources vary in how much metadata they need. Therefore, each data source is represented with a unique key that tells aligned which metadata to decode. For instance, a CSV data source can contain a separator symbol and a compression format, while a PostgreSQL source needs a table name and potentially a schema name. This keeps source definitions flexible, yet detailed.
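As a minimal sketch of that difference, here is how the same taxi dataset could be described by two sources with different metadata (both are covered in detail below):
from aligned import FileSource, PostgreSQLConfig

# A file-based source only needs a path, plus optional format metadata
taxi_csv = FileSource.csv_at("taxi_data.csv")

# A database source needs a connection and a table name
taxi_table = PostgreSQLConfig(env_var="PSQL_DB_URL").table("taxi")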
The following data sources are currently supported:
- PostgreSQL
- AWS Redshift
- AWS S3
- Parquet
- CSV
- Feature View
- Custom Source
PostgreSQL
from aligned import PostgreSQLConfig
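# The connection string is read from the PSQL_DB_URL environment variable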
database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table("taxi")
PostgreSQL Renaming
There is often not a one-to-one mapping between the stored data and the wanted structure. Therefore, below is a way to rename columns.
from aligned import PostgreSQLConfig
database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table(
    name="taxi",
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
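Here the keys, such as "ID", are the column names in the source table, while the values are the names the features will be exposed as.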
AWS Redshift
from aligned import RedshiftSQLConfig
database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
taxi_data = database.table("taxi")
Redshift Renaming
As with PostgreSQL, there is often not a one-to-one mapping between the stored data and the wanted structure. Therefore, below is a way to rename columns.
from aligned import RedshiftSQLConfig
database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
with_renames = database.table(
    name="taxi",
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
CSV File
from aligned import FileSource
taxi_file = FileSource.csv_at("taxi_data.csv")
CSV Config
There are a lot of different variations of CSV files. That is why it is possible to define a CsvConfig describing which separator and compression to use.
from aligned import FileSource
from aligned.sources.local import CsvConfig
taxi_file_config = CsvConfig(
    seperator=";",
    compression="gzip"
)
compressed_taxi_data = FileSource.csv_at(
    "gzip_taxi_data.csv",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    csv_config=taxi_file_config
)
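The same CsvConfig can be reused for every CSV file that shares this format.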
Parquet File
from aligned import FileSource
taxi_file = FileSource.parquet_at("taxi_data.parquet")
Parquet Config
Parquet files also come in different variations. That is why it is possible to define a ParquetConfig describing, for instance, which compression to use.
from aligned import FileSource
from aligned.sources.local import ParquetConfig
taxi_file_config = ParquetConfig(
    compression="gzip"
)
compressed_taxi_data = FileSource.parquet_at(
    "gzip_taxi_data.parquet",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    config=taxi_file_config
)
AWS S3
from aligned import AwsS3Config
bucket = AwsS3Config(
    access_token_env="AWS_ACCESS_TOKEN",
    secret_token_env="AWS_SECRET_TOKEN",
    bucket_env="AWS_BUCKET",
    region_env="AWS_REGION"
)
taxi_parquet = bucket.parquet_at("taxi.parquet")
taxi_csv = bucket.csv_at("taxi.csv")
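If the S3 helpers mirror the FileSource methods above, which is an assumption rather than a confirmed API, the same CSV configuration could be applied to files in the bucket:
from aligned.sources.local import CsvConfig

# Hypothetical: assumes bucket.csv_at accepts the same csv_config
# keyword as FileSource.csv_at
s3_csv_config = CsvConfig(seperator=";", compression="gzip")
compressed_s3_csv = bucket.csv_at(
    "gzip_taxi_data.csv",
    csv_config=s3_csv_config
)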
Feature View
You can also use a feature view as a data source.
Feature View Lineage
Using a feature view as the source for another view will collect data lineage information. Therefore, aligned knows which sources depend on each other, which can be useful for keeping materialized sources up to date.
from aligned import feature_view, FileSource, Int64, String, EventTimestamp, Timestamp

@feature_view(
    name="zipcode",
    source=FileSource.parquet_at("...")
)
class Zipcode:
    zipcode = Int64().as_entity()
    event_timestamp = EventTimestamp()
    created_timestamp = Timestamp()
    location_type = String()
    population = Int64()
@feature_view(
    name="other",
    source=Zipcode
)
class Other:
    location_type = String().as_entity()
    population = Int64()
    event_timestamp = EventTimestamp()
Schema Structure
When using a feature view as a source, make sure that the schema matches in the downstream view. In the example above, Other must therefore declare location_type and population with the same types as Zipcode. There is currently poor support for making sure you uphold the schema. However, this will be improved, as it is possible to check that the schema would be upheld.
Custom Source
In some cases it can be nice to integrate with custom logic, for instance to load data from an API or to simulate data. For this scenario aligned supports a custom data source where you provide the functionality yourself. Your method will always receive a RetrivalRequest structure, which describes the expected entities and features, together with other information such as event timestamps.
from aligned import CustomMethodDataSource
from aligned.request.retrival_request import RetrivalRequest
import polars as pl
def load_all_data(request: RetrivalRequest, limit: int | None) -> pl.LazyFrame:
    # My custom method for loading the full dataset
    return pl.DataFrame(...).lazy()
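# Only all_data is implemented here; the other load methods are left as None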
custom_source = CustomMethodDataSource.from_methods(
    all_data=load_all_data,
    all_between_dates=None,
    features_for=None
)
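The resulting source can then be used like any other, for example as the source of a feature view. A minimal sketch, where the view name and features are made up for illustration:
from aligned import feature_view, Int64

@feature_view(
    name="simulated_taxi",  # hypothetical view name
    source=custom_source
)
class SimulatedTaxi:
    taxi_id = Int64().as_entity()
    duration = Int64()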