bayesline.api.equity.UploaderApi#
- class bayesline.api.equity.UploaderApi#
Bases:
ABC
Provides functionality to parse, stage, and upload dataframes to versioned storage.
Parsing: The uploader contains a set of parsers that can be used to parse raw dataframes into their standardized format.
Staging: Multiple dataframes can be staged for upload by providing a name and a dataframe for each. Staged dataframes do not affect the committed data until they are committed. The staging area is unversioned.
Commit: Commits the staged dataframes to the versioned storage and removes them from the staging area. At commit time all staged dataframes are concatenated, so the committed data is one consolidated dataframe.
Versioning: Committed data is versioned. At commit time a 'commit mode' can be provided to determine how the newly committed data is merged with the already committed data (e.g. overwrite, append, upsert).
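The typical workflow is sketched below. This is a minimal, hedged example: it assumes uploader is a concrete UploaderApi implementation obtained elsewhere, and the "append" commit mode is illustrative (use a key returned by get_commit_modes).

```python
import polars as pl

# Assumption: `uploader` is a concrete UploaderApi implementation
# obtained from the bayesline client (construction not shown here).
df = pl.DataFrame({"date": ["2024-01-01", "2024-01-02"], "value": [1.0, 2.0]})

# Parsing: check which parser (if any) understands this dataframe.
check = uploader.can_handle(df, name="prices_2024.csv")

# Staging: stage the dataframe under a name (the staging area is unversioned).
uploader.stage_df("prices_2024", df)

# Validation: inspect error dataframes before committing.
errors = uploader.validate_staging_data(["prices_2024"])

# Commit: merge the staged data into the versioned storage.
# "append" is illustrative; pick a key from get_commit_modes().
uploader.commit(mode="append", names=["prices_2024"])

# Versioned reads: the latest committed data as a polars LazyFrame.
data = uploader.get_data().collect()
```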
Methods
- __init__()
- can_handle(df, *[, parser, name]): Checks which parser, if any, can handle the given dataframe.
- commit(mode[, names]): Commits and clears all staged files identified by the given names.
- commit_as_task(mode[, names]): Task variant of commit.
- destroy(): Deletes the entire dataset, including the entire version history of uploads and staged files.
- fast_commit(df, mode[, parser]): Directly commits the given dataframe without staging.
- fast_commit_as_task(df, mode[, parser]): Task variant of fast_commit.
- get_commit_modes(): Returns the available modes (keys) that can be passed to the commit function and their descriptions (values).
- get_data([columns, unique, version]): Gets the committed data at the given version.
- get_data_as_task([columns, unique, version]): Task variant of get_data.
- get_data_detail_summary([version]): Gets the detail data summary statistics (multi row) at the given version.
- get_data_detail_summary_as_task([version]): Task variant of get_data_detail_summary.
- get_data_summary([version]): Gets the data summary statistics (single row) at the given version.
- get_data_summary_as_task([version]): Task variant of get_data_summary.
- get_detail_summary_schema([which]): Gets the schema of the detail summary statistics for the committed or staged data.
- get_parser(parser): Gets the parser with the given name.
- get_parser_names(): Returns the list of available parsers that can be used to parse files with this uploader.
- get_schema([which]): Gets the schema of the committed or staged data.
- get_staging_data([names, columns, unique]): Gets the staging data for the optional list of names.
- get_staging_data_as_task([names, columns, ...]): Task variant of get_staging_data.
- get_staging_data_detail_summary([names]): Gets the detail staging summary statistics for the optional list of names.
- get_staging_data_summary([names]): Gets the staging summary statistics for the optional list of names.
- get_staging_data_summary_as_task([names]): Task variant of get_staging_data_summary.
- get_staging_results([names]): Gets the staging results for the optional list of names.
- get_summary_schema([which]): Gets the schema of the summary statistics for the committed or staged data.
- stage_df(name, df[, parser, replace]): Stages the dataframe for upload under the given name.
- stage_df_as_task(name, df[, parser, replace]): Task variant of stage_df.
- stage_file(path[, name, parser, replace]): Stages the dataframe from the given file for upload under the given name.
- stage_file_as_task(path[, name, parser, replace]): Task variant of stage_file.
- validate_staging_data([names, short]): Validates the staging data for the optional list of names and returns a dict of error dataframes keyed by the name of the validation test.
- validate_staging_data_as_task([names, short]): Task variant of validate_staging_data.
- …(): Returns dict[int, dt.datetime], a dictionary of version numbers and the corresponding timestamps.
- wipe_staging([names]): Wipes the staging results for the optional list of names.
- abstract get_schema(which: Literal['data', 'staging'] = 'data') → dict[str, DataType] #
Parameters#
- which: Literal["data", "staging"], optional
The type of schema to get. If "data", returns the schema for the committed data; if "staging", the schema for the staged data.
Returns#
- dict[str, pl.DataType]:
The schema of the uploaded files.
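A short usage sketch, assuming an uploader instance is in scope:

```python
data_schema = uploader.get_schema()              # schema of the committed data
staging_schema = uploader.get_schema("staging")  # schema of the staged data
```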
- abstract get_summary_schema(which: Literal['data', 'staging'] = 'data') → dict[str, DataType] #
Parameters#
- which: Literal["data", "staging"], optional
The type of summary statistics to get the schema for. If "data", returns the schema for the committed data; if "staging", the schema for the staged data.
Returns#
- dict[str, pl.DataType]:
The schema of the summary statistics.
- abstract get_detail_summary_schema(which: Literal['data', 'staging'] = 'data') → dict[str, DataType] #
Parameters#
- which: Literal["data", "staging"], optional
The type of detail summary statistics to get the schema for. If "data", returns the schema for the committed data; if "staging", the schema for the staged data.
Returns#
- dict[str, pl.DataType]:
The schema of the detail summary statistics.
- abstract get_commit_modes() → dict[str, str] #
Returns#
- dict[str, str]:
The available modes (keys) that can be passed to the commit function and their descriptions (values).
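For example, to list the supported modes before committing (a sketch; the actual mode names depend on the implementation):

```python
for mode, description in uploader.get_commit_modes().items():
    print(f"{mode}: {description}")
```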
- abstract get_parser_names() → list[str] #
Returns#
- list[str]:
The list of available parsers that can be used to parse files with this uploader.
- abstract get_parser(parser: str) → UploadParserApi #
Parameters#
- parser: str
The parser to obtain.
Returns#
- UploadParserApi:
The parser.
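A sketch combining get_parser_names and get_parser, assuming at least one parser is available:

```python
parser_names = uploader.get_parser_names()
parser = uploader.get_parser(parser_names[0])
```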
- abstract can_handle(df: DataFrame, *, parser: str | None = None, name: str | None = None) → MultiParserResult #
Parameters#
- df: pl.DataFrame
The dataframe to check for parser compatibility.
- parser: str, optional
The parser to use. If None, all parsers are checked and the first parser that can handle the dataframe is chosen.
- name: str | None, optional
The name of the dataframe (which could be the filename). Some parsers might extract information (such as a date) from the name.
Returns#
- MultiParserResult:
The result of the parser check.
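A hedged usage sketch; the parser name "my_parser" and the file name are hypothetical:

```python
# Try all parsers and report the first one that can handle the dataframe.
result = uploader.can_handle(df, name="holdings_2024-06-30.csv")

# Or restrict the check to a single parser.
result = uploader.can_handle(df, parser="my_parser", name="holdings_2024-06-30.csv")
```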
- abstract stage_df(name: str, df: DataFrame, parser: str | None = None, replace: bool = False) → UploadStagingResult #
Stages the dataframe for upload under the given name.
Parameters#
- name: str
The name of the dataframe to stage.
- df: pl.DataFrame
The dataframe to stage.
- parser: str, optional
The parser to use. If None, all parsers are checked.
- replace: bool, optional
If True, replaces an existing staging with the same name. If False, a name clash results in a failed staging result.
Raises#
- UploadError
if the given name already exists
Returns#
UploadStagingResult
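For example (names and data are illustrative):

```python
import polars as pl

df = pl.DataFrame({"date": ["2024-06-28"], "value": [1.0]})
# replace=True overwrites an existing staging of the same name
# instead of producing a failed staging result.
result = uploader.stage_df("holdings_june", df, replace=True)
```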
- abstract stage_df_as_task(name: str, df: DataFrame, parser: str | None = None, replace: bool = False) → Task[UploadStagingResult] #
- abstract stage_file(path: Path, name: str | None = None, parser: str | None = None, replace: bool = False) → UploadStagingResult #
Stages the dataframe from the given file for upload under the given name.
Parameters#
- path: Path
File path for the file to stage.
- name: str, optional
The name of the dataframe to stage. If None, the file name is used.
- parser: str, optional
The parser to use. If None, all parsers are checked.
- replace: bool, optional
If True, replaces an existing staging with the same name. If False, a name clash results in a failed staging result.
Raises#
- UploadError
if the file could not be staged
Returns#
UploadStagingResult
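For example (the file path is illustrative):

```python
from pathlib import Path

# With name=None the file name is used as the staging name, and with
# parser=None all parsers are checked.
result = uploader.stage_file(Path("data/holdings_2024-06-30.csv"))
```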
- abstract stage_file_as_task(path: Path, name: str | None = None, parser: str | None = None, replace: bool = False) → Task[UploadStagingResult] #
- abstract get_staging_results(names: list[str] | None = None) → dict[str, UploadStagingResult] #
Get the staging results for the optional list of names.
Parameters#
- names: list[str] | None, optional
The names of the staging results to get. If None, all staging results are returned. Missing names are ignored and won't be part of the result.
Returns#
- dict[str, UploadStagingResult]:
A dictionary of staging results.
- abstract wipe_staging(names: list[str] | None = None) → dict[str, UploadStagingResult] #
Wipe the staging results for the optional list of names.
Parameters#
- names: list[str] | None, optional
The names of the staging results to wipe. If None, all staging results are wiped. Names that do not exist are ignored.
Returns#
- dict[str, UploadStagingResult]:
A dictionary of staging results that were wiped.
- abstract get_staging_data(names: list[str] | None = None, *, columns: list[str] | None = None, unique: bool = False) → LazyFrame #
Gets the staging data for the optional list of names.
Parameters#
- names: list[str] | None, optional
The names of the staging data to get. If None, all staging data is returned. Names that do not exist are ignored.
- columns: list[str] | None, optional
The columns to get. If None, all columns are returned.
- unique: bool, optional
If True, returns a unique set of rows; if False, returns all rows.
Returns#
- pl.LazyFrame:
The staging data with an additional column _name that contains the staging name.
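A short sketch; the staging name is illustrative:

```python
# The result is a LazyFrame with an additional _name column.
lf = uploader.get_staging_data(["holdings_june"], unique=True)
df = lf.collect()
print(df.get_column("_name").unique())
```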
- abstract get_staging_data_as_task(names: list[str] | None = None, *, columns: list[str] | None = None, unique: bool = False) → Task[LazyFrame] #
- abstract get_staging_data_summary(names: list[str] | None = None) → DataFrame #
Get the staging summary statistics for the optional list of names.
Parameters#
- names: list[str] | None, optional
The names for which to get the staging summary statistics. If None, all staging data is used. Names that do not exist are ignored.
Returns#
- pl.DataFrame:
The staging summary statistics data which will contain one row per staged file. The schema matches get_summary_schema with an additional column _name (at the beginning) of type str which contains the name of the staged file.
- abstract get_staging_data_detail_summary(names: list[str] | None = None) → DataFrame #
Get the detail staging summary statistics for the optional list of names.
Parameters#
- names: list[str] | None, optional
The names for which to get the detail staging summary statistics. If None, all staging data is used. Names that do not exist are ignored.
Returns#
- pl.DataFrame:
The detail staging summary statistics data which will contain multiple rows per staged file. The schema matches get_detail_summary_schema with an additional column _name (at the beginning) of type str which contains the name of the staged file.
- abstract validate_staging_data(names: list[str] | None = None, short: bool = False) → dict[str, DataFrame] #
Validates the staging data for the optional list of names and returns a dict of error dataframes keyed by the name of the validation test.
Parameters#
- names: list[str] | None, optional
The names for which to carry out the validation. If None, all staging data is used. Names that do not exist are ignored.
- short: bool, optional
If True, returns shorter dataframes that contain only a _name column indicating the staging files that have validation errors (possibly in combination with other staging files). If False, returns dataframes with the _name column (as above) and additional columns describing the validation errors.
Returns#
- dict[str, pl.DataFrame]:
Error dataframes for the different validation checks. Each dataframe contains a _name column. If short is False, the _name column contains a comma-separated list of staging names that are the cause of the respective validation error. If short is True, the _name column is the only column and each row contains a staging file name that has validation errors.
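A sketch of a pre-commit validation check:

```python
errors = uploader.validate_staging_data(short=True)
for check_name, error_df in errors.items():
    if error_df.height > 0:
        print(f"{check_name}: {error_df.get_column('_name').to_list()}")
```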
- abstract validate_staging_data_as_task(names: list[str] | None = None, short: bool = False) → Task[dict[str, DataFrame]] #
- abstract commit(mode: str, names: list[str] | None = None) → UploadCommitResult #
Commits and clears all staged files identified by given names.
Parameters#
- mode: str
The commit mode; one of the modes returned by get_commit_modes.
- names: list[str] | None, optional
The names of the staging results to commit. If None, all staging results are committed. Names that do not exist are ignored.
Raises#
- ValueError
if the given commit mode does not exist
- UploadError
if the staged data fails validation (as tested by validate_staging_data)
Returns#
UploadCommitResult
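For example ("append" is illustrative; use a mode returned by get_commit_modes):

```python
result = uploader.commit(mode="append", names=["holdings_june"])
```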
- abstract fast_commit(df: DataFrame | Path, mode: str, parser: str | None = None) → UploadCommitResult #
Directly commits the given dataframe without staging.
Parameters#
- df: pl.DataFrame | Path
The dataframe to commit, or a path to a file to parse and commit.
- mode: str
The commit mode; one of the modes returned by get_commit_modes.
- parser: str, optional
The parser to use. If None, all parsers are checked.
Raises#
- ValueError
if the given commit mode does not exist
- UploadError
if the staged data fails validation (as tested by validate_staging_data)
Returns#
UploadCommitResult
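For example (again with an illustrative mode name):

```python
import polars as pl

df = pl.DataFrame({"date": ["2024-06-28"], "value": [1.0]})
# Bypasses the staging area entirely; validation still applies.
result = uploader.fast_commit(df, mode="append")
```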
- abstract fast_commit_as_task(df: DataFrame | Path, mode: str, parser: str | None = None) → Task[UploadCommitResult] #
- abstract get_data(columns: list[str] | None = None, *, unique: bool = False, version: int | datetime | None = None) → LazyFrame #
Gets the committed data at the given version.
Parameters#
- columns: list[str] | None, optional
The columns to get. If None, all columns are returned.
- unique: bool, optional
If True, returns a unique set of rows; if False, returns all rows.
- version: int | dt.datetime | None, optional
The version of the underlying data; latest if not given. By definition, version 0 is the empty dataframe with the correct schema.
Raises#
- KeyError
If the given version does not exist.
Returns#
- pl.LazyFrame:
The committed data at the given version.
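A sketch of reading committed data at different versions:

```python
import datetime as dt

latest = uploader.get_data().collect()
# Pin to a version number, or to the state as of a timestamp.
v1 = uploader.get_data(version=1).collect()
as_of = uploader.get_data(version=dt.datetime(2024, 6, 30)).collect()
```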
- abstract get_data_as_task(columns: list[str] | None = None, *, unique: bool = False, version: int | datetime | None = None) → Task[LazyFrame] #
- abstract get_data_summary(version: int | datetime | None = None) → DataFrame #
Gets the data summary statistics (single row) at the given version.
Parameters#
- version: int | dt.datetime | None, optional
The version of the underlying data; latest if not given.
Returns#
- pl.DataFrame:
The summary statistics data (single row) for the data at the given version.
- abstract get_data_detail_summary(version: int | datetime | None = None) → DataFrame #
Gets the detail data summary statistics (multi row) at the given version.
Parameters#
- version: int | dt.datetime | None, optional
The version of the underlying data; latest if not given.
Returns#
- pl.DataFrame:
The summary statistics data (multi row) for the data at the given version.
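A closing sketch contrasting the two summary views:

```python
summary = uploader.get_data_summary()        # single row for the latest version
detail = uploader.get_data_detail_summary()  # multiple rows for the same version
```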