bayesline.api.equity.AsyncUploaderApi#

class bayesline.api.equity.AsyncUploaderApi#

Bases: ABC

Provides functionality to parse, stage, and upload dataframes to versioned storage.

Parsing: This uploader contains a set of parsers that can be used to parse raw dataframes into their standardized format.

Staging: The staging functionality allows staging multiple dataframes for upload. Each staged dataframe is identified by a name; staged data does not affect the ‘committed’ data until it is committed. The staging area is unversioned.

Commit: Allows committing the staged dataframes to versioned storage. At commit time all staged dataframes are concatenated into one consolidated dataframe, and the committed dataframes are removed from the staging area.

Versioning: Committed data is versioned. At commit time a ‘commit mode’ can be provided to determine how the newly committed data is merged with the already committed data (e.g. overwrite, append, upsert).
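The stage → commit → version flow described above can be sketched with a minimal in-memory stand-in. Plain Python lists of dicts take the place of pl.DataFrame, and the ToyUploader class and the "overwrite"/"append" modes are illustrative assumptions (the real set of modes comes from get_commit_modes):

```python
# Minimal in-memory illustration of the stage -> commit -> version flow.
# Plain lists of dicts stand in for pl.DataFrame; "overwrite" and "append"
# are assumed commit modes, not necessarily the real ones.

class ToyUploader:
    def __init__(self):
        self.staging = {}     # name -> rows (unversioned staging area)
        self.versions = [[]]  # version 0 is the empty dataframe by definition

    def stage(self, name, rows, replace=False):
        # A name clash without replace=True fails (the real API reports
        # this via UploadStagingResult / UploadError).
        if name in self.staging and not replace:
            raise ValueError(f"name already staged: {name}")
        self.staging[name] = rows

    def commit(self, mode):
        # At commit time all staged frames are concatenated ...
        staged = [row for rows in self.staging.values() for row in rows]
        current = self.versions[-1]
        if mode == "overwrite":
            merged = staged
        elif mode == "append":
            merged = current + staged
        else:
            raise ValueError(f"unknown commit mode: {mode}")
        self.versions.append(merged)  # ... and a new version is created
        self.staging.clear()          # committed names leave the staging area
        return len(self.versions) - 1

u = ToyUploader()
u.stage("day1", [{"id": 1}])
u.stage("day2", [{"id": 2}])
v = u.commit("append")
```

The real API is async and returns result objects (UploadStagingResult, UploadCommitResult); this sketch only mirrors the staging and versioning semantics.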

Methods

__init__()

can_handle(df, *[, parser, name])

Checks which parser, if any, can handle the given dataframe.

commit(mode[, names])

Commits and clears all staged files identified by given names.

commit_as_task(mode[, names])

destroy()

Deletes the entire dataset including the entire version history of uploads and staged files.

fast_commit(df, mode[, parser])

Directly commits the given dataframe without staging.

fast_commit_as_task(df, mode[, parser])

get_commit_modes()

Returns the available commit modes (keys) and their descriptions (values).

get_data([columns, unique, version])

Gets the committed data at the given version.

get_data_as_task([columns, unique, version])

get_data_detail_summary([version])

Gets the detail data summary statistics (multi row) at the given version.

get_data_detail_summary_as_task([version])

get_data_summary([version])

Gets the data summary statistics (single row) at the given version.

get_data_summary_as_task([version])

get_detail_summary_schema([which])

Gets the schema of the detail summary statistics for the committed or staged data.

get_parser(parser)

Gets the parser with the given name.

get_parser_names()

Lists the available parsers that can be used to parse files with this uploader.

get_schema([which])

Gets the schema for the committed or staged data.

get_staging_data([names, columns, unique])

Gets the staging data for the optional list of names.

get_staging_data_as_task([names, columns, ...])

get_staging_data_detail_summary([names])

Get the detail staging summary statistics for the optional list of names.

get_staging_data_detail_summary_as_task([names])

get_staging_data_summary([names])

Get the staging summary statistics for the optional list of names.

get_staging_data_summary_as_task([names])

get_staging_results([names])

Get the staging results for the optional list of names.

get_summary_schema([which])

Gets the schema of the summary statistics for the committed or staged data.

stage_df(name, df[, parser, replace])

Stages the dataframe for upload under the given name.

stage_df_as_task(name, df[, parser, replace])

stage_file(path[, name, parser, replace])

Stages the dataframe from the given file for upload under the given name.

stage_file_as_task(path[, name, parser, replace])

validate_staging_data([names, short])

Validates the staging data for the optional list of names and returns a dict of error dataframes keyed by the name of the validation test.

validate_staging_data_as_task([names, short])

version_history()

Returns a dictionary of version numbers and their corresponding timestamps.

wipe_staging([names])

Wipe the staging results for the optional list of names.

abstract async get_schema(which: Literal['data', 'staging'] = 'data') dict[str, DataType]#

Parameters#

which: Literal[“data”, “staging”], optional

The type of schema to get. If “data”, gets the schema for the committed data; if “staging”, gets the schema for the staged data.

Returns#

dict[str, pl.DataType]:

The schema of the uploaded files.

abstract async get_summary_schema(which: Literal['data', 'staging'] = 'data') dict[str, DataType]#

Parameters#

which: Literal[“data”, “staging”], optional

The type of summary statistics to get the schema for. If “data”, gets the schema for the committed data; if “staging”, gets the schema for the staged data.

Returns#

dict[str, pl.DataType]:

The schema of the summary statistics.

abstract async get_detail_summary_schema(which: Literal['data', 'staging'] = 'data') dict[str, DataType]#

Parameters#

which: Literal[“data”, “staging”], optional

The type of detail summary statistics to get the schema for. If “data”, gets the schema for the committed data; if “staging”, gets the schema for the staged data.

Returns#

dict[str, pl.DataType]:

The schema of the detail summary statistics.

abstract async get_commit_modes() dict[str, str]#

Returns#

dict[str, str]

available modes (keys) that can be passed to the commit function and their descriptions (values).

abstract async get_parser_names() list[str]#

Returns#

list[str]:

The list of available parsers that can be used to parse files with this uploader.

abstract async get_parser(parser: str) AsyncUploadParserApi#

Parameters#

parser: str

The parser to obtain.

Returns#

AsyncUploadParserApi:

The parser.

abstract async can_handle(df: DataFrame, *, parser: str | None = None, name: str | None = None) MultiParserResult#

Parameters#

df: pl.DataFrame

The dataframe to check if the parser can handle.

parser: str, optional

The parser to use. If None, all parsers are checked and the first one that can handle the dataframe is chosen.

name: str | None, optional

The name of the dataframe (which could be the filename). Some parsers might extract information (such as a date) from the name.

Returns#

MultiParserResult:

The result of the parser check.
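The first-match selection rule described above can be sketched as follows. The callables here are simplified stand-ins for the parsers' own handling checks, and the dict-of-lists "dataframe" is a placeholder for pl.DataFrame:

```python
# Simplified first-match parser selection, mirroring the documented rule:
# if no parser name is given, every registered parser is tried in order
# and the first one that can handle the dataframe wins.

def select_parser(parsers, df, parser=None):
    """parsers: dict of name -> callable(df) -> bool (a stand-in for a
    parser's can-handle check)."""
    if parser is not None:
        return parser if parsers[parser](df) else None
    for name, can_handle in parsers.items():
        if can_handle(df):
            return name
    return None

# Hypothetical parsers keyed by name; real names come from get_parser_names().
parsers = {
    "wide": lambda df: "date" in df and len(df) > 2,
    "long": lambda df: "date" in df,
}
chosen = select_parser(parsers, {"date": [], "value": []})
```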

abstract async stage_df(name: str, df: DataFrame, parser: str | None = None, replace: bool = False) UploadStagingResult#

Stages the dataframe for upload under the given name.

Parameters#

name: str

The name of the dataframe to stage.

df: pl.DataFrame

The dataframe to stage.

parser: str, optional

The parser to use. If None, all parsers will be checked.

replace: bool, optional

If True, replaces an existing staged dataframe with the same name. If False, a name clash results in a failed staging result.

Raises#

UploadError

if the given name already exists

Returns#

UploadStagingResult

abstract async stage_df_as_task(name: str, df: DataFrame, parser: str | None = None, replace: bool = False) AsyncTask[UploadStagingResult]#
abstract async stage_file(path: Path, name: str | None = None, parser: str | None = None, replace: bool = False) UploadStagingResult#

Stages the dataframe from the given file for upload under the given name.

Parameters#

path: Path

File path for the file to stage.

name: str, optional

The name of the dataframe to stage. If None will use the file name.

parser: str, optional

The parser to use. If None, all parsers will be checked.

replace: bool, optional

If True, replaces an existing staged dataframe with the same name. If False, a name clash results in a failed staging result.

Raises#

UploadError

if the file could not be staged

Returns#

UploadStagingResult

abstract async stage_file_as_task(path: Path, name: str | None = None, parser: str | None = None, replace: bool = False) AsyncTask[UploadStagingResult]#
abstract async get_staging_results(names: list[str] | None = None) dict[str, UploadStagingResult]#

Get the staging results for the optional list of names.

Parameters#

names: list[str] | None, optional

The names of the staging results to get. If None will get all staging results. Missing names will be ignored and won’t be part of the result.

Returns#

dict[str, UploadStagingResult]:

A dictionary of staging results.

abstract async wipe_staging(names: list[str] | None = None) dict[str, UploadStagingResult]#

Wipe the staging results for the optional list of names.

Parameters#

names: list[str] | None, optional

The names of the staging results to wipe. If None will wipe all staging results. Names that do not exist will be ignored.

Returns#

dict[str, UploadStagingResult]:

A dictionary of staging results that were wiped.

abstract async get_staging_data(names: list[str] | None = None, *, columns: list[str] | None = None, unique: bool = False) LazyFrame#

Gets the staging data for the optional list of names.

Parameters#

names: list[str] | None, optional

The names of the staging data to get. If None will get all staging data. Names that do not exist will be ignored.

columns: list[str] | None, optional

The columns to get. If None will get all columns.

unique: bool, optional

If True will return a unique set of rows. If False will return all rows.

Returns#

pl.LazyFrame:

The staging data with an additional column _name that contains the staging name.
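A rough sketch of how the combined staging view gains its _name column. Plain lists of dicts stand in for pl.LazyFrame, and the helper name staging_view is hypothetical:

```python
# Sketch of combining staged frames into one staging view: each row is
# tagged with an extra "_name" column holding the name it was staged under.

def staging_view(staging, names=None):
    selected = staging if names is None else {
        n: rows for n, rows in staging.items() if n in names  # missing names ignored
    }
    return [
        {"_name": name, **row}
        for name, rows in selected.items()
        for row in rows
    ]

staging = {"day1": [{"id": 1}], "day2": [{"id": 2}]}
view = staging_view(staging, names=["day2", "missing"])
```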

abstract async get_staging_data_as_task(names: list[str] | None = None, *, columns: list[str] | None = None, unique: bool = False) AsyncTask[LazyFrame]#
abstract async get_staging_data_summary(names: list[str] | None = None) DataFrame#

Get the staging summary statistics for the optional list of names.

Parameters#

names: list[str] | None, optional

The names for which to get the staging summary statistics. If None will get all staging data. Names that do not exist will be ignored.

Returns#

pl.DataFrame:

The staging summary statistics, with one row per staged file. The schema matches get_summary_schema, with an additional str column _name (at the beginning) that contains the name of the staged file.

abstract async get_staging_data_summary_as_task(names: list[str] | None = None) AsyncTask[DataFrame]#
abstract async get_staging_data_detail_summary(names: list[str] | None = None) DataFrame#

Get the detail staging summary statistics for the optional list of names.

Parameters#

names: list[str] | None, optional

The names for which to get the detail staging summary statistics. If None will get all staging data. Names that do not exist will be ignored.

Returns#

pl.DataFrame:

The detail staging summary statistics, with multiple rows per staged file. The schema matches get_detail_summary_schema, with an additional str column _name (at the beginning) that contains the name of the staged file.

abstract async get_staging_data_detail_summary_as_task(names: list[str] | None = None) AsyncTask[DataFrame]#
abstract async validate_staging_data(names: list[str] | None = None, short: bool = False) dict[str, DataFrame]#

Validates the staging data for the optional list of names and returns a dict of error dataframes keyed by the name of the validation test.

Parameters#

names: list[str] | None, optional

The names for which to carry out the validation. If None will use all staging data. Names that do not exist will be ignored.

short: bool, optional

If True, returns shorter dataframes that contain only a _name column indicating the staging files that have validation errors. If False, returns dataframes with the _name column (as above) plus additional columns describing the validation errors.

Returns#

dict[str, pl.DataFrame]

Error dataframes for different validation checks. The dataframes will contain a column _name. If short is False, the _name column contains a comma-separated list of staging names that are the cause of the respective validation error. If short is True, the _name column is the only column and each row contains the name of a staging file that has validation errors.
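The relationship between the two result shapes can be sketched with plain dicts. The helper shorten and the check name "duplicate_rows" are hypothetical, but the _name formats follow the description above:

```python
# Collapse a full validation result (short=False shape) into the short
# form (short=True shape): one row per offending staging name, keeping
# only the _name column. "duplicate_rows" is a hypothetical check name.

def shorten(full_result):
    out = {}
    for check, rows in full_result.items():
        # _name holds a comma-separated list of the names causing the error
        names = sorted({n for row in rows for n in row["_name"].split(",")})
        out[check] = [{"_name": n} for n in names]
    return out

full = {"duplicate_rows": [{"_name": "day1,day2", "id": 7, "count": 2}]}
short = shorten(full)
```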

abstract async validate_staging_data_as_task(names: list[str] | None = None, short: bool = False) AsyncTask[dict[str, DataFrame]]#
abstract async commit(mode: str, names: list[str] | None = None) UploadCommitResult#

Commits and clears all staged files identified by given names.

Parameters#

mode: str

The commit mode; one of the modes returned by get_commit_modes.

names: list[str] | None, optional

The names of the staging results to commit. If None will commit all staging results. Names that do not exist will be ignored.

Raises#

ValueError

if the given commit mode does not exist

UploadError

if the staged data fails validation (as tested by validate_staging_data)

Returns#

UploadCommitResult

abstract async commit_as_task(mode: str, names: list[str] | None = None) AsyncTask[UploadCommitResult]#
abstract async fast_commit(df: DataFrame | Path, mode: str, parser: str | None = None) UploadCommitResult#

Directly commits the given dataframe without staging.

Parameters#

df: pl.DataFrame | Path

The dataframe, or path to a file, to commit directly.

mode: str

The commit mode; one of the modes returned by get_commit_modes.

parser: str, optional

The parser to use. If None, all parsers will be checked.

Raises#

ValueError

if the given commit mode does not exist

UploadError

if the staged data fails validation (as tested by validate_staging_data)

Returns#

UploadCommitResult

abstract async fast_commit_as_task(df: DataFrame | Path, mode: str, parser: str | None = None) AsyncTask[UploadCommitResult]#
abstract async get_data(columns: list[str] | None = None, *, unique: bool = False, version: int | datetime | None = None) LazyFrame#

Gets the committed data at the given version.

Parameters#

columns: list[str] | None, optional

The columns to get. If None will get all columns.

unique: bool, optional

If True will return a unique set of rows. If False will return all rows.

version: int | dt.datetime | None, optional

The version of the underlying data; latest if not given. By definition, version 0 is the empty dataframe with the correct schema.

Raises#

KeyError

If the given version does not exist.

Returns#

pl.LazyFrame:

The committed data at the given version.
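The version semantics (version 0 is empty, unknown versions raise KeyError, latest when no version is given) can be sketched with a plain list of snapshots. The helper name get_data_sketch is hypothetical:

```python
# Version lookup sketch: "versions" is a list of snapshots where index 0
# is the empty dataframe (here: an empty list) by definition.

def get_data_sketch(versions, version=None):
    if version is None:
        return versions[-1]       # latest if not given
    if not 0 <= version < len(versions):
        raise KeyError(version)   # unknown versions raise KeyError
    return versions[version]

versions = [[], [{"id": 1}], [{"id": 1}, {"id": 2}]]
latest = get_data_sketch(versions)
```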

abstract async get_data_as_task(columns: list[str] | None = None, *, unique: bool = False, version: int | datetime | None = None) AsyncTask[LazyFrame]#
abstract async get_data_summary(version: int | datetime | None = None) DataFrame#

Gets the data summary statistics (single row) at the given version.

Parameters#

version: int | dt.datetime | None, optional

The version of the underlying data; latest if not given.

Returns#

pl.DataFrame:

The summary statistics data (single row) for the data at the given version.

abstract async get_data_summary_as_task(version: int | datetime | None = None) AsyncTask[DataFrame]#
abstract async get_data_detail_summary(version: int | datetime | None = None) DataFrame#

Gets the detail data summary statistics (multi row) at the given version.

Parameters#

version: int | dt.datetime | None, optional

The version of the underlying data; latest if not given.

Returns#

pl.DataFrame:

The summary statistics data (multi row) for the data at the given version.

abstract async get_data_detail_summary_as_task(version: int | datetime | None = None) AsyncTask[DataFrame]#
abstract async destroy() None#

Deletes the entire dataset including the entire version history of uploads and staged files. Cannot be undone.

abstract async version_history() dict[int, datetime]#

Returns#

dict[int, dt.datetime]:

A dictionary of version numbers and the corresponding timestamps.
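One plausible way to resolve a datetime to a version number from this history is to take the latest version committed at or before the requested time. This interpretation is an assumption, since version_history only documents the mapping itself:

```python
import datetime as dt

# Sketch: resolve a datetime to a version number by taking the latest
# version whose commit timestamp is at or before the requested time.
# (This interpretation is an assumption; it is not stated in the docs.)

def resolve_version(history, at):
    candidates = [v for v, ts in history.items() if ts <= at]
    if not candidates:
        raise KeyError(at)
    return max(candidates)

history = {
    1: dt.datetime(2024, 1, 1, 12, 0),
    2: dt.datetime(2024, 2, 1, 12, 0),
}
v = resolve_version(history, dt.datetime(2024, 1, 15))
```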