Core concepts
Model contract
Representing our model's behavior is needed in order to fulfill the goal of the proposed solution. Therefore, Aligned makes it possible to represent the data flow related to a model. At its most basic, this means describing the features going into the model and the features coming out. However, there is more to a model than just its inputs and outputs, so the following information can be described as well.
from aligned import model_contract, Int32, EventTimestamp, FileSource

from examples.taxi.arrival import TaxiArrivals
from examples.taxi.departure import TaxiDepartures, TaxiVendor

departures = TaxiDepartures()
arrivals = TaxiArrivals()
vendor = TaxiVendor()

@model_contract(
    input_features=[
        departures.day_of_week,
        departures.travel_distance,
        vendor.passenger_hour_mean,
        vendor.passenger_20_min_mean,
    ],
    output_source=FileSource.parquet_at("predicted_trips.parquet"),
)
class TaxiModel:
    trip_id = Int32().as_entity()

    predicted_duration = arrivals.duration.as_regression_target()

    predicted_at = EventTimestamp()
Or a more complicated example.
from datetime import timedelta

from aligned import model_contract, Int32, Timestamp, EventTimestamp, FileSource
from aligned.exposed_model.mlflow import mlflow_server

from examples.taxi.arrival import TaxiArrivals
from examples.taxi.departure import TaxiDepartures, TaxiVendor

departures = TaxiDepartures()
arrivals = TaxiArrivals()
vendor = TaxiVendor()

@model_contract(
    name="taxi",
    description="A regression model that predicts the duration of a ride",
    contacts=["@MatsMoll"],
    input_features=[
        departures.day_of_week,
        departures.travel_distance,
        vendor.passenger_hour_variance,
        vendor.passenger_hour_count,
        vendor.passenger_hour_mean,
        vendor.passenger_20_min_variance,
        vendor.passenger_20_min_count,
        vendor.passenger_20_min_mean,
        vendor.mean_passenger_change,
    ],
    output_source=FileSource.parquet_at("predicted_trips.parquet"),
    exposed_model=mlflow_server(
        host="https://my-taxi-model-endpoint:8000"
    ),
    acceptable_freshness=timedelta(minutes=5),
    unacceptable_freshness=timedelta(minutes=15),
    dataset_store=FileSource.json_at("taxi_datasets.json")
)
class TaxiModel:
    trip_id = Int32().as_entity()

    predicted_duration = arrivals.duration.as_regression_target()

    predicted_at = EventTimestamp()

    used_model = Int32().as_model_version()
Labels
An ML model will produce some kind of output. Therefore, Aligned makes it possible to describe the output of supervised models, either in the form of a regression target or a classification target. This tells us which type of model it is and the expected data output, but also which data source the target feature is located in.
In the above example we know that the duration feature in our TaxiArrivals view is the one we want to predict, as it is labeled with as_regression_target(). Therefore, we also know that it is a regression model, which will help us later on with model monitoring.
Lastly, labeling this feature also tells us that we do not need to load the TaxiArrivals table when making predictions; it is only needed for use cases that require the ground truth.
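For a classification model, the target feature would be labeled in a similar way. The snippet below is only an illustrative sketch: the was_delayed feature does not exist in the example views, and the as_classification_label() method name is assumed as the counterpart to as_regression_target(), so verify the exact name against the Aligned API.

@model_contract(
    input_features=[
        departures.day_of_week,
        departures.travel_distance,
    ],
)
class TaxiDelayModel:
    trip_id = Int32().as_entity()

    # was_delayed is a hypothetical boolean feature, and the method name is assumed.
    predicted_delay = arrivals.was_delayed.as_classification_label()

    predicted_at = EventTimestamp()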
Output Source
In many cases it is useful to store our model's predictions somewhere. Therefore, Aligned makes it possible to define a source where historical predictions can be found. The same is possible for stream sources, in order to get real-time predictions. Furthermore, a prediction may contain more than just the predicted value itself. For instance, other metadata about the predictions can be stored, such as the model version and entity ids, and metrics like prediction certainty or class probabilities may also be of interest.
In the above example we have defined that our predictions will be stored in the predicted_trips.parquet file. Furthermore, we have defined that the output consists of three properties: the predicted_duration, the predicted_at timestamp, and used_model, which is a reference to the model version.
You can load the predictions with the following code:
stored_preds = await TaxiModel.query().all_predictions().to_pandas()
Ground Truth
However, only having access to a prediction and its related metadata is not enough. To know how well our models actually work, we need to know where and when our ground truths will be located. To some degree Aligned knows where to fetch historical ground truth values, based on the indirect dependency on a feature view. This works well when creating datasets, as we can define which entities to create datasets for. However, it is not the case when we want to observe the ground truth in real time. In such a scenario we also need to know when a value is a valid ground truth.
Because of this, it is possible to register streams where the ground truth will arrive, either by describing the condition and a sink for the ground truth, or by describing a stream source where the value will be stored. This lets you choose whether Aligned should handle the ground truth condition, or whether it should be handled manually outside the scope of Aligned.
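For historical evaluation, the label reference on the contract is enough for Aligned to locate the ground truth. The snippet below is a minimal sketch that reuses the TaxiModel contract above; the trip ids are placeholders, and it assumes the supervised retrieval job can be materialized directly with to_polars().

store = await ContractStore.from_dir(".")

# Load input features together with the ground truth duration, which
# Aligned resolves through the as_regression_target() reference.
data = await (store.model("taxi")
    .with_labels()
    .features_for({"trip_id": [1, 2, 3]})
    .to_polars()
)

print(data.input)
print(data.labels)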
Exposed Models
A model has little value if it is not easy to use. Therefore, Aligned makes it possible to define where our models are exposed. This means you can define whether you want to run the model in memory, or offload it to an external server.
from aligned.exposed_model.mlflow import mlflow_server

...

@model_contract(
    name="taxi",
    exposed_model=mlflow_server(
        host="https://my-taxi-model-endpoint:8000"
    ),
    ...
)
class TaxiModel:
    ...
Defining an exposed model also makes it possible to use the model as follows:
from aligned import ContractStore

store = await ContractStore.from_dir(".")

preds = await store.model("taxi").predict_over({
    "trip_id": [...],
    "vendor_id": [...]
}).to_polars()
Freshness
All data products come with assumptions about how frequently they will be updated. That is why Aligned makes it possible to add freshness thresholds.
This makes it clear what the "expected" delay of a prediction is, also referred to as the acceptable_freshness. It also defines an unacceptable_freshness, which describes a delay that should not occur.
Combined with the Aligned UI, these thresholds can be used to send different types of alerts that monitor the data quality.
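For reference, the freshness configuration from the full example above boils down to two parameters on the contract:

from datetime import timedelta

@model_contract(
    name="taxi",
    # The expected prediction delay, and the delay that should never occur.
    acceptable_freshness=timedelta(minutes=5),
    unacceptable_freshness=timedelta(minutes=15),
    ...
)
class TaxiModel:
    ...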
Dataset Store
Working with data often leads to very specific datasets, like train, test, and validation, but you may also want to store other specific datasets related to a model. Therefore, Aligned enables the user to register where these datasets exist.
In the shown example, a JSON file called taxi_datasets.json will be created, containing the location of the dataset files together with some associated metadata.
@model_contract(
    name="taxi",
    dataset_store=FileSource.json_at("taxi_datasets.json"),
    ...
)
class TaxiModel:
    pass
You can manually load or modify the DatasetStore object, or you can use some built-in functions, as in the following training pipeline.
store = await ContractStore.from_dir(".")
entities = ...

dataset_store = store.model("taxi").dataset_store

datasets = await (store.model("taxi")
    .with_labels()
    .features_for(entities)
    .train_test(train_size=0.7)
    .store_dataset_at_directory(
        FileSource.directory("datasets/taxi"),
        dataset_store=dataset_store
    )
)

train = await datasets.train.to_polars()

print(train.input)
print(train.labels)