bayesline.api.equity.UploaderApi#
- class bayesline.api.equity.UploaderApi#
Provide functionality to parse, stage and upload data to versioned storage.
Parsing: This uploader contains a set of parsers that can be used to parse raw dataframes into their standardized format.
Staging: The staging functionality allows staging multiple dataframes for upload. This is done by providing a name and a dataframe for each staged dataframe; staging does not affect the ‘committed’ data until the staged dataframes are committed. The staging area is unversioned.
Commit: Commits the staged dataframes to the versioned storage. All committed dataframes are removed from the staging area. At commit time all staged dataframes are concatenated, so the committed data is one consolidated dataframe.
Versioning: Committed data is versioned. At commit time a ‘commit mode’ can be provided to determine how the newly committed data is merged with the already committed data (e.g. overwrite, append, upsert, etc.).
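A minimal end-to-end sketch of this workflow, assuming `uploader` is a concrete `UploaderApi` instance obtained elsewhere; the staging name, commit mode and columns are illustrative:

```python
import polars as pl

df = pl.DataFrame({"asset_id": ["A", "B"], "value": [1.0, 2.0]})  # illustrative raw data

# stage the dataframe; with parser=None a suitable parser is auto-detected
uploader.stage_df("my_upload", df, replace=True)

# validate all staged data before committing
errors = uploader.validate_staging_data()
# assumption: a clean validation yields no (or only empty) error frames
if not any(len(frame) for frame in errors.values()):
    # assumption: "append" is among the modes reported by get_commit_modes()
    uploader.commit(mode="append")

# the committed, consolidated data is queryable as a LazyFrame
committed = uploader.get_data().collect()
```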
- __init__()#
Methods
- __init__()
- can_handle(df, *[, parser, name]): Check if the uploader can handle the given dataframe.
- commit(mode[, names]): Commit and clear all staged files identified by given names.
- commit_as_task(mode[, names])
- destroy(): Irreversibly delete the entire dataset.
- fast_commit(df, mode[, parser]): Directly commit the given dataframe without staging.
- fast_commit_as_task(df, mode[, parser])
- get_commit_modes(): Get the available commit modes and their descriptions.
- get_data([columns, filters, unique, head, ...]): Get the committed data at the given version.
- get_data_as_task([columns, filters, unique, ...])
- get_data_detail_summary([version]): Get the detail data summary statistics (multi row) at the given version.
- get_data_detail_summary_as_task([version])
- get_data_summary([version]): Get the data summary statistics (single row) at the given version.
- get_data_summary_as_task([version])
- get_detail_summary_schema([which]): Get the detail summary schema for the specified data type.
- get_parser(parser): Get a specific parser by name.
- get_parser_names(): Get the list of available parser names.
- get_schema([which]): Get the schema for the specified data type.
- get_staging_data([names, columns, unique]): Get the staging data for the optional list of names.
- get_staging_data_as_task([names, columns, ...])
- get_staging_data_detail_summary([names]): Get the detail staging summary statistics for the optional list of names.
- get_staging_data_summary([names]): Get the staging summary statistics for the optional list of names.
- get_staging_data_summary_as_task([names])
- get_staging_results([names]): Get the staging results for the optional list of names.
- get_summary_schema([which]): Get the summary schema for the specified data type.
- stage_df(name, df[, parser, replace]): Stage a dataframe for upload under the given name.
- stage_df_as_task(name, df[, parser, replace])
- stage_file(path[, name, parser, replace]): Stage a file for upload under the given name.
- stage_file_as_task(path[, name, parser, replace])
- validate_staging_data([names, short]): Validate the staging data for the optional list of names, and return errors.
- validate_staging_data_as_task([names, short])
- Get the version history of the dataset.
- wipe_staging([names]): Wipe the staging results for the optional list of names.
- abstract get_schema(which: Literal['data', 'staging'] = 'data') → dict[str, DataType]#
Get the schema for the specified data type.
Parameters#
- which : Literal["data", "staging"], default="data"
The type of schema to get. If "data", returns the schema for the committed data; if "staging", the schema for the staged data.
Returns#
- dict[str, pl.DataType]
The schema of the uploaded files.
- abstract get_summary_schema(which: Literal['data', 'staging'] = 'data') → dict[str, DataType]#
Get the summary schema for the specified data type.
Parameters#
- which : Literal["data", "staging"], default="data"
The type of summary statistics to get the schema for. If "data", returns the schema for the committed data; if "staging", the schema for the staged data.
Returns#
- dict[str, pl.DataType]
The schema of the summary statistics.
- abstract get_detail_summary_schema(which: Literal['data', 'staging'] = 'data') → dict[str, DataType]#
Get the detail summary schema for the specified data type.
Parameters#
- which : Literal["data", "staging"], default="data"
The type of detail summary statistics to get the schema for. If "data", returns the schema for the committed data; if "staging", the schema for the staged data.
Returns#
- dict[str, pl.DataType]
The schema of the detail summary statistics.
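For example, the three schema getters can be used to inspect the shape of the data and its statistics frames before and after commit (a sketch, assuming `uploader` is a concrete `UploaderApi` instance):

```python
# column name -> pl.DataType mappings for committed vs. staged data
data_schema = uploader.get_schema(which="data")
staging_schema = uploader.get_schema(which="staging")

# schemas of the (single-row) summary and (multi-row) detail summary frames
summary_schema = uploader.get_summary_schema(which="data")
detail_schema = uploader.get_detail_summary_schema(which="staging")

for column, dtype in data_schema.items():
    print(f"{column}: {dtype}")
```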
- abstract get_commit_modes() → dict[str, str]#
Get the available commit modes and their descriptions.
Returns#
- dict[str, str]
Available modes (keys) that can be passed to the commit function and their descriptions (values).
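Because the set of modes is implementation-defined, it can be discovered at runtime before committing (a sketch; the mode name "append" is an assumption, not a guaranteed mode):

```python
# list the supported commit modes with their descriptions
modes = uploader.get_commit_modes()
for mode, description in modes.items():
    print(f"{mode}: {description}")

# assumption: "append" is among the supported modes of this uploader
if "append" in modes:
    uploader.commit(mode="append")
```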
- abstract get_parser_names() → list[str]#
Get the list of available parser names.
Returns#
- list[str]
The list of available parsers that can be used to parse files with this uploader.
- abstract get_parser(parser: str) → UploadParserApi#
Get a specific parser by name.
Parameters#
- parser : str
The name of the parser to obtain.
Returns#
- UploadParserApi
The parser.
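A short sketch combining both parser lookups (assuming `uploader` is a concrete instance):

```python
# discover the parsers this uploader supports
parser_names = uploader.get_parser_names()

# obtain one of them by name for direct use
parser = uploader.get_parser(parser_names[0])
```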
- abstract can_handle(df: DataFrame, *, parser: str | None = None, name: str | None = None) → MultiParserResult#
Check if the uploader can handle the given dataframe.
Parameters#
- df : pl.DataFrame
The dataframe to check for parser compatibility.
- parser : str | None, default=None
The parser to use. If None, all parsers are checked and the first parser that can handle the dataframe is chosen.
- name : str | None, default=None
The name of the dataframe (which could be the filename). Some parsers might extract information (such as a date) from the name.
Returns#
- MultiParserResult
The result of the parser check.
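For instance, compatibility can be checked before staging (a sketch; the dataframe contents and the parser and file names are illustrative, and the fields of MultiParserResult are not shown here):

```python
import polars as pl

df = pl.DataFrame({"date": ["2024-01-31"], "value": [1.0]})  # illustrative

# try all parsers, passing the file name so parsers can extract metadata from it
result = uploader.can_handle(df, name="returns_2024-01.csv")

# or restrict the check to one specific parser (parser name is an assumption)
result = uploader.can_handle(df, parser="csv_returns", name="returns_2024-01.csv")
```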
- abstract stage_df(name: str, df: DataFrame, parser: str | None = None, replace: bool = False) → UploadStagingResult#
Stage a dataframe for upload under the given name.
Parameters#
- name : str
The name under which to stage the dataframe.
- df : pl.DataFrame
The dataframe to stage.
- parser : str | None, default=None
The parser to use. If None, all parsers are checked.
- replace : bool, default=False
If True, replaces any existing staging with the same name. If False, a name clash results in a failed staging result.
Raises#
- UploadError
If the given name already exists.
Returns#
- UploadStagingResult
The result of the staging operation.
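A sketch of both staging variants (the staging name is illustrative; `uploader` and `df` as above):

```python
# first staging under a fresh name; the parser is auto-detected
result = uploader.stage_df("prices_2024_q1", df)

# re-staging under the same name fails unless replace=True is passed
result = uploader.stage_df("prices_2024_q1", df, replace=True)
```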
- abstract stage_df_as_task(name: str, df: DataFrame, parser: str | None = None, replace: bool = False) → Task[UploadStagingResult]#
- abstract stage_file(path: Path, name: str | None = None, parser: str | None = None, replace: bool = False) → UploadStagingResult#
Stage a file for upload under the given name.
Parameters#
- path : Path
File path of the file to stage.
- name : str | None, default=None
The name under which to stage the dataframe. If None, the file name is used.
- parser : str | None, default=None
The parser to use. If None, all parsers are checked.
- replace : bool, default=False
If True, replaces any existing staging with the same name. If False, a name clash results in a failed staging result.
Raises#
- UploadError
If the file could not be staged.
Returns#
- UploadStagingResult
The result of the staging operation.
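A sketch (the file path, staging name and parser name are illustrative):

```python
from pathlib import Path

# stage a file; with name=None the file name becomes the staging name
result = uploader.stage_file(Path("data/returns_2024.csv"))

# or stage under an explicit name with a pinned parser
result = uploader.stage_file(
    Path("data/returns_2024.csv"),
    name="returns_2024",
    parser="csv_returns",  # assumed parser name
    replace=True,
)
```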
- abstract stage_file_as_task(path: Path, name: str | None = None, parser: str | None = None, replace: bool = False) → Task[UploadStagingResult]#
- abstract get_staging_results(names: list[str] | None = None) → dict[str, UploadStagingResult]#
Get the staging results for the optional list of names.
Parameters#
- names : list[str] | None, default=None
The names of the staging results to get. If None, all staging results are returned. Missing names are ignored and won’t be part of the result.
Returns#
- dict[str, UploadStagingResult]
A dictionary of staging results.
- abstract wipe_staging(names: list[str] | None = None) → dict[str, UploadStagingResult]#
Wipe the staging results for the optional list of names.
Parameters#
- names : list[str] | None, default=None
The names of the staging results to wipe. If None, all staging results are wiped. Names that do not exist are ignored.
Returns#
- dict[str, UploadStagingResult]
A dictionary of staging results that were wiped.
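Together these allow inspecting the staging area and cleaning up selectively (a sketch; the `success` attribute on UploadStagingResult is an assumption):

```python
# fetch all staging results keyed by staging name
results = uploader.get_staging_results()

# wipe only the stagings that did not succeed (attribute name assumed)
failed = [name for name, result in results.items() if not result.success]
wiped = uploader.wipe_staging(names=failed)
```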
- abstract get_staging_data(names: list[str] | None = None, *, columns: list[str] | None = None, unique: bool = False) → LazyFrame#
Get the staging data for the optional list of names.
Parameters#
- names : list[str] | None, default=None
The names of the staging data to get. If None, all staging data is returned. Names that do not exist are ignored.
- columns : list[str] | None, default=None
The columns to get. If None, all columns are returned.
- unique : bool, default=False
If True, returns a unique set of rows; if False, all rows.
Returns#
- pl.LazyFrame
The staging data with an additional column _name that contains the staging name.
- abstract get_staging_data_as_task(names: list[str] | None = None, *, columns: list[str] | None = None, unique: bool = False) → Task[LazyFrame]#
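A sketch reading staged data lazily (column names are illustrative):

```python
# lazily scan all staged data; includes a _name column identifying the staging
staged = uploader.get_staging_data()
print(staged.collect())

# restrict to selected columns and deduplicate rows
subset = uploader.get_staging_data(columns=["asset_id", "value"], unique=True)
```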
- abstract get_staging_data_summary(names: list[str] | None = None) → DataFrame#
Get the staging summary statistics for the optional list of names.
Parameters#
- names : list[str] | None, default=None
The names for which to get the staging summary statistics. If None, all staging data is used. Names that do not exist are ignored.
Returns#
- pl.DataFrame
The staging summary statistics, with one row per staged file. The schema matches get_summary_schema, with an additional column _name (at the beginning) of type str that contains the name of the staged file.
- abstract get_staging_data_detail_summary(names: list[str] | None = None) → DataFrame#
Get the detail staging summary statistics for the optional list of names.
Parameters#
- names : list[str] | None, default=None
The names for which to get the detail staging summary statistics. If None, all staging data is used. Names that do not exist are ignored.
Returns#
- pl.DataFrame
The detail staging summary statistics, with multiple rows per staged file. The schema matches get_detail_summary_schema, with an additional column _name (at the beginning) of type str that contains the name of the staged file.
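A sketch of both summary calls (the staging name is illustrative):

```python
# one summary row per staged file, identified by the leading _name column
summary = uploader.get_staging_data_summary()

# multiple detail rows per staged file, restricted to one staging name
detail = uploader.get_staging_data_detail_summary(names=["prices_2024_q1"])
```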
- abstract validate_staging_data(names: list[str] | None = None, short: bool = False) → dict[str, DataFrame]#
Validate the staging data for the optional list of names, and return errors.
The errors are returned as a dict of error dataframes keyed by the name of the validation test.
Parameters#
- names : list[str] | None, default=None
The names for which to carry out the validation. If None, all staging data is used. Names that do not exist are ignored.
- short : bool, default=False
If True, returns shorter dataframes that contain only a _name column indicating the name of the staging file that has validation errors with other staging files. If False, returns dataframes with the _name column (as above) and additional columns describing the validation errors.
Returns#
- dict[str, pl.DataFrame]
Error dataframes for the different validation checks. Each dataframe contains a column _name. If short is False, the _name column contains a comma-separated list of staging names that cause the respective validation error. If short is True, the _name column is the only column and each row contains the name of a staging file that has validation errors.
- abstract validate_staging_data_as_task(names: list[str] | None = None, short: bool = False) → Task[dict[str, DataFrame]]#
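A sketch of both validation forms (assuming `uploader` is a concrete instance):

```python
# full error frames, keyed by the name of the validation test
errors = uploader.validate_staging_data()
for check, frame in errors.items():
    print(check)
    print(frame)

# short form: each frame only lists the offending staging names in _name
short_errors = uploader.validate_staging_data(short=True)
```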
- abstract commit(mode: str, names: list[str] | None = None) → UploadCommitResult#
Commit and clear all staged files identified by given names.
Parameters#
- mode : str
Commit mode, one of the modes returned by get_commit_modes.
- names : list[str] | None, default=None
The names of the staging results to commit. If None, all staging results are committed. Names that do not exist are ignored.
Raises#
- ValueError
If the given commit mode does not exist.
- UploadError
If the staged data fails validation (as tested by validate_staging_data).
Returns#
- UploadCommitResult
The result of the commit operation.
- abstract fast_commit(df: DataFrame | Path, mode: str, parser: str | None = None) → UploadCommitResult#
Directly commit the given dataframe without staging.
Parameters#
- df : pl.DataFrame | Path
The dataframe, or the path of a file, to commit.
- mode : str
Commit mode, one of the modes returned by get_commit_modes.
- parser : str | None, default=None
The parser to use. If None, all parsers are checked.
Raises#
- ValueError
If the given commit mode does not exist.
- UploadError
If the staged data fails validation (as tested by validate_staging_data).
Returns#
- UploadCommitResult
The result of the commit operation.
- abstract fast_commit_as_task(df: DataFrame | Path, mode: str, parser: str | None = None) → Task[UploadCommitResult]#
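A sketch of a direct commit (the mode and the file path are illustrative assumptions):

```python
from pathlib import Path

# commit a dataframe in one step, bypassing the staging area
result = uploader.fast_commit(df, mode="append")  # "append" mode assumed

# a file path is accepted as well and is parsed like a staged file
result = uploader.fast_commit(Path("data/returns_2024.csv"), mode="append")
```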
- abstract get_data(columns: list[str] | None = None, *, filters: list[tuple[str, str, Any] | Sequence[tuple[str, str, Any]]] | None = None, unique: bool = False, head: int | None = None, version: int | datetime | None = None, download_to: Path | str | None = None, download_filename: str = 'data-{i}.parquet') → LazyFrame#
Get the committed data at the given version.
Parameters#
- columns : list[str] | None, default=None
The columns to get. If None, all columns are returned.
- filters : DNFFilterExpressions | None, default=None
The filters to apply to the data. If None, all data is returned. Follows the pyarrow filter syntax: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
- unique : bool, default=False
If True, returns a unique set of rows; if False, all rows.
- head : int | None, default=None
If given, returns the first head rows. If None, all rows are returned.
- version : int | dt.datetime | None, default=None
The version of the underlying data, latest if not given. By definition version 0 is the empty dataframe with the correct schema.
- download_to : Path | str | None, default=None
If given, downloads the data to the given path and returns a LazyFrame against that downloaded path. The path must point to an existing directory. If a string is given, a Path object is created from it using Path(download_to). If None, the data is consumed in memory and returned as a lazy frame.
- download_filename : str, default="data-{i}.parquet"
The filename to use for the downloaded data, which must contain the placeholder {i}; it is replaced with the index of the file. Only used if download_to is given. If download_to/download_filename already exists, an exception is raised.
Raises#
- KeyError
If the given version does not exist.
- FileExistsError
If download_to/download_filename already exists.
- FileNotFoundError
If download_to is given and does not exist.
Returns#
- pl.LazyFrame
The committed data at the given version.
- abstract get_data_as_task(columns: list[str] | None = None, *, filters: list[tuple[str, str, Any] | Sequence[tuple[str, str, Any]]] | None = None, unique: bool = False, head: int | None = None, version: int | datetime | None = None, download_to: Path | str | None = None, download_filename: str = 'data-{i}.parquet') → Task[LazyFrame]#
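A sketch of typical reads (the column names, version number and download directory are illustrative):

```python
# latest version, two columns, pyarrow-style DNF filter, first 100 rows
lf = uploader.get_data(
    columns=["asset_id", "value"],
    filters=[("asset_id", "=", "A")],
    head=100,
)
print(lf.collect())

# pin a historical version and download the backing files to an existing directory
lf = uploader.get_data(version=3, download_to="/tmp/upload_cache")
```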
- abstract get_data_summary(version: int | datetime | None = None) → DataFrame#
Get the data summary statistics (single row) at the given version.
Parameters#
- version : int | dt.datetime | None, default=None
The version of the underlying data, latest if not given.
Returns#
- pl.DataFrame
The summary statistics data (single row) for the data at the given version.
- abstract get_data_detail_summary(version: int | datetime | None = None) → DataFrame#
Get the detail data summary statistics (multi row) at the given version.
Parameters#
- version : int | dt.datetime | None, default=None
The version of the underlying data, latest if not given.
Returns#
- pl.DataFrame
The summary statistics data (multi row) for the data at the given version.
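A sketch of both summary reads (the timestamp is illustrative):

```python
import datetime as dt

# single-row summary at the latest version
summary = uploader.get_data_summary()

# multi-row detail summary as of a point in time
detail = uploader.get_data_detail_summary(version=dt.datetime(2024, 1, 1))
```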