API

Unischema

A unischema is a data structure definition which can be rendered as native schema/data-types objects in several different python libraries. Currently supported are pyspark, tensorflow, and numpy.

class petastorm.unischema.UnischemaField[source]

A type used to describe a single field in the schema:

  • name: name of the field.
  • numpy_dtype: a numpy dtype reference
  • shape: shape of the multidimensional array. None value is used to define a dimension with variable number of
    elements. E.g. (None, 3) defines a point cloud with three coordinates but unknown number of points.
  • codec: An instance of a codec object used to encode/decode data during serialization
    (e.g. CompressedImageCodec('png'))
  • nullable: Boolean indicating whether field can be None

A field is considered immutable, so we override both equality and hash operators for consistency and efficiency.

Create new instance of UnischemaField(name, numpy_dtype, shape, codec, nullable)

class petastorm.unischema.Unischema(name, fields)[source]

Describes a schema of a data structure which can be rendered as native schema/data-types objects in several different python libraries. Currently supported are pyspark, tensorflow, and numpy.

Creates an instance of a Unischema object.

Parameters:
  • name – name of the schema
  • fields – a list of UnischemaField instances describing the fields. The order of the fields is not important - they are stored sorted by name internally.
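For illustration, a minimal sketch of defining a schema; the schema name, field names, shapes and codecs below are placeholders rather than a prescribed layout:

>>> import numpy as np
>>> from pyspark.sql.types import IntegerType
>>> from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
>>> from petastorm.unischema import Unischema, UnischemaField
>>>
>>> MySchema = Unischema('MySchema', [
>>>     UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
>>>     UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
>>>     UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
>>> ])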
create_schema_view(fields)[source]

Creates a new instance of the schema using a subset of fields.

Fields can be either UnischemaField objects or regular expression patterns.

If one of the fields does not exist in this schema, an error is raised.

The following example returns a schema view with field_1 and any other field matching the other.*$ pattern.

>>> SomeSchema.create_schema_view(
>>>     [SomeSchema.field_1,
>>>      'other.*$'])
Parameters:fields – A list of UnischemaField objects and/or regular expressions
Returns:a new view of the original schema containing only the supplied fields
fields
as_spark_schema()[source]

Returns a Spark schema object derived from the unischema.

Example:

>>> spark.createDataFrame(dataset_rows,
>>>                       SomeSchema.as_spark_schema())
make_namedtuple(**kargs)[source]

Returns the schema as a namedtuple type initialized with the arguments passed to this method.

Example:

>>> some_schema.make_namedtuple(field1=10, field2='abc')
make_namedtuple_tf(*args, **kargs)[source]
classmethod from_arrow_schema(parquet_dataset, omit_unsupported_fields=False)[source]

Converts an Apache Arrow schema into a unischema object. This is useful for datasets of scalars only, which need no special encoding/decoding. If there is an unsupported type in the arrow schema, an exception is thrown. When omit_unsupported_fields is set to True, unsupported column types produce only warnings.

We do not set the codec field in the generated fields since all parquet fields are supported out-of-the-box by pyarrow and we do not need to perform any custom decoding.

Parameters:
  • arrow_schema – pyarrow.lib.Schema
  • omit_unsupported_fields – Boolean
Returns:

A Unischema object.
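As a hedged sketch, the method can be used to derive a schema from an existing Parquet store; the path below is a placeholder:

>>> import pyarrow.parquet as pq
>>> from petastorm.unischema import Unischema
>>>
>>> dataset = pq.ParquetDataset('/tmp/some_parquet_store')
>>> schema = Unischema.from_arrow_schema(dataset, omit_unsupported_fields=True)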

petastorm.unischema.dict_to_spark_row(unischema, row_dict)[source]

Converts a single row into a spark Row object.

Verifies that the data conforms to the unischema definition types and encodes the data using the codec specified by the unischema.

The parameters are keywords to allow use of functools.partial.

Parameters:
  • unischema – an instance of Unischema object
  • row_dict – a dictionary where the keys match the names of fields in the unischema.
Returns:

a single pyspark.Row object
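A minimal sketch of the intended usage pattern; MySchema, the spark session and the row dictionaries are assumed to be defined elsewhere:

>>> from functools import partial
>>> from petastorm.unischema import dict_to_spark_row
>>>
>>> rows_rdd = spark.sparkContext.parallelize(row_dicts) \
>>>     .map(partial(dict_to_spark_row, MySchema))
>>> spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
>>>     .write.parquet('file:///tmp/mydataset')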

petastorm.unischema.insert_explicit_nulls(unischema, row_dict)[source]

If the input dictionary is missing fields that are nullable, this function adds the missing keys with a None value.

If the fields that are missing are not nullable, a ValueError is raised.

Parameters:
  • unischema – An instance of a unischema
  • row_dict – a dictionary that is checked for missing nullable fields. The dictionary is modified in place.
Returns:

None

petastorm.unischema.match_unischema_fields(schema, field_regex)[source]

Returns a list of UnischemaField objects that match a regular expression.

Parameters:
  • schema – An instance of a Unischema object.
  • field_regex – A list of regular expression patterns. A field is matched if the regular expression matches the entire field name.
Returns:

A list of UnischemaField instances matching at least one of the regular expression patterns given by field_regex.
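For example, matching all fields whose names start with id (MySchema is a placeholder schema):

>>> from petastorm.unischema import match_unischema_fields
>>>
>>> matched_fields = match_unischema_fields(MySchema, ['id.*$'])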

Reader

petastorm.reader.make_reader(dataset_url, schema_fields=None, reader_pool_type='thread', workers_count=10, pyarrow_serialize=False, results_queue_size=50, shuffle_row_groups=True, shuffle_row_drop_partitions=1, predicate=None, rowgroup_selector=None, num_epochs=1, cur_shard=None, shard_count=None, cache_type='null', cache_location=None, cache_size_limit=None, cache_row_size_estimate=None, cache_extra_settings=None, hdfs_driver='libhdfs3', transform_spec=None)[source]

Creates an instance of Reader for reading Petastorm datasets. A Petastorm dataset is a dataset generated using the materialize_dataset() context manager.

See make_batch_reader() to read from a Parquet store that was not generated using materialize_dataset().

Parameters:
  • dataset_url – a file path or a URL to a parquet directory, e.g. 'hdfs://some_hdfs_cluster/user/yevgeni/parquet8', 'file:///tmp/mydataset', or 's3://bucket/mydataset'.
  • schema_fields – Can be: a list of unischema fields and/or regex pattern strings; None to read all fields; or an NGram object, in which case an NGram of the specified fields is returned.
  • reader_pool_type – A string denoting the reader pool type. Should be one of [‘thread’, ‘process’, ‘dummy’] denoting a thread pool, process pool, or running everything in the master thread. Defaults to ‘thread’
  • workers_count – An int for the number of workers to use in the reader pool. This is only used for the thread or process pool. Defaults to 10
  • pyarrow_serialize – Whether to use pyarrow for serialization. Currently only applicable to process pool. Defaults to False.
  • results_queue_size – Size of the results queue to store prefetched rows. Currently only applicable to thread reader pool type.
  • shuffle_row_groups – Whether to shuffle row groups (the order in which full row groups are read)
  • shuffle_row_drop_partitions – A positive integer which determines how many partitions to break up a row group into for increased shuffling, in exchange for worse performance (extra reads). For example, if you specify 2, each row group read will drop half of the rows within every row group and read the remaining rows in separate reads. It is recommended to keep this number below the regular row group size in order to not waste reads which drop all rows.
  • predicate – instance of PredicateBase object to filter rows to be returned by reader. The predicate will be passed a single row and must return a boolean value indicating whether to include it in the results.
  • rowgroup_selector – instance of row group selector object to select row groups to be read
  • num_epochs – An epoch is a single pass over all rows in the dataset. Setting num_epochs to None will result in an infinite number of epochs.
  • cur_shard – An int denoting the current shard number. Each node reading a shard should pass in a unique shard number in the range [0, shard_count). shard_count must be supplied as well. Defaults to None
  • shard_count – An int denoting the number of shards to break this dataset into. Defaults to None
  • cache_type – A string denoting the cache type, if desired. Options are [None, ‘null’, ‘local-disk’] for either a null/no-op cache or a cache implemented using diskcache. Caching is useful when communication with the main data store is either slow or expensive and the local machine has enough storage to hold the entire dataset (or a partition of it if shard_count is used). Defaults to a null cache.
  • cache_location – A string denoting the location or path of the cache.
  • cache_size_limit – An int specifying the size limit of the cache in bytes
  • cache_row_size_estimate – An int specifying the estimated size of a row in the dataset
  • cache_extra_settings – A dictionary of extra settings to pass to the cache implementation.
  • hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
  • transform_spec – An instance of TransformSpec object defining how a record is transformed after it is loaded and decoded. The transformation occurs on a worker thread/process (depends on the reader_pool_type value).
Returns:

A Reader object
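A minimal usage sketch; the dataset URL and the id field are placeholders:

>>> from petastorm import make_reader
>>>
>>> with make_reader('hdfs://myhadoop/some_dataset', num_epochs=1) as reader:
>>>     for sample in reader:
>>>         print(sample.id)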

petastorm.reader.make_batch_reader(dataset_url, schema_fields=None, reader_pool_type='thread', workers_count=10, shuffle_row_groups=True, shuffle_row_drop_partitions=1, predicate=None, rowgroup_selector=None, num_epochs=1, cur_shard=None, shard_count=None, cache_type='null', cache_location=None, cache_size_limit=None, cache_row_size_estimate=None, cache_extra_settings=None, hdfs_driver='libhdfs3', transform_spec=None)[source]

Creates an instance of Reader for reading batches out of a non-Petastorm Parquet store.

Currently, only stores having native scalar parquet data types are supported. Use make_reader() to read Petastorm Parquet stores generated with materialize_dataset().

NOTE: only scalar columns are currently supported.

Parameters:
  • dataset_url – a file path or a URL to a parquet directory, e.g. 'hdfs://some_hdfs_cluster/user/yevgeni/parquet8', 'file:///tmp/mydataset', or 's3://bucket/mydataset'.
  • schema_fields – A list of regex pattern strings. Only columns matching at least one of the patterns in the list will be loaded.
  • reader_pool_type – A string denoting the reader pool type. Should be one of [‘thread’, ‘process’, ‘dummy’] denoting a thread pool, process pool, or running everything in the master thread. Defaults to ‘thread’
  • workers_count – An int for the number of workers to use in the reader pool. This is only used for the thread or process pool. Defaults to 10
  • shuffle_row_groups – Whether to shuffle row groups (the order in which full row groups are read)
  • shuffle_row_drop_partitions – A positive integer which determines how many partitions to break up a row group into for increased shuffling, in exchange for worse performance (extra reads). For example, if you specify 2, each row group read will drop half of the rows within every row group and read the remaining rows in separate reads. It is recommended to keep this number below the regular row group size in order to not waste reads which drop all rows.
  • predicate – instance of PredicateBase object to filter rows to be returned by reader. The predicate will be passed a pandas DataFrame object and must return a pandas Series with boolean values of matching dimensions.
  • rowgroup_selector – instance of row group selector object to select row groups to be read
  • num_epochs – An epoch is a single pass over all rows in the dataset. Setting num_epochs to None will result in an infinite number of epochs.
  • cur_shard – An int denoting the current shard number. Each node reading a shard should pass in a unique shard number in the range [0, shard_count). shard_count must be supplied as well. Defaults to None
  • shard_count – An int denoting the number of shards to break this dataset into. Defaults to None
  • cache_type – A string denoting the cache type, if desired. Options are [None, ‘null’, ‘local-disk’] for either a null/no-op cache or a cache implemented using diskcache. Caching is useful when communication with the main data store is either slow or expensive and the local machine has enough storage to hold the entire dataset (or a partition of it if shard_count is used). Defaults to a null cache.
  • cache_location – A string denoting the location or path of the cache.
  • cache_size_limit – An int specifying the size limit of the cache in bytes
  • cache_row_size_estimate – An int specifying the estimated size of a row in the dataset
  • cache_extra_settings – A dictionary of extra settings to pass to the cache implementation.
  • hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
  • transform_spec – An instance of TransformSpec object defining how a record is transformed after it is loaded and decoded. The transformation occurs on a worker thread/process (depends on the reader_pool_type value).
Returns:

A Reader object
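A minimal usage sketch; each yielded element is a batch of rows grouped by column (the URL and column name are placeholders):

>>> from petastorm import make_batch_reader
>>>
>>> with make_batch_reader('hdfs://myhadoop/some_parquet_store') as reader:
>>>     for batch in reader:
>>>         # batch is a namedtuple whose attributes hold column value arrays
>>>         print(batch.id)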

class petastorm.reader.Reader(pyarrow_filesystem, dataset_path, schema_fields=None, shuffle_row_groups=True, shuffle_row_drop_partitions=1, predicate=None, rowgroup_selector=None, reader_pool=None, num_epochs=1, cur_shard=None, shard_count=None, cache=None, worker_class=None, transform_spec=None, is_batched_reader=False)[source]

Reads rows from a Petastorm dataset.

Variables:last_row_consumed – True if the last row was already returned by the Reader.

Initializes a reader object.

Parameters:
  • pyarrow_filesystem – An instance of pyarrow.FileSystem to be used. If not specified, a default one will be selected based on the url (only for hdfs:// or file://; for s3:// support, use make_reader). The default hdfs driver is libhdfs3. If you want to use libhdfs, use pyarrow_filesystem=pyarrow.hdfs.connect('hdfs:///some/path', driver='libhdfs').
  • dataset_path – filepath to a parquet directory on the specified filesystem. e.g. '/user/yevgeni/parquet8', or '/tmp/mydataset'.
  • schema_fields – Either a list of unischema fields to subset, None to read all fields, or an NGram object, in which case an NGram of the specified properties is returned.
  • shuffle_row_groups – Whether to shuffle row groups (the order in which full row groups are read)
  • shuffle_row_drop_partitions – A positive integer which determines how many partitions to break up a row group into for increased shuffling, in exchange for worse performance (extra reads). For example, if you specify 2, each row group read will drop half of the rows within every row group and read the remaining rows in separate reads. It is recommended to keep this number below the regular row group size in order to not waste reads which drop all rows.
  • predicate – instance of predicate object to filter rows to be returned by reader.
  • rowgroup_selector – instance of row group selector object to select row groups to be read
  • reader_pool – parallelization pool. ThreadPool(10) (10 threads) is used by default. This pool is a custom implementation used to parallelize reading data from the dataset. Any object from workers_pool package can be used (e.g. petastorm.workers_pool.process_pool.ProcessPool).
  • num_epochs – An epoch is a single pass over all rows in the dataset. Setting num_epochs to None will result in an infinite number of epochs.
  • cur_shard – An int denoting the current shard number used. Each reader instance should pass in a unique shard number in the range [0, shard_count). shard_count must be supplied as well. Defaults to None
  • shard_count – An int denoting the number of shard partitions there are. Defaults to None
  • cache – An object conforming to CacheBase interface. Before loading row groups from a parquet file the Reader will attempt to load these values from cache. Caching is useful when communication to the main data store is either slow or expensive and the local machine has large enough storage to store entire dataset (or a partition of a dataset if shards are used). By default, use the NullCache implementation.
  • worker_class – The class that will be instantiated on a different thread/process. Its responsibility is to load and filter the data.
reset()[source]

Resets the Reader state and allows fetching more samples once the Reader has finished reading all epochs, as specified by the num_epochs parameter.

Once all samples have been read from a reader, an attempt to fetch a new sample (e.g. next(reader)) raises StopIteration. You can reset the reader to its original state and restart reading samples by calling reset().

Calling reset() before all samples have been consumed is not supported; a NotImplementedError is raised if a user attempts to do so.

Calling reset() after stop() has been called has no effect.

Returns:None
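A hedged sketch of the intended call order; the dataset path is a placeholder:

>>> from petastorm import make_reader
>>>
>>> reader = make_reader('file:///tmp/mydataset', num_epochs=1)
>>> for sample in reader:
>>>     pass               # consume the single epoch completely
>>> reader.reset()         # rewind to the original state
>>> sample = next(reader)  # reading can start again
>>> reader.stop()
>>> reader.join()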
batched_output
stop()[source]

Stops all worker threads/processes.

join()[source]

Joins all worker threads/processes. Blocks until all workers have been fully terminated.

diagnostics
next()[source]
class petastorm.weighted_sampling_reader.WeightedSamplingReader(readers, probabilities)[source]

Combines the outputs of two or more Reader objects, sampling them with a configurable probability. Complies with the same interface as Reader, hence WeightedSamplingReader can be used anywhere a Reader can be used.

Creates an instance of WeightedSamplingReader.

The constructor gets a list of readers and probabilities as its parameters. The lists must be the same length. WeightedSamplingReader implements an iterator interface. Each time a new element is requested, one of the readers is selected, weighted by the matching probability. An element produced by the selected reader is returned.

The iterator raises StopIteration exception once one of the embedded readers has no more data left.

The following example shows how a WeightedSamplingReader can be instantiated with two readers which are sampled with 10% and 90% probabilities respectively.

>>> from petastorm.weighted_sampling_reader import WeightedSamplingReader
>>> from petastorm.reader import Reader
>>>
>>> with WeightedSamplingReader([Reader('file:///dataset1'), Reader('file:///dataset2')], [0.1, 0.9]) as reader:
>>>     new_sample = next(reader)
Parameters:
  • readers – A list of readers. The length of the list must be the same as the length of the probabilities list.
  • probabilities – A list of probabilities. The length of the list must be the same as the length of readers argument. If the sum of all probability values is not 1.0, it will be automatically normalized.
next()[source]
class petastorm.ngram.NGram(fields, delta_threshold, timestamp_field, timestamp_overlap=True)[source]

Defines an NGram, having certain fields as set by fields, where consecutive items in an NGram are no further apart than the argument delta_threshold (inclusive). The argument timestamp_field indicates which field refers to the timestamp in the data.

The argument fields is a dictionary, where the keys are integers, and the value is an array of the Unischema fields to include at that timestep.

The delta_threshold and timestamp_field arguments define how far apart items in the NGram may be, as described in the rules above.

The following are examples of what NGram will return based on the parameters:

  1. Case 1:
>>> fields = {
>>>  -1: [TestSchema.id, TestSchema.id2, TestSchema.image_png,
>>>       TestSchema.matrix],
>>>   0: [TestSchema.id, TestSchema.id2,
>>>       TestSchema.sensor_name],
>>> }
>>> delta_threshold = 5
>>> timestamp_field = 'id'
  • The data being:
>>> A {'id': 0,  ....}
>>> B {'id': 10, ....}
>>> C {'id': 20, ....}
>>> D {'id': 30, ....}
  • The result will be empty, since delta is 10 (more than allowed delta_threshold of 5)
  2. Case 2:
>>> fields = {
>>>     -1: [TestSchema.id, TestSchema.id2, TestSchema.image_png,
>>>          TestSchema.matrix],
>>>      0: [TestSchema.id, TestSchema.id2,
>>>          TestSchema.sensor_name],
>>> }
>>> delta_threshold = 4
>>> timestamp_field = 'id'
  • The data being:
>>> A {'id': 0, .....}
>>> B {'id': 3, .....}
>>> C {'id': 8, .....}
>>> D {'id': 10, .....}
>>> E {'id': 11, .....}
>>> G {'id': 20, .....}
>>> H {'id': 30, .....}
  • The result will be:
>>> {-1: A, 0: B},
>>> {-1: C, 0: D},
>>> {-1: D, 0: E}
  • Notice how (B, C) was skipped, since B's id is 3 and C's id is 8 (a difference of 5, which is >= the delta_threshold of 4); (E, G) and (G, H) were also skipped for the same reason.

One caveat to note: All NGrams within the same parquet row group are guaranteed to be returned, but not across different parquet row groups. i.e. if row group 1 has [0, 5], row group 2 has [6, 10] then this will result in (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8), (8, 9), (9, 10).

Notice how the (5, 6) was skipped because it is across two different row groups. In order to potentially produce more NGrams, the row group size should be increased (at minimum it needs to be at least as large as the NGram length).

Note: Passing a fields argument like {-1: [TestSchema.id], 1: [TestSchema.id]} is allowed; the 0 field will just be empty. Passing a fields argument like {1: [TestSchema.id], 0: [TestSchema.id]} is the same as passing {0: [TestSchema.id], 1: [TestSchema.id]}.

The return type will be a dictionary where the keys are the same as the keys passed to fields and the value of each key will be the item.

Constructor to initialize ngram with fields, delta_threshold and timestamp_field.

Parameters:
  • fields – A dictionary, with consecutive integers as keys and each value is an array of Unischema fields.
  • delta_threshold – The maximum allowed difference between consecutive timestamp_field values.
  • timestamp_field – The field that represents the timestamp.
  • timestamp_overlap – Whether timestamps in sequences are allowed to overlap (defaults to True), e.g., If the data consists of consecutive timestamps [{'id': 0}, {'id': 1}, ..., {'id': 5}] and you are asking for NGram of length 3 with timestamp_overlap set to True you will receive NGrams of [{'id': 0}, {'id': 1}, {'id': 2}] and [{'id': 1}, {'id': 2}, {'id': 3}] (in addition to others); however, note that {'id': 1}, and {'id': 2} appear twice. With timestamp_overlap set to False, this would not occur and instead return [{'id': 0}, {'id': 1}, {'id': 2}] and [{'id': 3}, {'id': 4}, {'id': 5}]. There is no overlap of timestamps between NGrams (and each timestamp record should only occur once in the returned data)
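A hedged sketch of requesting NGrams through make_reader(); the schema, field names, and dataset URL are placeholders:

>>> from petastorm import make_reader
>>> from petastorm.ngram import NGram
>>>
>>> fields = {
>>>     -1: [MySchema.id, MySchema.image1],
>>>      0: [MySchema.id, MySchema.image1],
>>> }
>>> ngram = NGram(fields=fields, delta_threshold=5, timestamp_field='id')
>>> with make_reader('file:///tmp/mydataset', schema_fields=ngram) as reader:
>>>     for ngram_sample in reader:
>>>         current = ngram_sample[0]  # namedtuple holding the fields at offset 0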
length

Returns the requested ngram length.

fields

Returns the ngram fields.

delta_threshold

Returns the delta threshold: the maximum difference allowed between one entry and the following one in timestamp_field.

resolve_regex_field_names(schema)[source]

Resolves strings (regular expressions) that were passed in the fields and timestamp_field parameters.

get_field_names_at_timestep(timestep)[source]

Returns the field names at a certain timestep.

Parameters:timestep – The timestep to return the field names at.
Returns:A list of all the field names at that timestep.

get_schema_at_timestep(schema, timestep)[source]

Returns the schema of the data at a certain timestep.

Parameters:
  • schema – The schema of the data, of which the schema at a certain timestep is a subset.
  • timestep – The timestep to get the schema at.
Returns:The schema of the data at the given timestep.

form_ngram(data, schema)[source]

Returns all the ngrams as dictated by fields, delta_threshold and timestamp_field.

Parameters:data – The data items, a list of Unischema items.
Returns:A dictionary with keys in [0, length - 1]. The value of each key is the corresponding item in the ngram at that position.

make_namedtuple(schema, ngram_as_dicts)[source]

Converts an ngram structure whose mapped values are dictionaries into one whose mapped values are namedtuples.

Example:

>>> { -1 : {'f1': 10, 'f2': 20},
>>>    0 : {'f1': 30},
>>> }

is converted to

>>> { -1 : namedtuple(f1=10, f2=20),
>>>    0 : namedtuple(f1=30),
>>> }
Parameters:
  • schema – schema used for conversion
  • ngram_as_dicts – ngram in the timestamp-to-dict mapping
Returns:

ngram in the timestamp-to-namedtuple mapping

get_field_names_at_all_timesteps()[source]

Returns a list of fields that are present in at least one of the time offsets.

convert_fields(unischema, field_list)[source]

Converts all the fields in field_list into Unischema fields. field_list can contain unischema fields and strings (regular expressions).

Parameters:
  • unischema – Unischema object
  • field_list – A list of unischema fields or strings (regular expressions)
Returns:

list of unischema fields

TransformSpec

class petastorm.transform.TransformSpec(func=None, edit_fields=None, removed_fields=None)[source]

TransformSpec defines a user transformation that is applied to a loaded row on a worker thread/process.

The object defines the function (callable) that performs the transform, as well as the schema transform: pre-transform schema to post-transform schema.

The func argument is a callable which takes a row as its parameter and returns a modified row. edit_fields and removed_fields define mutating operations performed on the original schema that produce a post-transform schema. The func return value must comply with this post-transform schema.

Parameters:
  • func – Optional. A callable. The function is called on the worker thread. It takes a dictionary that complies with the input schema and must return a dictionary that complies with the post-transform schema. If the user only wants to remove certain fields, this argument may be omitted and only the removed_fields argument specified.
  • edit_fields – Optional. A list of 4-tuples with the following fields: (name, numpy_dtype, shape, is_nullable)
  • removed_fields – Optional[list]. A list of field names that will be removed from the original schema.
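For illustration, a hedged sketch of a transform that rescales an image field; the field name, shape and dataset path are placeholders:

>>> import numpy as np
>>> from petastorm import make_reader
>>> from petastorm.transform import TransformSpec
>>>
>>> def _rescale(row):
>>>     # row is a dict complying with the pre-transform schema
>>>     row['image1'] = row['image1'].astype(np.float32) / 255.0
>>>     return row
>>>
>>> spec = TransformSpec(_rescale,
>>>                      edit_fields=[('image1', np.float32, (128, 256, 3), False)])
>>> reader = make_reader('file:///tmp/mydataset', transform_spec=spec)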
petastorm.transform.transform_schema(schema, transform_spec)[source]

Creates a post-transform schema given a pre-transform schema and a transform_spec with mutation instructions.

Parameters:
  • schema – A pre-transform schema
  • transform_spec – a TransformSpec object with mutation instructions.
Returns:

A post-transform schema

Tensorflow

PyTorch

PySpark & SQL

A set of Spark-specific helper functions for petastorm datasets.

petastorm.spark_utils.dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='libhdfs3')[source]

Retrieves a Spark RDD for a given petastorm dataset.

Parameters:
  • dataset_url – A string for the dataset url (e.g. hdfs:///path/to/dataset)
  • spark_session – A spark session
  • schema_fields – list of unischema fields to subset, or None to read all fields.
  • hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
Returns:

An RDD of dictionary records from the dataset
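A minimal sketch; the dataset URL, spark session and schema fields are placeholders:

>>> from petastorm.spark_utils import dataset_as_rdd
>>>
>>> rdd = dataset_as_rdd('hdfs:///path/to/dataset', spark,
>>>                      schema_fields=[MySchema.id, MySchema.image1])
>>> print(rdd.first().id)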

Row queries

Predicates for petastorm

class petastorm.predicates.PredicateBase[source]

Base class for row predicates

get_fields()[source]
do_include(values)[source]
class petastorm.predicates.in_set(inclusion_values, predicate_field)[source]

Tests whether the predicate_field value is in the inclusion_values set.

get_fields()[source]
do_include(values)[source]
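A hedged sketch of applying a row predicate through make_reader(); the dataset path and field name are placeholders:

>>> from petastorm import make_reader
>>> from petastorm.predicates import in_set
>>>
>>> # keep only rows whose 'id' value is 1 or 4
>>> with make_reader('file:///tmp/mydataset', predicate=in_set({1, 4}, 'id')) as reader:
>>>     for sample in reader:
>>>         print(sample.id)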
class petastorm.predicates.in_intersection(inclusion_values, _predicate_field)[source]

Tests whether the predicate_field list contains at least one value from the inclusion_values set.

get_fields()[source]
do_include(values)[source]
class petastorm.predicates.in_lambda(predicate_fields, predicate_func, state_arg=None)[source]

Wraps a custom function to be used as a predicate. Example: in_lambda(['labels_object_roles'], lambda labels_object_roles: len(labels_object_roles) > 3)

Parameters:
  • predicate_fields – list of fields to be used in predicate
  • predicate_func – predicate function example: lambda labels_object_roles : len(labels_object_roles) > 3
  • state_arg – an additional object to keep function state. It will be passed to predicate_func after the field arguments ONLY if it is not None
get_fields()[source]
do_include(values)[source]
class petastorm.predicates.in_negate(predicate)[source]

A predicate used to negate another predicate.

get_fields()[source]
do_include(values)[source]
class petastorm.predicates.in_reduce(predicate_list, reduce_func)[source]

A predicate used to aggregate other predicates using any reduce logical operation.

Parameters:
  • predicate_list – list of predicates
  • reduce_func – function used to aggregate the results of all predicates in the list, e.g. all() implements a logical ‘And’, any() implements a logical ‘Or’

get_fields()[source]
do_include(values)[source]
class petastorm.predicates.in_pseudorandom_split(fraction_list, subset_index, predicate_field)[source]

Splits the dataset according to a split list based on the predicate_field value. The split is pseudorandom (the seed cannot be supplied yet), i.e. the split outcome is always the same. The split is performed by hashing the field value uniformly onto the [0, 1] range and returning the part of the full dataset that was hashed into the given sub-range.

Example:
fraction_list = [0.5, 0.2, 0.3] – the dataset will be split into three subsets in proportion:
  • subset 1: 0.5 of the data
  • subset 2: 0.2 of the data
  • subset 3: 0.3 of the data
Note: the split is not exact, so avoid small fractions (e.g. 0.001) to prevent empty subsets.

Parameters:
  • fraction_list – a list of fractions (real numbers in the range [0, 1])
  • subset_index – defines which subset will be used by the Reader

get_fields()[source]
do_include(values)[source]
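A hedged sketch of a deterministic 70/30 split keyed on a field; the dataset path and field name are placeholders:

>>> from petastorm import make_reader
>>> from petastorm.predicates import in_pseudorandom_split
>>>
>>> # subset_index=0 selects the first (70%) subset
>>> train_predicate = in_pseudorandom_split([0.7, 0.3], 0, 'id')
>>> train_reader = make_reader('file:///tmp/mydataset', predicate=train_predicate)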

Local cache

class petastorm.cache.CacheBase[source]
get(key, fill_cache_func)[source]

Gets an entry from the cache implementation.

If there is a cache miss, fill_cache_func() will be evaluated to get the value.

Parameters:
  • key – A key identifying cache entry
  • fill_cache_func – This function will be evaluated (fill_cache_func()) to populate cache, if no value is present in the cache.
Returns:

A value from cache

class petastorm.cache.NullCache[source]

A pass-through cache implementation: the value-generating function is called on each access.

get(key, fill_cache_func)[source]

Gets an entry from the cache implementation.

If there is a cache miss, fill_cache_func() will be evaluated to get the value.

Parameters:
  • key – A key identifying cache entry
  • fill_cache_func – This function will be evaluated (fill_cache_func()) to populate cache, if no value is present in the cache.
Returns:

A value from cache

class petastorm.local_disk_cache.LocalDiskCache(path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=False, **settings)[source]

LocalDiskCache is an adapter to a diskcache implementation.

LocalDiskCache can be used by a petastorm Reader class to temporarily keep parts of the dataset on a local file system storage.

Parameters:
  • path – Path where the dataset cache is being stored.
  • size_limit_bytes – Maximum disk space to be used by the cache. The cache size may actually grow somewhat above size_limit_bytes, so the limit is not strict.
  • expected_row_size_bytes – Approximate size of a single row. This argument is used to perform a sanity check on the capacity of individual shards.
  • shards – The cache can be sharded. A larger number of shards improves write parallelism.
  • cleanup – If set to True, the cache directory is removed when the cleanup() method is called.
  • settings – These parameters are passed through to the diskcache.Cache class. For details, see: http://www.grantjenks.com/docs/diskcache/tutorial.html#settings
get(key, fill_cache_func)[source]

Gets an entry from the cache implementation.

If there is a cache miss, fill_cache_func() will be evaluated to get the value.

Parameters:
  • key – A key identifying cache entry
  • fill_cache_func – This function will be evaluated (fill_cache_func()) to populate cache, if no value is present in the cache.
Returns:

A value from cache

cleanup()[source]
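Most users enable the disk cache indirectly through make_reader() rather than constructing LocalDiskCache directly; a hedged sketch (paths and sizes are placeholders):

>>> from petastorm import make_reader
>>>
>>> reader = make_reader('hdfs:///path/to/dataset',
>>>                      cache_type='local-disk',
>>>                      cache_location='/tmp/petastorm_cache',
>>>                      cache_size_limit=10 * 2**30,         # 10 GB
>>>                      cache_row_size_estimate=64 * 2**10)  # ~64 KB per row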

Codecs

A set of dataframe-column codecs complements the limited variety of spark/pyarrow-supported data types, enabling storage of numpy multidimensional arrays, as well as compressed images, in spark dataframes and, transitively, in parquet files.

NOTE: Due to the way a unischema is stored alongside the dataset (with pickling), changing any of these codec class names or fields can break existing readers.

class petastorm.codecs.DataframeColumnCodec[source]

The abstract base class of codecs.

encode(unischema_field, value)[source]
decode(unischema_field, value)[source]
spark_dtype()[source]

Spark datatype to be used for underlying storage

class petastorm.codecs.CompressedImageCodec(image_codec='png', quality=80)[source]

CompressedImageCodec compresses/decompresses images.

Parameters:
  • image_codec – any format string supported by opencv. e.g. png, jpeg
  • quality – used when using jpeg lossy compression
encode(unischema_field, value)[source]

Encodes the image using OpenCV.

decode(unischema_field, value)[source]

Decodes the image using OpenCV.

spark_dtype()[source]

Spark datatype to be used for underlying storage

class petastorm.codecs.NdarrayCodec[source]

Encodes numpy ndarray into, or decodes an ndarray from, a spark dataframe field.

encode(unischema_field, value)[source]
decode(unischema_field, value)[source]
spark_dtype()[source]

Spark datatype to be used for underlying storage

class petastorm.codecs.CompressedNdarrayCodec[source]

Encodes numpy ndarray with compression into a spark dataframe field

encode(unischema_field, value)[source]
decode(unischema_field, value)[source]
spark_dtype()[source]

Spark datatype to be used for underlying storage

class petastorm.codecs.ScalarCodec(spark_type)[source]

Encodes a scalar into a spark dataframe field.

Constructs a codec.

Parameters:spark_type – an instance of a Type object from pyspark.sql.types
encode(unischema_field, value)[source]
decode(unischema_field, value)[source]
spark_dtype()[source]

Spark datatype to be used for underlying storage

Dataset generation

class petastorm.etl.RowGroupIndexerBase[source]

Base class for row group indexers.

index_name

Return unique index name.

column_names

Return list of column(s) required to build index.

indexed_values

Return list of values in index

get_row_group_indexes(value_key)[source]

Return row groups for given value in index.

build_index(decoded_rows, piece_index)[source]

Indexes values in the given rows.

exception petastorm.etl.dataset_metadata.PetastormMetadataError[source]

Error to specify when the petastorm metadata does not exist, does not contain the necessary information, or is corrupt/invalid.

exception petastorm.etl.dataset_metadata.PetastormMetadataGenerationError[source]

Error to specify when petastorm could not generate metadata properly. This error is usually accompanied by a message suggesting to regenerate the dataset metadata.

petastorm.etl.dataset_metadata.materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_summary_metadata=False, filesystem_factory=None)[source]

A Context Manager which handles all the initialization and finalization necessary to generate metadata for a petastorm dataset. This should be used around your spark logic to materialize a dataset (specifically the writing of parquet output).

Note: Any rowgroup indexing should happen outside the materialize_dataset block

Example:

>>> spark = SparkSession.builder...
>>> ds_url = 'hdfs:///path/to/my/dataset'
>>> with materialize_dataset(spark, ds_url, MyUnischema, 64):
>>>   spark.sparkContext.parallelize(range(0, 10)).
>>>     ...
>>>     .write.parquet(ds_url)
>>> indexer = [SingleFieldIndexer(...)]
>>> build_rowgroup_index(ds_url, spark.sparkContext, indexer)

A user may provide their own instance of a pyarrow filesystem object via the pyarrow_filesystem argument (otherwise, petastorm will create a default one based on the url).

The following example shows how a custom pyarrow HDFS filesystem, instantiated using the libhdfs driver, can be used during Petastorm dataset generation:

>>> resolver=FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
>>>                             hdfs_driver='libhdfs')
>>> with materialize_dataset(..., pyarrow_filesystem=resolver.filesystem()):
>>>     ...
Parameters:
  • spark – The spark session you are using
  • dataset_url – The dataset url to output your dataset to (e.g. hdfs:///path/to/dataset)
  • schema – The petastorm.unischema.Unischema definition of your dataset
  • row_group_size_mb – The parquet row group size to use for your dataset
  • use_summary_metadata – Whether to use the parquet summary metadata for row group indexing or a custom indexing method. The custom indexing method is more scalable for very large datasets.
  • pyarrow_filesystem – A pyarrow filesystem object to be used when saving Petastorm specific metadata to the Parquet store.
petastorm.etl.dataset_metadata.get_schema(dataset)[source]

Retrieves the schema object stored as part of the dataset metadata.

Parameters:dataset – an instance of pyarrow.parquet.ParquetDataset object
Returns:A petastorm.unischema.Unischema object
petastorm.etl.dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver='libhdfs3')[source]

Returns a petastorm.unischema.Unischema object loaded from a dataset specified by a url.

Parameters:
  • dataset_url – A dataset URL
  • hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
Returns:

A petastorm.unischema.Unischema object
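A minimal sketch; the dataset URL is a placeholder:

>>> from petastorm.etl.dataset_metadata import get_schema_from_dataset_url
>>>
>>> schema = get_schema_from_dataset_url('hdfs:///path/to/dataset')
>>> print(list(schema.fields.keys()))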

petastorm.etl.dataset_metadata.infer_or_load_unischema(dataset)[source]

Tries to recover the Unischema object stored by the materialize_dataset function. If it cannot be loaded, infers the Unischema from the native Parquet schema.

Script to add petastorm metadata to an existing parquet dataset

petastorm.etl.petastorm_generate_metadata.generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_summary_metadata=False, hdfs_driver='libhdfs3')[source]

Generates the metadata necessary to read a petastorm dataset and adds it to an existing dataset.

Parameters:
  • spark – spark session
  • dataset_url – url of existing dataset
  • unischema_class – (optional) fully qualified dataset unischema class. If not specified will attempt to find one already in the dataset. (e.g. examples.hello_world.generate_hello_world_dataset.HelloWorldSchema)
  • hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
  • user – String denoting username when connecting to HDFS
petastorm.etl.petastorm_generate_metadata.main()[source]

Row-group selectors

class petastorm.selectors.RowGroupSelectorBase[source]

Base class for row group selectors.

get_index_names()[source]

Return list of indexes required for given selector.

select_row_groups(index_dict)[source]

Return set of row groups which are selected.

class petastorm.selectors.SingleIndexSelector(index_name, values_list)[source]

Generic selector for a single-field indexer. Selects all row groups containing any of the given values.

get_index_names()[source]

Return list of indexes required for given selector.

select_row_groups(index_dict)[source]

Return set of row groups which are selected.

class petastorm.selectors.IntersectIndexSelector(single_index_selectors)[source]

Multiple single-field indexers selector. Selects row groups that contain at least one of the values in every given selector.

Parameters:single_index_selectors – List of SingleIndexSelector
get_index_names()[source]

Return list of indexes required for given selector.

select_row_groups(index_dict)[source]

Return set of row groups which are selected.

class petastorm.selectors.UnionIndexSelector(single_index_selectors)[source]

Multiple single-field indexers selector. Selects row groups that contain any of the values in at least one selector.

Parameters:single_index_selectors – List of SingleIndexSelector
get_index_names()[source]

Return list of indexes required for given selector.

select_row_groups(index_dict)[source]

Return set of row groups which are selected.
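A hedged sketch of row-group selection through make_reader(); it assumes an index named 'sensor_name_index' was previously built with build_rowgroup_index() (all names and paths are placeholders):

>>> from petastorm import make_reader
>>> from petastorm.selectors import SingleIndexSelector
>>>
>>> selector = SingleIndexSelector('sensor_name_index', ['front_camera'])
>>> with make_reader('hdfs:///path/to/dataset', rowgroup_selector=selector) as reader:
>>>     for sample in reader:
>>>         print(sample.sensor_name)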

class petastorm.etl.rowgroup_indexers.SingleFieldIndexer(index_name, index_field)[source]

Class to index single field in parquet dataset.

This indexer only indexes numpy strings, numpy integers, or numpy arrays of strings.

index_name

Return unique index name.

column_names

Return list of column(s) required to build index.

indexed_values

Return list of values in index

get_row_group_indexes(value_key)[source]

Return row groups for given value in index.

build_index(decoded_rows, piece_index)[source]

Indexes values in the given rows.

class petastorm.etl.rowgroup_indexers.FieldNotNullIndexer(index_name, index_field)[source]

Class to index a ‘Not Null’ condition for a single field in a parquet dataset.

index_name

Return unique index name.

column_names

Return list of column(s) required to build index.

indexed_values

Return list of values in index

get_row_group_indexes(value_key=None)[source]

Return row groups for given value in index.

build_index(decoded_rows, piece_index)[source]

Indexes values in the given rows.

class petastorm.etl.rowgroup_indexing.PieceInfo(piece_index, path, row_group, partition_keys)

Create new instance of PieceInfo(piece_index, path, row_group, partition_keys)

partition_keys

Alias for field number 3

path

Alias for field number 1

piece_index

Alias for field number 0

row_group

Alias for field number 2

petastorm.etl.rowgroup_indexing.build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libhdfs3')[source]

Builds an index for a given list of fields to use for fast rowgroup selection.

Parameters:
  • dataset_url – (str) the url of the dataset (or a path if you would like to use the default hdfs config)
  • spark_context – (SparkContext)
  • indexers – a list of objects used to build row group indexes. Should support the RowGroupIndexerBase interface
  • hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
Returns:None. Upon successful completion, the rowgroup predicates will be saved to the _metadata file
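A hedged sketch of building a single-field index after the dataset has been materialized; the index name, field name, dataset URL and spark session are placeholders:

>>> from petastorm.etl.rowgroup_indexers import SingleFieldIndexer
>>> from petastorm.etl.rowgroup_indexing import build_rowgroup_index
>>>
>>> indexers = [SingleFieldIndexer('sensor_name_index', 'sensor_name')]
>>> build_rowgroup_index('hdfs:///path/to/dataset', spark.sparkContext, indexers)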

petastorm.etl.rowgroup_indexing.get_row_group_indexes(dataset)[source]

Extracts and returns row group indexes from a dataset.

Parameters:dataset – dataset object
Returns:dataset indexes as a dictionary

Benchmarks

HDFS

class petastorm.hdfs.namenode.HdfsNamenodeResolver(hadoop_configuration=None)[source]

This class embodies the functionality to resolve HDFS namenodes, either by default or via a nameservice.

Sets the given HadoopConfiguration object for the resolver, or checks for and pulls the hadoop configuration from environment variables, in order of preference.

Parameters:hadoop_configuration – an optional HadoopConfiguration
resolve_hdfs_name_service(namespace)[source]

Given the namespace of a name service, resolves the configured list of name nodes, and returns them as a list of URL strings.

Parameters:namespace – the HDFS name service to resolve
Returns:a list of URL strings of the name nodes for the given name service, or None if not properly configured.
resolve_default_hdfs_service()[source]

Resolves the default namenode using the given, or environment-derived, hadoop configuration, by parsing the configuration for fs.defaultFS.

Returns:a tuple of structure (nameservice, list of namenodes)
exception petastorm.hdfs.namenode.HdfsConnectError[source]
exception petastorm.hdfs.namenode.MaxFailoversExceeded(failed_exceptions, max_failover_attempts, func_name)[source]
class petastorm.hdfs.namenode.namenode_failover(func)[source]

This decorator class ensures seamless namenode failover and retry when an HDFS call fails due to a StandbyException, up to a maximum number of retries.

MAX_FAILOVER_ATTEMPTS = 2
petastorm.hdfs.namenode.failover_all_class_methods(decorator)[source]

This decorator function wraps an entire class to decorate each member method, incl. inherited.

Adapted from https://stackoverflow.com/a/6307868

class petastorm.hdfs.namenode.HAHdfsClient(connector_cls, list_of_namenodes, user=None)[source]

Attempts an HDFS connection operation, storing the hdfs object for intercepted calls.

Parameters:
  • connector_cls – HdfsConnector class, so connector logic resides in one place, and also facilitates testing.
  • list_of_namenodes – List of namenodes to fail over across, cached to enable pickling/unpickling
  • user – String denoting username when connecting to HDFS. None implies login user.
Returns:

None

The following HDFS filesystem methods are proxied through the namenode_failover decorator: cat, chmod, chown, close, connect, delete, df, disk_usage, download, exists, get_capacity, get_space_used, info, isdir, isfile, ls, mkdir, mv, open, read_parquet, rename, rm, stat, upload, walk.
class petastorm.hdfs.namenode.HdfsConnector[source]

HDFS connector class where failover logic is implemented. Facilitates testing.

MAX_NAMENODES = 2
classmethod hdfs_connect_namenode(url, driver='libhdfs3', user=None)[source]

Performs HDFS connect in one place, facilitating easy change of driver and test mocking.

Parameters:
  • url – A parsed URL object pointing to the HDFS end point
  • driver – An optional driver identifier
  • user – String denoting username when connecting to HDFS. None implies login user.
Returns:

Pyarrow HDFS connection object.

classmethod connect_to_either_namenode(list_of_namenodes, user=None)[source]

Returns a wrapper HadoopFileSystem “high-availability client” object that enables name node failover.

Raises an HdfsConnectError if no successful connection can be established.

Parameters:
  • list_of_namenodes – a required list of name node URLs to connect to.
  • user – String denoting username when connecting to HDFS. None implies login user.
Returns:

the wrapped HDFS connection object