Skip to main content

data_utils

FileStore

from dara.core.data_utils import FileStore

class FileStore(BaseModel)

Provides a low level data storage API. Acts as a cache Store but for files stored on disk

Attributes

  • root_path: str

Methods

root_path

Path to where the data is stored

get_scoped_path

def get_scoped_path(cache_type: CacheType) -> str

Get a path to the sub-store for given cache scope.

Arguments:

  • cache_type: cache type to get the scoped path to

Returns:

a path to the sub-store for the given cache type

Examples:


from dara.core.data_utils import FileStore
from dara.core.base_definitions import CacheType

file_store = FileStore(root_path='./data_root')

# Note: The following must be executed in an authenticated context
# to take effect, which means inside a variable or action resolver, or a py_component;
# otherwise the scoped path will always be global
file_store.get_scoped_path(CacheType.USER)
# > './data_root/{user_id}'
file_store.get_scoped_path(CacheType.SESSION)
# > './data_root/{session_id}'
file_store.get_scoped_path(CacheType.GLOBAL)
# > './data_root/global'

list_files

def list_files(cache_type: CacheType) -> List[str]

List files in a directory for a given cache type

Arguments:

  • cache_type: cache type to get the files for

Returns:

a list of file names or an empty list if the directory for the given cache does not exist

file_exists

def file_exists(cache_type: CacheType, name: str) -> bool

Whether a file with a given name exists

Arguments:

  • cache_type: cache type to get the files for
  • name: name of the file

Returns:

True if the file exists, False otherwise

get_file

def get_file(cache_type: CacheType, name: str) -> Optional[io.BufferedReader]

Get a BufferedReader to read a file from the data store

Arguments:

  • cache_type: cache type to get the files for
  • name: name of the file

Returns:

a BufferedReader to read the file or None if the file does not exist

write_file

def write_file(cache_type: CacheType, name: str) -> io.BufferedWriter

Get a BufferedWriter to write a file to the data store.

Creates the directory for the cache type if it does not exist.

Arguments:

  • cache_type: cache type to write the file for
  • name: name of the file

Returns:

a BufferedWriter to write the file

delete_file

def delete_file(cache_type: CacheType, name: str) -> None

Delete a file from the data store

Arguments:

  • cache_type: cache type to delete the file from
  • name: name of the file

DataFactory

from dara.core.data_utils import DataFactory

class DataFactory(BaseModel)

Acts as a factory of variables, actions and methods to interact with data stored locally.

Internally datasets are stored as parquet files.

Attributes

  • file_store: FileStore

Methods

__init__

def __init__(root_path: str)

Acts as a factory of variables, actions and methods to interact with data stored locally

Arguments:

  • root_path: root path to the directory where data should be stored

list_datasets

def list_datasets(cache: CacheType = CacheType.GLOBAL) -> List[str]

Get a list of datasets (filenames) available for a given cache type

Arguments:

  • cache: cache type to get the list of datasets for

Returns:

a list of datasets (filenames) available for a given cache type

list_datasets_var

def list_datasets_var(
cache: Union[CacheType, NonDataVariable] = CacheType.GLOBAL,
polling_interval: Optional[int] = None) -> DerivedVariable[List[str]]

Create a DerivedVariable which stores a list of datasets (filenames) available for a given cache type

Arguments:

  • cache: cache type to get the list of datasets for
  • polling_interval: optional polling interval in seconds for the derived variable

write_dataset

def write_dataset(dataset: DataFrame,
name: str,
cache: CacheType = CacheType.GLOBAL) -> None

Write a dataset to disk.

Creates a new one or overwrites an existing one.

Can be used e.g. as a resolver for UploadDropzone or in a SideEffect to create an arbitrary dataset.

Arguments:

  • dataset: DataFrame to write to disk
  • name: name to use for the dataset
  • cache: cache type to write the dataset for

Upload example

import pandas
import io
from dara.components import UploadDropzone
from dara.core.data_utils import DataFactory

ds_factory = DataFactory('./data_root')

def resolver(content: bytes, name: str) -> None:
# Assumes uploaded file is csv
file_object_io = io.StringIO(content.decode('utf-8'))
dataset = pandas.read_csv(file_object_io)
ds_factory.write_dataset(dataset, name)

UploadDropzone(resolver=resolver)

Arbitrary creation example


import pandas
import numpy as np
import io
from uuid import uuid4
from dara.components import UploadDropzone
from dara.core.data_utils import DataFactory
from dara.core import SideEffect, Variable

ds_factory = DataFactory('./data_root')

def create_df(ctx: SideEffect.Ctx):
df = pandas.DataFrame(np.random.randint(ctx.extras[0], 100, size=(100, 4)), columns=list('ABCD'))
uid = str(uuid4())
ds_factory.write_dataset(df, f'random_{uid}')

extra_data = Variable(1) # some extra data used to create the DataFrame
SideEffect(resolver=create_df, extras=[extra_data])

get_dataset_path

def get_dataset_path(name: str, cache: CacheType = CacheType.GLOBAL) -> str

Get path to a dataset on disk

Arguments:

  • name: name of the dataset
  • cache: cache type to get dataset for

Returns:

path to the dataset on disk

read_dataset

def read_dataset(name: str,
cache: CacheType = CacheType.GLOBAL) -> Optional[DataFrame]

Read a dataset from disk to a DataFrame.

Arguments:

  • name: name of the dataset
  • cache: cache type to get dataset for

Returns:

DataFrame or None if the dataset does not exist

read_dataset_var

def read_dataset_var(
name: Union[str, NonDataVariable],
cache: Union[CacheType, NonDataVariable] = CacheType.GLOBAL,
polling_interval: Optional[int] = None) -> DerivedDataVariable

Create a DerivedDataVariable which reads a specific dataset from disk

Arguments:

  • name: name of the dataset
  • cache: cache to get the dataset for
  • polling_interval: optional polling interval in seconds for the derived variable

delete_dataset

def delete_dataset(name: str, cache: CacheType = CacheType.GLOBAL) -> None

Delete a dataset from disk

Arguments:

  • name: name of the dataset
  • cache: cache to remove the dataset for

delete_dataset_action

def delete_dataset_action(name: Union[str, NonDataVariable],
cache: Union[CacheType,
NonDataVariable] = CacheType.GLOBAL)

Get a SideEffect action which deletes a given dataset

Arguments:

  • name: name of the dataset
  • cache: cache to remove the dataset for

download_dataset_action

def download_dataset_action(name: Union[str, NonDataVariable],
cache: Union[CacheType,
NonDataVariable] = CacheType.GLOBAL)

Get a DownloadContent action which downloads a dataset with a given name as a .csv

Arguments:

  • name: name of the dataset to download
  • cache: cache to download dataset for