Data Uploaders#
In this tutorial we are going to demonstrate the usage of the Bayesline Uploaders API. The Uploaders API provides a generalized mechanism to bring different types of data into the Bayesline ecosystem.
Specifically, we will introduce and explore:
Data Types
Datasets
Schemas and Parsers
The staging concept
The commit concept
Staging Validation
Housekeeping
Imports & Setup#
For this tutorial notebook, you will need to import the following packages.
import tempfile
from pathlib import Path
import polars as pl
from bayesline.apiclient import BayeslineApiClient
We will also need to have a Bayesline API client configured.
bln = BayeslineApiClient.new_client(
endpoint="https://[ENDPOINT]",
api_key="[API-KEY]",
)
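To avoid hard-coding credentials, the same constructor can also be fed from environment variables; a minimal sketch, assuming the variable names BAYESLINE_ENDPOINT and BAYESLINE_API_KEY (placeholders, not defined by the client):
import os
bln = BayeslineApiClient.new_client(
    endpoint=os.environ["BAYESLINE_ENDPOINT"],  # hypothetical variable name
    api_key=os.environ["BAYESLINE_API_KEY"],  # hypothetical variable name
)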
The main entrypoint for the Uploaders API sits on bln.equity.uploaders. All upload functionality can be reached from there.
uploaders = bln.equity.uploaders
Data Types#
Data types distinguish the different kinds of data that can be brought into the Bayesline ecosystem. They are pre-configured by Bayesline and include portfolio holdings, factor exposures, etc.
uploaders.get_data_types()
['exposures', 'factors', 'hierarchies', 'portfolios']
We can obtain a specific uploader for a data type. In this tutorial, we will be working with the exposure uploader, but all other uploaders operate analogously.
The get_data_type method returns a DataTypeUploaderApi instance, which introduces the concept of datasets (see below).
exposure_uploader = uploaders.get_data_type("exposures")
Datasets#
For each data type (e.g. exposures) we can create isolated datasets. For instance, we might want to upload different sets of exposures, which we can do by creating a dataset for each. We can always retrieve existing datasets using the get_datasets method. Since we haven’t created any datasets yet, this will be empty.
exposure_uploader.get_datasets()
[]
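Since datasets are scoped per data type, we could list the datasets that exist for every data type with a small loop; a sketch using only the calls shown above:
# print the existing datasets for each pre-configured data type
for data_type in uploaders.get_data_types():
    print(data_type, uploaders.get_data_type(data_type).get_datasets())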
Now we start by creating a new dataset called "tutorial".
dataset = exposure_uploader.create_dataset("tutorial")
Schemas and Parsers#
Every data type comes with its own dataframe schema. Every uploaded dataframe will be converted into this schema to ensure a uniform way to view the data for a specific data type.
dataset.get_schema()
{'date': Date,
'asset_id': String,
'asset_id_type': String,
'factor_group': String,
'factor': String,
'exposure': Float32}
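If our data already comes in this long layout, we could build a dataframe that matches the schema directly; a hypothetical minimal frame (not staged in this tutorial):
from datetime import date

long_df = pl.DataFrame(
    {
        "date": [date(2025, 1, 6), date(2025, 1, 6)],
        "asset_id": ["GOOG", "AAPL"],
        "asset_id_type": ["cusip9", "cusip9"],
        "factor_group": ["style", "style"],
        "factor": ["growth", "growth"],
        "exposure": [1.2, 1.1],
    },
    schema_overrides={"exposure": pl.Float32},  # match the declared schema
)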
We may have input data in a different format than the schema the exposures data type declares (e.g. a wide format). We can either convert it ourselves (a sketch of the manual conversion follows the list below) or use one of the predefined input data parsers.
A parser:
Is defined for an input format and will convert it to the schema that the uploader expects.
Will add operations such as null-filtering.
Will record error messages if a given input cannot be parsed.
Provides access to example dataframes for the expected input.
Will ensure that the dataframe is valid if the parsing succeeds.
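As an illustration of the manual route, a wide frame like the example below could be reshaped into the long schema with plain polars; a sketch assuming the factor_group^factor column naming used by the wide example, not part of the Uploaders API:
def wide_to_long(df: pl.DataFrame) -> pl.DataFrame:
    # unpivot the factor columns, then split "factor_group^factor" names apart
    id_cols = ["date", "asset_id", "asset_id_type"]
    return (
        df.unpivot(index=id_cols, variable_name="factor_key", value_name="exposure")
        .with_columns(
            pl.col("factor_key").str.split("^").list.get(0).alias("factor_group"),
            pl.col("factor_key").str.split("^").list.get(1).alias("factor"),
            pl.col("exposure").cast(pl.Float32),
        )
        .drop("factor_key")
        .drop_nulls("exposure")
        .select(id_cols + ["factor_group", "factor", "exposure"])
    )
In the rest of this tutorial we rely on the predefined parsers instead.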
dataset.get_parser_names()
['Long-Format', 'Wide-Format']
For demonstration, we will use the Wide-Format parser. When uploading dataframes, we can simply pass the name of the parser.
parser = dataset.get_parser("Wide-Format")
example_df = parser.get_examples()[0]
example_df
date | asset_id | asset_id_type | style^momentum_6 | style^momentum_12 | style^growth | market^market |
---|---|---|---|---|---|---|
date | str | str | f64 | f64 | f64 | f64 |
2025-01-06 | "GOOG" | "cusip9" | -0.3 | -0.2 | 1.2 | 1.0 |
2025-01-06 | "AAPL" | "cusip9" | 0.1 | 0.5 | 1.1 | 1.0 |
2025-01-07 | "GOOG" | "cusip9" | -0.28 | -0.19 | 1.21 | 1.0 |
Before running the parser, we can check whether the data can be successfully parsed with the can_handle method.
parser.can_handle(example_df)
UploadParserResult(parser='Wide-Format', success=True, messages=[])
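Conversely, a frame that lacks required identifier columns should fail this check; a hypothetical illustration (the exact messages depend on the parser):
# dropping the identifier columns should make can_handle report a failure
bad_df = example_df.drop("asset_id", "asset_id_type")
parser.can_handle(bad_df)  # expected: success=False with explanatory messages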
parser.parse(example_df)
(shape: (12, 6)
┌────────────┬──────────┬───────────────┬──────────────┬─────────────┬──────────┐
│ date ┆ asset_id ┆ asset_id_type ┆ factor_group ┆ factor ┆ exposure │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ date ┆ str ┆ str ┆ str ┆ str ┆ f32 │
╞════════════╪══════════╪═══════════════╪══════════════╪═════════════╪══════════╡
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_6 ┆ -0.3 │
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ style ┆ momentum_6 ┆ 0.1 │
│ 2025-01-07 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_6 ┆ -0.28 │
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_12 ┆ -0.2 │
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ style ┆ momentum_12 ┆ 0.5 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ style ┆ growth ┆ 1.1 │
│ 2025-01-07 ┆ GOOG ┆ cusip9 ┆ style ┆ growth ┆ 1.21 │
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ market ┆ market ┆ 1.0 │
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ market ┆ market ┆ 1.0 │
│ 2025-01-07 ┆ GOOG ┆ cusip9 ┆ market ┆ market ┆ 1.0 │
└────────────┴──────────┴───────────────┴──────────────┴─────────────┴──────────┘,
UploadParserResult(parser='Wide-Format', success=True, messages=[]))
Staging Data#
Staging takes an input dataframe (or file), parses it, and keeps it in a separate area (the stage). We can repeat this staging step to stage multiple files (e.g. if we have daily files). The staging area can then be committed, which concatenates all staged dataframes and writes them to versioned storage.
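For instance, a daily workflow might stage every file in a drop directory and then commit once; a minimal sketch, assuming a hypothetical daily_exposures directory of wide-format csv files (stage_file and commit are covered in detail below):
for daily_file in sorted(Path("daily_exposures").glob("*.csv")):
    dataset.stage_file(daily_file, parser="Wide-Format")
dataset.commit(mode="append")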
Adding to the Staging Area#
We use the example wide dataframe for staging. We give it the name example-1 so that we can tell the staged dataframes apart later on. We also specify the concrete parser we want to use. Note that the parser can be left blank, in which case all available parsers will be tried and the first one that succeeds will be chosen.
dataset.stage_df(name="example-1", df=example_df, parser="Wide-Format")
UploadStagingResult(name='example-1', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 35, 489867, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])
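As noted above, the parser argument is optional; a commented-out sketch (the name example-auto is hypothetical, and we keep the call disabled so the staging area holds only the named examples used below):
# dataset.stage_df(name="example-auto", df=example_df)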
Below we’re adding a second dataframe for demonstration purposes. For this we use the existing example dataframe and roll the dates by one week.
example_df2 = example_df.with_columns(pl.col("date").dt.add_business_days(5))
example_df2
date | asset_id | asset_id_type | style^momentum_6 | style^momentum_12 | style^growth | market^market |
---|---|---|---|---|---|---|
date | str | str | f64 | f64 | f64 | f64 |
2025-01-13 | "GOOG" | "cusip9" | -0.3 | -0.2 | 1.2 | 1.0 |
2025-01-13 | "AAPL" | "cusip9" | 0.1 | 0.5 | 1.1 | 1.0 |
2025-01-14 | "GOOG" | "cusip9" | -0.28 | -0.19 | 1.21 | 1.0 |
# note that if we used the same name "example-1" this cell would fail
dataset.stage_df(name="example-2", df=example_df2, parser="Wide-Format")
UploadStagingResult(name='example-2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 35, 518148, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])
Retrieving Staged Data#
Below we demonstrate how to obtain previously staged data. We can either obtain the staging results (as we saw above when calling the stage_df method) or the data itself.
dataset.get_staging_results()
{'example-2': UploadStagingResult(name='example-2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 35, 518148, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])]),
'example-1': UploadStagingResult(name='example-1', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 35, 489867, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])}
dataset.get_staging_data().collect()
_name | date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|---|
str | date | str | str | str | str | f32 |
"example-2" | 2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.3 |
"example-2" | 2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.1 |
"example-2" | 2025-01-14 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.28 |
"example-2" | 2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_12" | -0.2 |
"example-2" | 2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_12" | 0.5 |
… | … | … | … | … | … | … |
"example-1" | 2025-01-06 | "AAPL" | "cusip9" | "style" | "growth" | 1.1 |
"example-1" | 2025-01-07 | "GOOG" | "cusip9" | "style" | "growth" | 1.21 |
"example-1" | 2025-01-06 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
"example-1" | 2025-01-06 | "AAPL" | "cusip9" | "market" | "market" | 1.0 |
"example-1" | 2025-01-07 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
dataset.get_staging_data(names=["example-1"]).collect()
_name | date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|---|
str | date | str | str | str | str | f32 |
"example-1" | 2025-01-06 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.3 |
"example-1" | 2025-01-06 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.1 |
"example-1" | 2025-01-07 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.28 |
"example-1" | 2025-01-06 | "GOOG" | "cusip9" | "style" | "momentum_12" | -0.2 |
"example-1" | 2025-01-06 | "AAPL" | "cusip9" | "style" | "momentum_12" | 0.5 |
… | … | … | … | … | … | … |
"example-1" | 2025-01-06 | "AAPL" | "cusip9" | "style" | "growth" | 1.1 |
"example-1" | 2025-01-07 | "GOOG" | "cusip9" | "style" | "growth" | 1.21 |
"example-1" | 2025-01-06 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
"example-1" | 2025-01-06 | "AAPL" | "cusip9" | "market" | "market" | 1.0 |
"example-1" | 2025-01-07 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
Retrieving Summary Data#
We can also obtain predefined summary data for each staged file.
dataset.get_staging_data_summary()
_name | n_dates | n_assets | min_date | max_date | n_factor_groups | n_factors | min_exposure | max_exposure | mean_exposure | median_exposure | std_exposure |
---|---|---|---|---|---|---|---|---|---|---|---|
str | u32 | u32 | date | date | u32 | u32 | f32 | f32 | f32 | f32 | f32 |
"example-1" | 2 | 2 | 2025-01-06 | 2025-01-07 | 2 | 4 | -0.3 | 1.21 | 0.511667 | 0.75 | 0.610803 |
"example-2" | 2 | 2 | 2025-01-13 | 2025-01-14 | 2 | 4 | -0.3 | 1.21 | 0.511667 | 0.75 | 0.610803 |
Drilling down, we can obtain a more detailed summary as well.
dataset.get_staging_data_detail_summary()
_name | date | n_assets | min_exposure | max_exposure | mean_exposure | median_exposure | std_exposure |
---|---|---|---|---|---|---|---|
str | date | u32 | f32 | f32 | f32 | f32 | f32 |
"example-1" | 2025-01-06 | 2 | -0.3 | 1.2 | 0.55 | 0.75 | 0.572276 |
"example-1" | 2025-01-07 | 1 | -0.28 | 1.21 | 0.435 | 0.405 | 0.674852 |
"example-2" | 2025-01-13 | 2 | -0.3 | 1.2 | 0.55 | 0.75 | 0.572276 |
"example-2" | 2025-01-14 | 1 | -0.28 | 1.21 | 0.435 | 0.405 | 0.674852 |
Staging from a File#
Instead of passing a dataframe directly, we can also stage data from an existing csv, csv.gz, parquet, or zip file.
Below we’ll demonstrate how to:
Stage a file
Obtain the staged data
Remove the file from the staging area
First, we define an output path where we will write our example_df2 dataframe.
path = Path(tempfile.mkdtemp()) / "example2.csv"
example_df2.write_csv(path)
We can then stage the output file with stage_file, retrieve the data with get_staging_data, and wipe the staging area with wipe_staging.
dataset.stage_file(path, parser="Wide-Format")
UploadStagingResult(name='example2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 35, 637686, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])
dataset.get_staging_data(names=["example2"]).collect()
_name | date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|---|
str | date | str | str | str | str | f32 |
"example2" | 2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.3 |
"example2" | 2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.1 |
"example2" | 2025-01-14 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.28 |
"example2" | 2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_12" | -0.2 |
"example2" | 2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_12" | 0.5 |
… | … | … | … | … | … | … |
"example2" | 2025-01-13 | "AAPL" | "cusip9" | "style" | "growth" | 1.1 |
"example2" | 2025-01-14 | "GOOG" | "cusip9" | "style" | "growth" | 1.21 |
"example2" | 2025-01-13 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
"example2" | 2025-01-13 | "AAPL" | "cusip9" | "market" | "market" | 1.0 |
"example2" | 2025-01-14 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
dataset.wipe_staging(names=["example2"])
{'example2': UploadStagingResult(name='example2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 35, 637686, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])}
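For completeness, the same flow works for a parquet file; a sketch not executed in this tutorial, assuming the staged name is again derived from the file stem (here example2_pq):
parquet_path = Path(tempfile.mkdtemp()) / "example2_pq.parquet"
example_df2.write_parquet(parquet_path)
dataset.stage_file(parquet_path, parser="Wide-Format")
dataset.wipe_staging(names=["example2_pq"])  # clean up the staged file again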
Committing Data#
Once we’re happy with the staged files, we can commit them into versioned storage. Versions are immutable, so every commit creates a new version, which allows for full time travel.
When committing staged data, we need to choose a mode, which defines how the data is written in the context of versioned storage.
dataset.get_commit_modes()
{'append': 'Appends new factor/date combinations to the existing data.Collisions will be ignored.',
'append_factor': 'Appends new factors to the existing data.Collisions with existing factors will be ignored.',
'append_from': 'Appends new factor/date combinations to the existing data but only after the last date in the existing data. Collisions will be ignored.'}
We’ll follow the steps below to demonstrate the commit and versioning process:
Commit entire staging area.
Show empty staging area (committed staged names are cleared from the staging area).
Show version history.
Get data at latest version.
Re-stage the example-1 dataframe and commit in append mode.
Re-stage the example-2 dataframe and commit in append_from mode.
Get data at different versions.
We start by committing the entire staging area. When we do this, we see that the commit was created as version 1.
dataset.commit(mode="append")
UploadCommitResult(version=1, committed_names=['example-2', 'example-1'])
After committing, the staging area should now be empty.
dataset.get_staging_data().collect()
_name | date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|---|
str | date | str | str | str | str | f32 |
We can get the list of all historical versions with the version_history method. We see below that there are two versions: version 0 corresponds to the automatic creation of the dataset, and version 1 contains our committed changes.
dataset.version_history()
{1: datetime.datetime(2025, 7, 22, 0, 54, 35, 980000, tzinfo=datetime.timezone.utc),
0: datetime.datetime(2025, 7, 22, 0, 54, 35, 946000, tzinfo=datetime.timezone.utc)}
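Since version_history returns a mapping of version number to timestamp, we can also pin a read to the newest version explicitly; a small sketch (omitting version, as in the next cell, reads the current data):
latest_version = max(dataset.version_history())  # highest version number
dataset.get_data(version=latest_version).collect()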
dataset.get_data().collect()
date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|
date | str | str | str | str | f32 |
2025-01-06 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-06 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
2025-01-07 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.280029 |
2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
… | … | … | … | … | … |
2025-01-13 | "AAPL" | "cusip9" | "market" | "market" | 1.0 |
2025-01-14 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
2025-01-06 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
2025-01-06 | "AAPL" | "cusip9" | "market" | "market" | 1.0 |
2025-01-07 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
Below we will append some more data to demonstrate the versioning.
dataset.stage_df(
"example-1",
example_df.with_columns(pl.col("date").dt.add_business_days(10)),
parser="Wide-Format"
)
UploadStagingResult(name='example-1', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 37, 397199, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])
dataset.commit(mode="append")
UploadCommitResult(version=2, committed_names=['example-1'])
dataset.version_history()
{2: datetime.datetime(2025, 7, 22, 0, 54, 38, 290000, tzinfo=datetime.timezone.utc),
1: datetime.datetime(2025, 7, 22, 0, 54, 35, 980000, tzinfo=datetime.timezone.utc),
0: datetime.datetime(2025, 7, 22, 0, 54, 35, 946000, tzinfo=datetime.timezone.utc)}
dataset.stage_df(
"example-2",
example_df2.with_columns(pl.col("date").dt.add_business_days(10)),
parser="Wide-Format"
)
UploadStagingResult(name='example-2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 38, 796359, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])
dataset.commit(mode="append_from")
UploadCommitResult(version=3, committed_names=['example-2'])
dataset.version_history()
{3: datetime.datetime(2025, 7, 22, 0, 54, 39, 918000, tzinfo=datetime.timezone.utc),
2: datetime.datetime(2025, 7, 22, 0, 54, 38, 290000, tzinfo=datetime.timezone.utc),
1: datetime.datetime(2025, 7, 22, 0, 54, 35, 980000, tzinfo=datetime.timezone.utc),
0: datetime.datetime(2025, 7, 22, 0, 54, 35, 946000, tzinfo=datetime.timezone.utc)}
# both example-1 and example-2 at this version
dataset.get_data(version=3).collect()
date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|
date | str | str | str | str | f32 |
2025-01-06 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-06 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
2025-01-07 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.280029 |
2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
… | … | … | … | … | … |
2025-01-27 | "GOOG" | "cusip9" | "style" | "growth" | 1.200195 |
2025-01-27 | "AAPL" | "cusip9" | "style" | "growth" | 1.099609 |
2025-01-27 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-27 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
2025-01-28 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.280029 |
# only example-1 at this version
dataset.get_data(version=2).collect()
date | asset_id | asset_id_type | factor_group | factor | exposure |
---|---|---|---|---|---|
date | str | str | str | str | f32 |
2025-01-06 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-06 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
2025-01-07 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.280029 |
2025-01-13 | "GOOG" | "cusip9" | "style" | "momentum_6" | -0.300049 |
2025-01-13 | "AAPL" | "cusip9" | "style" | "momentum_6" | 0.099976 |
… | … | … | … | … | … |
2025-01-20 | "AAPL" | "cusip9" | "style" | "momentum_12" | 0.5 |
2025-01-21 | "GOOG" | "cusip9" | "style" | "momentum_12" | -0.189941 |
2025-01-20 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
2025-01-20 | "AAPL" | "cusip9" | "market" | "market" | 1.0 |
2025-01-21 | "GOOG" | "cusip9" | "market" | "market" | 1.0 |
Note that if we append data that already exists, as identified by its primary key (i.e. there is nothing new to append), then no new version will be recorded.
dataset.stage_df("no-new-data", example_df)
UploadStagingResult(name='no-new-data', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 42, 223413, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])
dataset.commit(mode="append")
UploadCommitResult(version=3, committed_names=['no-new-data'])
dataset.version_history()
{3: datetime.datetime(2025, 7, 22, 0, 54, 39, 918000, tzinfo=datetime.timezone.utc),
2: datetime.datetime(2025, 7, 22, 0, 54, 38, 290000, tzinfo=datetime.timezone.utc),
1: datetime.datetime(2025, 7, 22, 0, 54, 35, 980000, tzinfo=datetime.timezone.utc),
0: datetime.datetime(2025, 7, 22, 0, 54, 35, 946000, tzinfo=datetime.timezone.utc)}
Retrieving Summary Data#
Just as with the staged data, we can obtain summary data for the committed data (at different versions).
dataset.get_data_summary()
n_dates | n_assets | min_date | max_date | n_factor_groups | n_factors | min_exposure | max_exposure | mean_exposure | median_exposure | std_exposure |
---|---|---|---|---|---|---|---|---|---|---|
u32 | u32 | date | date | u32 | u32 | f32 | f32 | f32 | f32 | f32 |
8 | 2 | 2025-01-06 | 2025-01-28 | 2 | 4 | -0.300049 | 1.209961 | 0.511648 | 0.75 | 0.610786 |
dataset.get_data_summary(version=2)
n_dates | n_assets | min_date | max_date | n_factor_groups | n_factors | min_exposure | max_exposure | mean_exposure | median_exposure | std_exposure |
---|---|---|---|---|---|---|---|---|---|---|
u32 | u32 | date | date | u32 | u32 | f32 | f32 | f32 | f32 | f32 |
6 | 2 | 2025-01-06 | 2025-01-21 | 2 | 4 | -0.300049 | 1.209961 | 0.511648 | 0.75 | 0.610786 |
dataset.get_data_detail_summary()
date | n_assets | min_exposure | max_exposure | mean_exposure | median_exposure | std_exposure |
---|---|---|---|---|---|---|
date | u32 | f32 | f32 | f32 | f32 | f32 |
2025-01-06 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-07 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
2025-01-13 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-14 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
2025-01-20 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-21 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
2025-01-27 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-28 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
dataset.get_data_detail_summary(version=2)
date | n_assets | min_exposure | max_exposure | mean_exposure | median_exposure | std_exposure |
---|---|---|---|---|---|---|
date | u32 | f32 | f32 | f32 | f32 | f32 |
2025-01-06 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-07 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
2025-01-13 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-14 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
2025-01-20 | 2 | -0.300049 | 1.200195 | 0.549973 | 0.75 | 0.57226 |
2025-01-21 | 1 | -0.280029 | 1.209961 | 0.434998 | 0.405029 | 0.674835 |
Validating Staging Data#
When staging multiple files, it’s possible that their combined contents may not be valid and so cannot be committed. For example, if the files introduce duplicate entries, the commit method will fail.
The example below illustrates how to validate the staging area before committing. To simulate a validation failure, we intentionally stage the same example dataframe twice, resulting in duplicate records.
dataset.stage_df("example-1", example_df, parser="Wide-Format")
dataset.stage_df("example-2", example_df, parser="Wide-Format")
dataset.get_staging_results()
{'example-2': UploadStagingResult(name='example-2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 47, 151083, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])]),
'example-1': UploadStagingResult(name='example-1', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 47, 135495, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])}
The validation check below will produce non-empty dataframes if any validation errors occurred. In this case, it produces the duplicated records together with a _name column which indicates the names of the staged dataframes that introduced the duplication.
dataset.validate_staging_data()
{'Duplication Check': shape: (12, 8)
┌────────────┬──────────┬─────────────┬─────────────┬────────────┬──────────┬─────────┬────────────┐
│ date ┆ asset_id ┆ asset_id_ty ┆ factor_grou ┆ factor ┆ exposure ┆ n_dupes ┆ _name │
│ --- ┆ --- ┆ pe ┆ p ┆ --- ┆ --- ┆ --- ┆ --- │
│ date ┆ str ┆ --- ┆ --- ┆ str ┆ f32 ┆ u32 ┆ str │
│ ┆ ┆ str ┆ str ┆ ┆ ┆ ┆ │
╞════════════╪══════════╪═════════════╪═════════════╪════════════╪══════════╪═════════╪════════════╡
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ style ┆ growth ┆ 1.1 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ style ┆ growth ┆ 1.2 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ 2025-01-07 ┆ GOOG ┆ cusip9 ┆ style ┆ growth ┆ 1.21 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ market ┆ market ┆ 1.0 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ market ┆ market ┆ 1.0 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_6 ┆ -0.3 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ 2025-01-07 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_6 ┆ -0.28 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ example-1 │
│ 2025-01-06 ┆ AAPL ┆ cusip9 ┆ style ┆ momentum_1 ┆ 0.5 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ 2 ┆ ┆ ┆ example-1 │
│ 2025-01-06 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_1 ┆ -0.2 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ 2 ┆ ┆ ┆ example-1 │
│ 2025-01-07 ┆ GOOG ┆ cusip9 ┆ style ┆ momentum_1 ┆ -0.19 ┆ 2 ┆ example-2, │
│ ┆ ┆ ┆ ┆ 2 ┆ ┆ ┆ example-1 │
└────────────┴──────────┴─────────────┴─────────────┴────────────┴──────────┴─────────┴────────────┘}
# this call would fail with: `UploadError: Staging data fails validation checks.`
# dataset.commit(mode="append")
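In an automated pipeline, a simple guard is to check the validation result before committing; a minimal sketch using only the calls shown in this section:
issues = dataset.validate_staging_data()
if not issues:
    dataset.commit(mode="append")
else:
    for check_name, offending in issues.items():
        # inspect the offending rows for each failed check
        print(check_name, offending.shape)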
To resolve this error, we can delete one of the erroneously staged dataframes, after which the validation will return an empty result, indicating that all checks pass.
dataset.wipe_staging(names=["example-2"])
{'example-2': UploadStagingResult(name='example-2', timestamp=datetime.datetime(2025, 7, 22, 0, 54, 47, 151083, tzinfo=TzInfo(UTC)), success=True, results=[UploadParserResult(parser='Wide-Format', success=True, messages=[])])}
dataset.validate_staging_data()
{}
Fast Commit#
We can skip the staging process and commit a dataframe straight into versioned storage as demonstrated below.
dataset.fast_commit(example_df, mode="append", parser="Wide-Format")
UploadCommitResult(version=3, committed_names=[])
Housekeeping#
To delete a dataset entirely, we can call its destroy method. Warning: this cannot be undone, so exercise caution when deleting datasets.
dataset.destroy()