data_utils
FileStore
from dara.core.data_utils import FileStore
class FileStore(BaseModel)
Provides a low level data storage API. Acts as a cache Store but for files stored on disk
Attributes
- root_path: str
Methods
root_path
Path to where the data is stored
get_scoped_path
def get_scoped_path(cache_type: CacheType) -> str
Get a path to the sub-store for given cache scope.
Arguments:
cache_type
: cache type to get the scoped path to
Returns:
a path to the sub-store for the given cache type
Examples:
from dara.core.data_utils import FileStore
from dara.core.base_definitions import CacheType
file_store = FileStore(root_path='./data_root')
# Note: The following must be executed in an authenticated context
# to take effect, which means inside a variable or action resolver, or a py_component;
# otherwise the scoped path will always be global
file_store.get_scoped_path(CacheType.USER)
# > './data_root/{user_id}'
file_store.get_scoped_path(CacheType.SESSION)
# > './data_root/{session_id}'
file_store.get_scoped_path(CacheType.GLOBAL)
# > './data_root/global'
list_files
def list_files(cache_type: CacheType) -> List[str]
List files in a directory for a given cache type
Arguments:
cache_type
: cache type to get the files for
Returns:
a list of file names or an empty list if the directory for the given cache does not exist
file_exists
def file_exists(cache_type: CacheType, name: str) -> bool
Whether a file with a given name exists
Arguments:
cache_type
: cache type to get the files for
name
: name of the file
Returns:
True if the file exists, False otherwise
get_file
def get_file(cache_type: CacheType, name: str) -> Optional[io.BufferedReader]
Get a BufferedReader to read a file from the data store
Arguments:
cache_type
: cache type to get the files for
name
: name of the file
Returns:
a BufferedReader to read the file or None if the file does not exist
write_file
def write_file(cache_type: CacheType, name: str) -> io.BufferedWriter
Get a BufferedWriter to write a file to the data store.
Creates the directory for the cache type if it does not exist.
Arguments:
cache_type
: cache type to get the files for
name
: name of the file
Returns:
a BufferedWriter to write the file
delete_file
def delete_file(cache_type: CacheType, name: str) -> None
Delete a file from the data store
Arguments:
cache_type
: cache type to get the files for
name
: name of the file
DataFactory
from dara.core.data_utils import DataFactory
class DataFactory(BaseModel)
Acts as a factory of variables, actions and methods to interact with data stored locally.
Internally datasets are stored as parquet files.
Attributes
- file_store: FileStore
Methods
__init__
def __init__(root_path: str)
Acts as a factory of variables, actions and methods to interact with data stored locally
Arguments:
root_path
: root path to the directory where data should be stored
list_datasets
def list_datasets(cache: CacheType = CacheType.GLOBAL) -> List[str]
Get a list of datasets (filenames) available for a given cache type
Arguments:
cache
: cache type to get the list of datasets for
Returns:
a list of datasets (filenames) available for a given cache type
list_datasets_var
def list_datasets_var(
cache: Union[CacheType, NonDataVariable] = CacheType.GLOBAL,
polling_interval: Optional[int] = None) -> DerivedVariable[List[str]]
Create a DerivedVariable which stores a list of datasets (filenames) available for a given cache type
Arguments:
cache
: cache type to get the list of datasets for
polling_interval
: optional polling interval in seconds for the derived variable
write_dataset
def write_dataset(dataset: DataFrame,
name: str,
cache: CacheType = CacheType.GLOBAL) -> None
Write a dataset to disk.
Creates a new one or overwrites an existing one.
Can be used e.g. as a resolver for UploadDropzone or in a SideEffect to create an arbitrary dataset.
Arguments:
dataset
: DataFrame to write to disk
name
: name to use for the dataset
cache
: cache type to write the dataset for
Upload example
import pandas
import io
from dara.components import UploadDropzone
from dara.core.data_utils import DataFactory
ds_factory = DataFactory('./data_root')
def resolver(content: bytes, name: str) -> None:
# Assumes uploaded file is csv
file_object_io = io.StringIO(content.decode('utf-8'))
dataset = pandas.read_csv(file_object_io)
ds_factory.write_dataset(dataset, name)
UploadDropzone(resolver=resolver)
Arbitrary creation example
import pandas
import numpy as np
import io
from uuid import uuid4
from dara.components import UploadDropzone
from dara.core.data_utils import DataFactory
from dara.core import SideEffect, Variable
ds_factory = DataFactory('./data_root')
def create_df(ctx: SideEffect.Ctx):
df = pandas.DataFrame(np.random.randint(ctx.extras[0], 100, size=(100, 4)), columns=list('ABCD'))
uid = str(uuid4())
ds_factory.write_dataset(df, f'random_{uid}')
extra_data = Variable(1) # some extra data used to create the DataFrame
SideEffect(resolver=create_df, extras=[extra_data])
get_dataset_path
def get_dataset_path(name: str, cache: CacheType = CacheType.GLOBAL) -> str
Get path to a dataset on disk
Arguments:
name
: name of the dataset
cache
: cache type to get the dataset for
Returns:
path to the dataset on disk
read_dataset
def read_dataset(name: str,
cache: CacheType = CacheType.GLOBAL) -> Optional[DataFrame]
Read a dataset from disk to a DataFrame.
Arguments:
name
: name of the dataset
cache
: cache type to get the dataset for
Returns:
DataFrame or None if the dataset does not exist
read_dataset_var
def read_dataset_var(
name: Union[str, NonDataVariable],
cache: Union[CacheType, NonDataVariable] = CacheType.GLOBAL,
polling_interval: Optional[int] = None) -> DerivedDataVariable
Create a DerivedDataVariable which reads a specific dataset from disk
Arguments:
name
: name of the dataset
cache
: cache to get the dataset for
polling_interval
: optional polling interval in seconds for the derived variable
delete_dataset
def delete_dataset(name: str, cache: CacheType = CacheType.GLOBAL) -> None
Delete a dataset from disk
Arguments:
name
: name of the dataset
cache
: cache to remove the dataset for
delete_dataset_action
def delete_dataset_action(name: Union[str, NonDataVariable],
cache: Union[CacheType,
NonDataVariable] = CacheType.GLOBAL)
Get a SideEffect action which deletes a given dataset
Arguments:
name
: name of the dataset
cache
: cache to remove the dataset for
download_dataset_action
def download_dataset_action(name: Union[str, NonDataVariable],
cache: Union[CacheType,
NonDataVariable] = CacheType.GLOBAL)
Get a DownloadContent action which downloads a dataset with a given name as a .csv
Arguments:
name
: name of the dataset to download
cache
: cache to download the dataset for