API
Unischema
A unischema is a data structure definition which can be rendered as native schema/data-types objects in several different python libraries. Currently supported are pyspark, tensorflow, and numpy.
class petastorm.unischema.UnischemaField
A type used to describe a single field in the schema:
- name: name of the field.
- numpy_dtype: a numpy dtype reference.
- shape: shape of the multidimensional array. A None value defines a dimension with a variable number of elements. E.g. (None, 3) defines a point cloud with three coordinates but an unknown number of points.
- codec: an instance of a codec object used to encode/decode data during serialization (e.g. CompressedImageCodec('png')).
- nullable: Boolean indicating whether the field can be None.
A field is considered immutable, so both the equality and hash operators are overridden for consistency and efficiency.
Create a new instance of UnischemaField(name, numpy_dtype, shape, codec, nullable).
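To illustrate how the shape convention reads, here is a small sketch that checks a concrete array shape against a declared UnischemaField-style shape, treating None as "any length". `shape_matches` is a hypothetical helper written for this example, not a petastorm function; petastorm's own validation may differ.

```python
from typing import Optional, Sequence, Tuple


def shape_matches(declared: Sequence[Optional[int]], actual: Tuple[int, ...]) -> bool:
    """Return True if `actual` is compatible with the `declared` shape.

    A None entry in `declared` accepts any length along that dimension;
    the number of dimensions must match exactly.
    """
    if len(declared) != len(actual):
        return False
    return all(d is None or d == a for d, a in zip(declared, actual))


point_cloud_shape = (None, 3)  # unknown number of points, three coordinates each
print(shape_matches(point_cloud_shape, (1000, 3)))  # True
print(shape_matches(point_cloud_shape, (5, 3)))     # True
print(shape_matches(point_cloud_shape, (5, 4)))     # False
```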
class petastorm.unischema.Unischema(name, fields)
Describes a schema of a data structure which can be rendered as native schema/data-types objects in several different python libraries. Currently supported are pyspark, tensorflow, and numpy.
Creates an instance of a Unischema object.
Parameters:
- name – name of the schema
- fields – a list of UnischemaField instances describing the fields. The order of the fields is not important; they are stored sorted by name internally.
create_schema_view(fields)
Creates a new instance of the schema using a subset of fields.
Fields can be either UnischemaField objects or regular expression patterns.
If one of the fields does not exist in this schema, an error is raised.
The example returns a schema with field_1 and any other field matching the other.*$ pattern.
>>> SomeSchema.create_schema_view(
...     [SomeSchema.field_1,
...      'other.*$'])
Parameters: fields – A list of UnischemaField objects and/or regular expressions
Returns: a new view of the original schema containing only the supplied fields
fields
as_spark_schema()
Returns a spark schema (a pyspark StructType) derived from the unischema.
Example:
>>> spark.createDataFrame(dataset_rows,
...                       SomeSchema.as_spark_schema())
make_namedtuple(**kargs)
Returns the schema as a namedtuple type initialized with the arguments passed to this method.
Example:
>>> some_schema.make_namedtuple(field1=10, field2='abc')
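A minimal sketch of the idea behind make_namedtuple, using only the standard library: a namedtuple type is built from the schema's field names (sorted by name, as Unischema stores them) and instantiated with keyword arguments. `SomeSchemaRow` and the two field names are hypothetical stand-ins; the real method derives its type from the Unischema itself.

```python
from collections import namedtuple

# Hypothetical two-field schema; the fields are kept sorted by name.
FIELD_NAMES = sorted(['field2', 'field1'])
SomeSchemaRow = namedtuple('SomeSchemaRow', FIELD_NAMES)


def make_namedtuple(**kwargs):
    """Sketch: build a row of the schema's namedtuple type from keyword arguments."""
    # In this sketch every field must be supplied; a missing one raises TypeError.
    return SomeSchemaRow(**kwargs)


row = make_namedtuple(field1=10, field2='abc')
print(row)         # SomeSchemaRow(field1=10, field2='abc')
print(row.field2)  # abc
```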
classmethod from_arrow_schema(parquet_dataset, omit_unsupported_fields=False)
Converts an apache arrow schema into a unischema object. This is useful for datasets of only scalars which need no special encoding/decoding. If there is an unsupported type in the arrow schema, an exception is raised. When omit_unsupported_fields is set to True, unsupported column types only produce warnings.
The codec field is not set in the generated fields, since all parquet fields are supported out of the box by pyarrow and no custom decoding is needed.
Parameters:
- parquet_dataset – the dataset whose arrow schema (pyarrow.lib.Schema) is converted
- omit_unsupported_fields – Boolean
Returns: A Unischema object.
petastorm.unischema.dict_to_spark_row(unischema, row_dict)
Converts a single row into a spark Row object.
Verifies that the data conforms to the unischema definition types and encodes the data using the codec specified by the unischema.
The parameters are keywords to allow use of functools.partial.
Parameters:
- unischema – an instance of a Unischema object
- row_dict – a dictionary where the keys match the names of fields in the unischema.
Returns: a single pyspark.Row object
petastorm.unischema.insert_explicit_nulls(unischema, row_dict)
If the input dictionary has missing fields that are nullable, this function adds the missing keys with a None value.
If any of the missing fields is not nullable, a ValueError is raised.
Parameters:
- unischema – An instance of a unischema
- row_dict – dictionary that is checked for missing nullable fields. The dictionary is modified in place.
Returns: None
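The behavior described above can be sketched with plain dictionaries. `Field` is a hypothetical stand-in for UnischemaField carrying only the two attributes this logic needs, and the choice to validate everything before inserting anything is a design decision of this sketch, not necessarily petastorm's.

```python
from collections import namedtuple

# Minimal stand-in for UnischemaField: only name and nullable are needed here.
Field = namedtuple('Field', ['name', 'nullable'])


def insert_explicit_nulls(fields, row_dict):
    """Add missing nullable fields to row_dict with a None value (in place)."""
    missing = [f for f in fields if f.name not in row_dict]
    not_nullable = [f.name for f in missing if not f.nullable]
    if not_nullable:
        # Checked before any insertion, so a failing call leaves row_dict unchanged.
        raise ValueError('Missing non-nullable fields: {}'.format(not_nullable))
    for f in missing:
        row_dict[f.name] = None


fields = [Field('id', nullable=False), Field('label', nullable=True)]
row = {'id': 1}
insert_explicit_nulls(fields, row)
print(row)  # {'id': 1, 'label': None}
```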
petastorm.unischema.match_unischema_fields(schema, field_regex)
Returns a list of UnischemaField objects that match a regular expression.
Parameters:
- schema – An instance of a Unischema object.
- field_regex – A list of regular expression patterns. A field is matched if the regular expression matches the entire field name.
Returns: A list of UnischemaField instances matching at least one of the regular expression patterns given by field_regex.
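The "matches the entire field name" rule corresponds to Python's `re.fullmatch`. The sketch below applies it to plain field names rather than UnischemaField objects; `match_field_names` is a hypothetical helper, and petastorm's internal matching may be implemented differently.

```python
import re


def match_field_names(field_names, field_regex):
    """Return the names that fully match at least one pattern in field_regex."""
    return [name for name in field_names
            if any(re.fullmatch(pattern, name) for pattern in field_regex)]


names = ['field_1', 'other_a', 'other_b', 'another']
print(match_field_names(names, ['other.*$']))            # ['other_a', 'other_b']
print(match_field_names(names, ['other']))               # [] (a prefix match is not enough)
print(match_field_names(names, ['field_1', 'other_a']))  # ['field_1', 'other_a']
```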
Reader
petastorm.reader.make_reader(dataset_url, schema_fields=None, reader_pool_type='thread', workers_count=10, pyarrow_serialize=False, results_queue_size=50, shuffle_row_groups=True, shuffle_row_drop_partitions=1, predicate=None, rowgroup_selector=None, num_epochs=1, cur_shard=None, shard_count=None, cache_type='null', cache_location=None, cache_size_limit=None, cache_row_size_estimate=None, cache_extra_settings=None, hdfs_driver='libhdfs3', transform_spec=None)
Creates an instance of Reader for reading Petastorm datasets. A Petastorm dataset is a dataset generated using the materialize_dataset() context manager as explained here.
See make_batch_reader() to read from a Parquet store that was not generated using materialize_dataset().
Parameters:
- dataset_url – a filepath or a url to a parquet directory, e.g. 'hdfs://some_hdfs_cluster/user/yevgeni/parquet8', 'file:///tmp/mydataset' or 's3://bucket/mydataset'.
- schema_fields – Can be: a list of unischema fields and/or regex pattern strings; None to read all fields; or an NGram object, in which case an NGram of the specified fields is returned.
- reader_pool_type – A string denoting the reader pool type. Should be one of ['thread', 'process', 'dummy'], denoting a thread pool, a process pool, or running everything in the master thread. Defaults to 'thread'.
- workers_count – An int for the number of workers to use in the reader pool. This is only used for the thread or process pool. Defaults to 10.
- pyarrow_serialize – Whether to use pyarrow for serialization. Currently only applicable to the process pool. Defaults to False.
- results_queue_size – Size of the results queue to store prefetched rows. Currently only applicable to the thread reader pool type.
- shuffle_row_groups – Whether to shuffle row groups (the order in which full row groups are read).
- shuffle_row_drop_partitions – A positive integer which determines how many partitions to break up a row group into for increased shuffling, in exchange for worse performance (extra reads). For example, if you specify 2, each row group read will drop half of the rows within every row group and read the remaining rows in separate reads. It is recommended to keep this number below the regular row group size in order to not waste reads which drop all rows.
- predicate – instance of a PredicateBase object to filter rows to be returned by the reader. The predicate will be passed a single row and must return a boolean value indicating whether to include it in the results.
- rowgroup_selector – instance of a row group selector object to select row groups to be read.
- num_epochs – An epoch is a single pass over all rows in the dataset. Setting num_epochs to None will result in an infinite number of epochs.
- cur_shard – An int denoting the current shard number. Each node reading a shard should pass in a unique shard number in the range [0, shard_count). shard_count must be supplied as well. Defaults to None.
- shard_count – An int denoting the number of shards to break this dataset into. Defaults to None.
- cache_type – A string denoting the cache type, if desired. Options are [None, 'null', 'local-disk'], to either have a null/noop cache or a cache implemented using diskcache. Caching is useful when communication to the main data store is either slow or expensive and the local machine has large enough storage to store the entire dataset (or a partition of a dataset if shard_count is used). By default a null cache is used.
- cache_location – A string denoting the location or path of the cache.
- cache_size_limit – An int specifying the size limit of the cache in bytes.
- cache_row_size_estimate – An int specifying the estimated size of a row in the dataset.
- cache_extra_settings – A dictionary of extra settings to pass to the cache implementation.
- hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++).
- transform_spec – An instance of a TransformSpec object defining how a record is transformed after it is loaded and decoded. The transformation occurs on a worker thread/process (depending on the reader_pool_type value).
Returns: A Reader object
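The shuffle_row_drop_partitions trade-off can be pictured as reading each row group several times, keeping a different block of rows on each read. The sketch assumes contiguous blocks assigned by integer arithmetic; `row_drop_reads` is hypothetical and petastorm's actual partitioning may differ. The last case shows why the value should stay below the row group size: some reads keep no rows at all.

```python
def row_drop_reads(num_rows, num_partitions):
    """Split a row group's row indices into num_partitions contiguous blocks.

    Each block models one separate read of the row group that keeps only
    that block's rows and drops all the others.
    """
    reads = [[] for _ in range(num_partitions)]
    for i in range(num_rows):
        # floor(i * N / num_rows) maps row i to one of N roughly equal blocks.
        reads[i * num_partitions // num_rows].append(i)
    return reads


print(row_drop_reads(10, 2))  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(row_drop_reads(10, 3))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(row_drop_reads(2, 4))   # [[0], [], [1], []] (two wasted reads)
```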
petastorm.reader.make_batch_reader(dataset_url, schema_fields=None, reader_pool_type='thread', workers_count=10, shuffle_row_groups=True, shuffle_row_drop_partitions=1, predicate=None, rowgroup_selector=None, num_epochs=1, cur_shard=None, shard_count=None, cache_type='null', cache_location=None, cache_size_limit=None, cache_row_size_estimate=None, cache_extra_settings=None, hdfs_driver='libhdfs3', transform_spec=None)
Creates an instance of Reader for reading batches out of a non-Petastorm Parquet store.
Currently, only stores having native scalar parquet data types are supported. Use make_reader() to read Petastorm Parquet stores generated with materialize_dataset().
Parameters:
- dataset_url – a filepath or a url to a parquet directory, e.g. 'hdfs://some_hdfs_cluster/user/yevgeni/parquet8', 'file:///tmp/mydataset' or 's3://bucket/mydataset'.
- schema_fields – A list of regex pattern strings. Only columns matching at least one of the patterns in the list will be loaded.
- reader_pool_type – A string denoting the reader pool type. Should be one of ['thread', 'process', 'dummy'], denoting a thread pool, a process pool, or running everything in the master thread. Defaults to 'thread'.
- workers_count – An int for the number of workers to use in the reader pool. This is only used for the thread or process pool. Defaults to 10.
- shuffle_row_groups – Whether to shuffle row groups (the order in which full row groups are read).
- shuffle_row_drop_partitions – A positive integer which determines how many partitions to break up a row group into for increased shuffling, in exchange for worse performance (extra reads). For example, if you specify 2, each row group read will drop half of the rows within every row group and read the remaining rows in separate reads. It is recommended to keep this number below the regular row group size in order to not waste reads which drop all rows.
- predicate – instance of a PredicateBase object to filter rows to be returned by the reader. The predicate will be passed a pandas DataFrame object and must return a pandas Series with boolean values of matching dimensions.
- rowgroup_selector – instance of a row group selector object to select row groups to be read.
- num_epochs – An epoch is a single pass over all rows in the dataset. Setting num_epochs to None will result in an infinite number of epochs.
- cur_shard – An int denoting the current shard number. Each node reading a shard should pass in a unique shard number in the range [0, shard_count). shard_count must be supplied as well. Defaults to None.
- shard_count – An int denoting the number of shards to break this dataset into. Defaults to None.
- cache_type – A string denoting the cache type, if desired. Options are [None, 'null', 'local-disk'], to either have a null/noop cache or a cache implemented using diskcache. Caching is useful when communication to the main data store is either slow or expensive and the local machine has large enough storage to store the entire dataset (or a partition of a dataset if shard_count is used). By default a null cache is used.
- cache_location – A string denoting the location or path of the cache.
- cache_size_limit – An int specifying the size limit of the cache in bytes.
- cache_row_size_estimate – An int specifying the estimated size of a row in the dataset.
- cache_extra_settings – A dictionary of extra settings to pass to the cache implementation.
- hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++).
- transform_spec – An instance of a TransformSpec object defining how a record is transformed after it is loaded and decoded. The transformation occurs on a worker thread/process (depending on the reader_pool_type value).
Returns: A Reader object
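The cur_shard/shard_count contract says each reader gets a unique shard index in [0, shard_count) and therefore a disjoint part of the dataset. The sketch below shows one way to satisfy that contract, assigning row groups round-robin; the round-robin scheme and the `shard_row_groups` helper are assumptions for illustration, not a description of how petastorm assigns row groups.

```python
def shard_row_groups(row_group_ids, cur_shard, shard_count):
    """Return the row groups read by shard cur_shard (round-robin assignment)."""
    if not 0 <= cur_shard < shard_count:
        raise ValueError('cur_shard must be in the range [0, shard_count)')
    return [rg for i, rg in enumerate(row_group_ids) if i % shard_count == cur_shard]


row_groups = list(range(7))
shards = [shard_row_groups(row_groups, s, 3) for s in range(3)]
print(shards)  # [[0, 3, 6], [1, 4], [2, 5]]
```

Together the shards cover every row group exactly once, which is the property that lets several nodes read one dataset without duplicating work.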
class petastorm.reader.Reader(pyarrow_filesystem, dataset_path, schema_fields=None, shuffle_row_groups=True, shuffle_row_drop_partitions=1, predicate=None, rowgroup_selector=None, reader_pool=None, num_epochs=1, cur_shard=None, shard_count=None, cache=None, worker_class=None, transform_spec=None, is_batched_reader=False)
Reads a dataset from a Petastorm dataset.
Variables: last_row_consumed – True if the last row was already returned by the Reader.
Initializes a reader object.
Parameters:
- pyarrow_filesystem – An instance of pyarrow.FileSystem that will be used. If not specified, then a default one will be selected based on the url (only for hdfs:// or file://; for s3:// support, use make_reader). The default hdfs driver is libhdfs3. If you want to use libhdfs, use pyarrow_filesystem=pyarrow.hdfs.connect('hdfs:///some/path', driver='libhdfs').
- dataset_path – filepath to a parquet directory on the specified filesystem, e.g. '/user/yevgeni/parquet8' or '/tmp/mydataset'.
- schema_fields – Either a list of unischema fields to subset, None to read all fields, or an NGram object, in which case an NGram of the specified properties is returned.
- shuffle_row_groups – Whether to shuffle row groups (the order in which full row groups are read).
- shuffle_row_drop_partitions – A positive integer which determines how many partitions to break up a row group into for increased shuffling, in exchange for worse performance (extra reads). For example, if you specify 2, each row group read will drop half of the rows within every row group and read the remaining rows in separate reads. It is recommended to keep this number below the regular row group size in order to not waste reads which drop all rows.
- predicate – instance of a predicate object to filter rows to be returned by the reader.
- rowgroup_selector – instance of a row group selector object to select row groups to be read.
- reader_pool – parallelization pool. ThreadPool(10) (10 threads) is used by default. This pool is a custom implementation used to parallelize reading data from the dataset. Any object from the workers_pool package can be used (e.g. petastorm.workers_pool.process_pool.ProcessPool).
- num_epochs – An epoch is a single pass over all rows in the dataset. Setting num_epochs to None will result in an infinite number of epochs.
- cur_shard – An int denoting the current shard number used. Each reader instance should pass in a unique shard number in the range [0, shard_count). shard_count must be supplied as well. Defaults to None.
- shard_count – An int denoting the number of shard partitions there are. Defaults to None.
- cache – An object conforming to the CacheBase interface. Before loading row groups from a parquet file, the Reader will attempt to load these values from the cache. Caching is useful when communication to the main data store is either slow or expensive and the local machine has large enough storage to store the entire dataset (or a partition of a dataset if shards are used). By default, the NullCache implementation is used.
- worker_class – The class that will be instantiated on a different thread/process. Its responsibility is to load and filter the data.
reset()
Resets the Reader state and allows fetching more samples once the Reader has finished reading all epochs, as specified by the num_epochs parameter.
Once all samples were read from a reader, an attempt to fetch a new sample (e.g. next(reader)) raises StopIteration. You can reset the reader to its original state and restart reading samples by calling reset().
Calling reset() before all samples were consumed is not supported: a NotImplementedError is raised if a user attempts to do so.
Calling reset() after stop() was called has no effect.
Returns: None
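The reset() contract (StopIteration at the end of the last epoch, reset() allowed only after that point) can be modeled with a toy iterator. `ToyReader` is a hypothetical class that mimics the documented behavior over an in-memory list; it supports only a finite num_epochs and does none of the real Reader's parallel loading.

```python
class ToyReader:
    """Hypothetical stand-in mimicking the documented reset() contract."""

    def __init__(self, rows, num_epochs=1):
        self._rows = list(rows)
        self._num_epochs = num_epochs
        self._restart()

    def _restart(self):
        # The full stream of rows for all epochs, in order.
        self._stream = iter(self._rows * self._num_epochs)
        self.last_row_consumed = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._stream)
        except StopIteration:
            self.last_row_consumed = True
            raise

    def reset(self):
        if not self.last_row_consumed:
            raise NotImplementedError('reset() is only supported after all samples were consumed')
        self._restart()


reader = ToyReader(['a', 'b'], num_epochs=2)
print(list(reader))  # ['a', 'b', 'a', 'b']
reader.reset()
print(next(reader))  # a
```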
batched_output
join()
Joins all worker threads/processes. Blocks until all workers have been fully terminated.
diagnostics
class petastorm.weighted_sampling_reader.WeightedSamplingReader(readers, probabilities)
Allows combining the outputs of two or more Reader objects, sampling them with a configurable probability. Complies with the same interfaces as Reader, hence WeightedSamplingReader can be used anywhere the Reader can be used.
Creates an instance of WeightedSamplingReader.
The constructor gets a list of readers and a list of probabilities as its parameters. The lists must be the same length.
WeightedSamplingReader implements an iterator interface. Each time a new element is requested, one of the readers is selected, weighted by the matching probability, and an element produced by the selected reader is returned.
The iterator raises a StopIteration exception once one of the embedded readers has no more data left.
The following example shows how a WeightedSamplingReader can be instantiated with two readers which are sampled with 10% and 90% probabilities respectively.
>>> from petastorm.reader import make_reader
>>> from petastorm.weighted_sampling_reader import WeightedSamplingReader
>>>
>>> with WeightedSamplingReader([make_reader('file:///dataset1'), make_reader('file:///dataset2')],
...                             [0.1, 0.9]) as reader:
...     new_sample = next(reader)
Parameters:
- readers – A list of readers. The length of the list must be the same as the length of the probabilities list.
- probabilities – A list of probabilities. The length of the list must be the same as the length of the readers argument. If the sum of all probability values is not 1.0, the values are automatically normalized.
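The sampling logic can be sketched over ordinary iterables: probabilities are normalized, each element's source is drawn with `random.Random.choices`, and iteration ends the first time the chosen source is exhausted. `weighted_sample` is a hypothetical helper, not the WeightedSamplingReader implementation; validation happens eagerly so a length mismatch fails at call time.

```python
import itertools
import random


def weighted_sample(iterables, probabilities, seed=None):
    """Return an iterator that picks the source of each item by weight."""
    if len(iterables) != len(probabilities):
        raise ValueError('iterables and probabilities must have the same length')
    total = float(sum(probabilities))
    weights = [p / total for p in probabilities]  # normalize if the sum is not 1.0
    return _sample([iter(it) for it in iterables], weights, random.Random(seed))


def _sample(iterators, weights, rng):
    while True:
        chosen = rng.choices(iterators, weights=weights)[0]
        try:
            yield next(chosen)
        except StopIteration:
            # Stop as soon as the selected source has no more data.
            return


stream = weighted_sample([itertools.repeat('a'), itertools.repeat('b')], [1, 9], seed=42)
first_1000 = list(itertools.islice(stream, 1000))
print(first_1000.count('b') / 1000.0)  # close to 0.9
```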
class petastorm.ngram.NGram(fields, delta_threshold, timestamp_field, timestamp_overlap=True)
Defines an NGram, having certain fields as set by fields, where consecutive items in an NGram are no further apart than the argument delta_threshold (inclusive). The argument timestamp_field indicates which field refers to the timestamp in the data.
The argument fields is a dictionary, where the keys are integers, and the value is an array of the Unischema fields to include at that timestep.
The delta_threshold and timestamp_field define how far apart each item in the NGram can be, as described in the rules above.
The following are examples of what NGram will return based on the parameters:
- Case 1:
>>> fields = {
...     -1: [TestSchema.id, TestSchema.id2, TestSchema.image_png,
...          TestSchema.matrix],
...     0: [TestSchema.id, TestSchema.id2,
...         TestSchema.sensor_name],
... }
>>> delta_threshold = 5
>>> timestamp_field = 'id'
- The data being:
    A {'id': 0, ....}
    B {'id': 10, ....}
    C {'id': 20, ....}
    D {'id': 30, ....}
- The result will be empty, since the delta is 10 (more than the allowed delta_threshold of 5).
- Case 2:
>>> fields = {
...     -1: [TestSchema.id, TestSchema.id2, TestSchema.image_png,
...          TestSchema.matrix],
...     0: [TestSchema.id, TestSchema.id2,
...         TestSchema.sensor_name],
... }
>>> delta_threshold = 4
>>> timestamp_field = 'id'
- The data being:
    A {'id': 0, .....}
    B {'id': 3, .....}
    C {'id': 8, .....}
    D {'id': 10, .....}
    E {'id': 11, .....}
    G {'id': 20, .....}
    H {'id': 30, .....}
- The result will be:
    {-1: A, 0: B},
    {-1: C, 0: D},
    {-1: D, 0: E}
- Notice how:
    - (B, C) was skipped since B id is 3 and C id is 8 (a difference of 5 > delta_threshold of 4)
    - (E, G), (G, H) were also skipped for the same reason
One caveat to note: all NGrams within the same parquet row group are guaranteed to be returned, but not across different parquet row groups. I.e., if row group 1 has [0, 5] and row group 2 has [6, 10], then this will result in (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8), (8, 9), (9, 10).
Notice how (5, 6) was skipped because it is across two different row groups. In order to potentially produce more NGrams, the row group size should be increased (at minimum, it needs to be at least as large as the NGram length).
Note: Passing a field argument like {-1: [TestSchema.id], 1: [TestSchema.id]} is allowed; the 0 field will just be empty. Passing a field argument like {1: [TestSchema.id], 0: [TestSchema.id]} is the same as passing a field argument like {0: [TestSchema.id], 1: [TestSchema.id]}.
The return type will be a dictionary where the keys are the same as the keys passed to fields and the value of each key will be the item.
Constructor to initialize the ngram with fields, delta_threshold and timestamp_field.
Parameters:
- fields – A dictionary, with consecutive integers as keys and each value is an array of Unischema fields.
- delta_threshold – The maximum threshold of delta between timestamp_field values.
- timestamp_field – The field that represents the timestamp.
- timestamp_overlap – Whether timestamps in sequences are allowed to overlap (defaults to True). E.g., if the data consists of consecutive timestamps [{'id': 0}, {'id': 1}, ..., {'id': 5}] and you are asking for an NGram of length 3 with timestamp_overlap set to True, you will receive NGrams of [{'id': 0}, {'id': 1}, {'id': 2}] and [{'id': 1}, {'id': 2}, {'id': 3}] (in addition to others); however, note that {'id': 1} and {'id': 2} appear twice. With timestamp_overlap set to False, this would not occur; instead, [{'id': 0}, {'id': 1}, {'id': 2}] and [{'id': 3}, {'id': 4}, {'id': 5}] would be returned. There is no overlap of timestamps between NGrams (and each timestamp record should only occur once in the returned data).
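The windowing rules above can be reproduced on plain integer timestamps. This is a sketch under stated assumptions: timestamps within a row group are sorted, a window qualifies when every neighbouring pair differs by at most delta_threshold (inclusive), and with overlap disabled the search resumes after an accepted window. `form_ngrams` and `form_ngrams_per_row_group` are hypothetical helpers, not the NGram.form_ngram implementation.

```python
def form_ngrams(timestamps, length, delta_threshold, overlap=True):
    """Return windows of `length` consecutive timestamps from ONE row group."""
    result = []
    i = 0
    while i + length <= len(timestamps):
        window = timestamps[i:i + length]
        if all(b - a <= delta_threshold for a, b in zip(window, window[1:])):
            result.append(tuple(window))
            # Without overlap, skip past the accepted window so no timestamp is reused.
            i += 1 if overlap else length
        else:
            i += 1
    return result


def form_ngrams_per_row_group(row_groups, length, delta_threshold, overlap=True):
    # NGrams are formed inside each row group only, never across a boundary.
    ngrams = []
    for timestamps in row_groups:
        ngrams.extend(form_ngrams(timestamps, length, delta_threshold, overlap))
    return ngrams


print(form_ngrams([0, 10, 20, 30], 2, 5))                # []  (Case 1)
print(form_ngrams([0, 3, 8, 10, 11, 20, 30], 2, 4))      # [(0, 3), (8, 10), (10, 11)]  (Case 2)
print(form_ngrams(list(range(6)), 3, 1, overlap=False))  # [(0, 1, 2), (3, 4, 5)]
print(form_ngrams_per_row_group([list(range(0, 6)), list(range(6, 11))], 2, 1))  # (5, 6) is absent
```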
length
Returns the requested ngram length.
fields
Returns the ngram fields.
delta_threshold
The maximum difference between one entry and the following one in timestamp_field.
resolve_regex_field_names(schema)
Resolves string(s) (regular expression(s)) that were passed into the 'fields' and 'timestamp_field' parameters.
get_field_names_at_timestep(timestep)
Returns the field names at a certain timestep.
Parameters: timestep – The timestep to return the field names at.
Returns: A list of all the field names at that timestep.
get_schema_at_timestep(schema, timestep)
Returns the schema of the data at a certain timestep.
Parameters:
- schema – The schema of the data; the schema at a certain timestep is a subset of it.
- timestep – The timestep to get the schema at.
Returns: The schema of the data at a certain timestep.
form_ngram(data, schema)
Returns all the ngrams as dictated by fields, delta_threshold and timestamp_field.
Parameters: data – The data items, which is a list of Unischema items.
Returns: A dictionary, with keys [0, length - 1]. The value of each key is the corresponding item in the ngram at that position.
make_namedtuple(schema, ngram_as_dicts)
Converts an ngram structure where mapped values are dictionaries to a mapped structure where mapped values are namedtuples.
Example:
>>> { -1 : {'f1': 10, 'f2': 20},
...    0 : {'f1': 30},
... }
is converted to
>>> { -1 : namedtuple(f1=10, f2=20),
...    0 : namedtuple(f1=30),
... }
Parameters:
- schema – schema used for conversion
- ngram_as_dicts – ngram in the timestamp-to-dict mapping
Returns: ngram in the timestamp-to-namedtuple mapping
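A standard-library sketch of the conversion in the example above. Unlike the real method, which uses the schema passed to it, this sketch assumes the namedtuple type for each timestep can be derived from the keys present in that timestep's dictionary. `ngram_dicts_to_namedtuples` and the `NgramRow` type name are hypothetical.

```python
from collections import namedtuple


def ngram_dicts_to_namedtuples(ngram_as_dicts):
    """Convert {timestep: {field: value}} into {timestep: namedtuple}."""
    result = {}
    for timestep, row in ngram_as_dicts.items():
        # Build a namedtuple type from the fields present at this timestep;
        # sorting keeps the field order deterministic.
        row_type = namedtuple('NgramRow', sorted(row))
        result[timestep] = row_type(**row)
    return result


converted = ngram_dicts_to_namedtuples({-1: {'f1': 10, 'f2': 20}, 0: {'f1': 30}})
print(converted[-1])  # NgramRow(f1=10, f2=20)
print(converted[0])   # NgramRow(f1=30)
```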
get_field_names_at_all_timesteps()
Returns a list of fields that are present in at least one of the time offsets.
convert_fields(unischema, field_list)
Converts all the fields in field_list into Unischema fields. field_list can contain unischema fields and strings (regular expressions).
Parameters:
- unischema – Unischema object
- field_list – A list of unischema fields or strings (regular expressions)
Returns: list of unischema fields
TransformSpec
class petastorm.transform.TransformSpec(func=None, edit_fields=None, removed_fields=None)
TransformSpec defines a user transformation that is applied to a loaded row on a worker thread/process.
The object defines the function (callable) that performs the transform, as well as the schema transform: pre-transform schema to post-transform schema.
The func argument is a callable which takes a row as its parameter and returns a modified row. edit_fields and removed_fields define mutating operations performed on the original schema that produce a post-transform schema. The return value of func must comply with this post-transform schema.
Parameters:
- func – Optional. A callable. The function is called on the worker thread. It takes a dictionary that complies with the input schema and must return a dictionary that complies with the post-transform schema. If the user only wants to remove certain fields, this argument may be omitted and only the removed_fields argument specified.
- edit_fields – Optional. A list of 4-tuples with the following fields: (name, numpy_dtype, shape, is_nullable)
- removed_fields – Optional[list]. A list of field names that will be removed from the original schema.
petastorm.transform.transform_schema(schema, transform_spec)
Creates a post-transform schema given a pre-transform schema and a transform_spec with mutation instructions.
Parameters:
- schema – A pre-transform schema
- transform_spec – a TransformSpec object with mutation instructions.
Returns: A post-transform schema
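To make the pairing of func with edit_fields/removed_fields concrete, the sketch below models a schema as an ordered mapping of field name to (numpy_dtype, shape, is_nullable), applies the two mutation lists, and checks that a user transform produces rows matching the post-transform schema. The helper `apply_schema_mutations`, the order in which mutations are applied, and the string placeholders for numpy dtypes are all assumptions of this sketch, not petastorm's transform_schema.

```python
from collections import OrderedDict


def apply_schema_mutations(schema, edit_fields=None, removed_fields=None):
    """Return a post-transform schema: removed fields dropped, edit_fields added or replaced."""
    post = OrderedDict(schema)
    for name in removed_fields or []:
        post.pop(name, None)  # unknown names are silently ignored in this sketch
    for name, numpy_dtype, shape, is_nullable in edit_fields or []:
        post[name] = (numpy_dtype, shape, is_nullable)
    return post


# Hypothetical pre-transform schema; dtype strings stand in for numpy dtypes.
pre_schema = OrderedDict([
    ('id', ('int64', (), False)),
    ('caption', ('str', (), False)),
])


def caption_to_length(row):
    # User transform: replace the caption text with its length.
    return {'id': row['id'], 'caption_len': len(row['caption'])}


post_schema = apply_schema_mutations(
    pre_schema,
    edit_fields=[('caption_len', 'int32', (), False)],
    removed_fields=['caption'])
print(list(post_schema))                                  # ['id', 'caption_len']
print(caption_to_length({'id': 7, 'caption': 'hello'}))  # {'id': 7, 'caption_len': 5}
```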
Tensorflow
PyTorch
PySpark & SQL
A set of Spark-specific helper functions for the petastorm dataset.
petastorm.spark_utils.dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='libhdfs3')
Retrieves a spark rdd for a given petastorm dataset.
Parameters:
- dataset_url – A string for the dataset url (e.g. hdfs:///path/to/dataset)
- spark_session – A spark session
- schema_fields – list of unischema fields to subset, or None to read all fields.
- hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++).
Returns: An rdd of dictionary records from the dataset
Row queries
Predicates for petastorm
class petastorm.predicates.in_set(inclusion_values, predicate_field)
Tests if the predicate_field value is in the inclusion_values set.
class petastorm.predicates.in_intersection(inclusion_values, _predicate_field)
Tests if the predicate_field list contains at least one value from the inclusion_values set.
class petastorm.predicates.in_lambda(predicate_fields, predicate_func, state_arg=None)
Wraps up a custom function to be used as a predicate. Example:
>>> in_lambda(['labels_object_roles'], lambda labels_object_roles: len(labels_object_roles) > 3)
Parameters:
- predicate_fields – list of fields to be used in the predicate
- predicate_func – predicate function, e.g. lambda labels_object_roles: len(labels_object_roles) > 3
- state_arg – additional object to keep function state. It will be passed to predicate_func after the field arguments ONLY if it is not None.
class petastorm.predicates.in_negate(predicate)
A predicate used to negate another predicate.
class petastorm.predicates.in_reduce(predicate_list, reduce_func)
A predicate used to aggregate other predicates using any reduce logical operation.
Parameters:
- predicate_list – list of predicates
- reduce_func – function to aggregate the results of all predicates in the list, e.g. all() implements logical 'And' and any() implements logical 'Or'.
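The way in_lambda, in_negate and in_reduce compose can be sketched with three tiny classes that evaluate a row dictionary. The class names and the `include` method are hypothetical stand-ins chosen for this example; they are not petastorm's predicate classes or their interface.

```python
class LambdaPredicate:
    """Hypothetical stand-in for in_lambda: calls func with the selected field values."""

    def __init__(self, fields, func, state_arg=None):
        self.fields, self.func, self.state_arg = fields, func, state_arg

    def include(self, row):
        args = [row[f] for f in self.fields]
        if self.state_arg is not None:
            # state_arg is appended after the field values only if it is not None.
            args.append(self.state_arg)
        return self.func(*args)


class NegatePredicate:
    """Hypothetical stand-in for in_negate."""

    def __init__(self, predicate):
        self.predicate = predicate

    def include(self, row):
        return not self.predicate.include(row)


class ReducePredicate:
    """Hypothetical stand-in for in_reduce: all() acts as 'And', any() as 'Or'."""

    def __init__(self, predicate_list, reduce_func):
        self.predicate_list, self.reduce_func = predicate_list, reduce_func

    def include(self, row):
        return self.reduce_func(p.include(row) for p in self.predicate_list)


many_roles = LambdaPredicate(['labels_object_roles'], lambda roles: len(roles) > 3)
is_train = LambdaPredicate(['split'], lambda split: split == 'train')
both = ReducePredicate([many_roles, is_train], all)
either = ReducePredicate([many_roles, is_train], any)

row = {'labels_object_roles': [1, 2, 3, 4], 'split': 'test'}
print(both.include(row), either.include(row))  # False True
print(NegatePredicate(both).include(row))      # True
```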
class petastorm.predicates.in_pseudorandom_split(fraction_list, subset_index, predicate_field)
Splits the dataset according to a split list, based on the value of predicate_field (e.g. volume_guid). The split is pseudorandom (a seed cannot be supplied yet), i.e. the split outcome is always the same. The split is performed by hashing the predicate_field value uniformly to the 0:1 range and returning the part of the full dataset which was hashed into the given sub-range.
Example:
- fraction_list = [0.5, 0.2, 0.3]: the dataset will be split into three subsets in proportion: subset 1: 0.5 of log data, subset 2: 0.2 of log data, subset 3: 0.3 of log data. Note that the split is not exact, so avoid small fractions (e.g. 0.001) to avoid empty sets.
Parameters:
- fraction_list – a list of log fractions (real numbers in range [0:1])
- subset_index – defines which subset will be used by the Reader
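The hash-to-range idea can be sketched as follows: hash the field value to a position in [0, 1), then pick the subset whose cumulative sub-range contains it. Using MD5 and the first 32 bits of the digest is an assumption of this sketch; the hash petastorm actually uses may differ, so the concrete assignments here will not match a real split. `split_bucket` and `include_row` are hypothetical helpers.

```python
import hashlib


def split_bucket(value, fraction_list):
    """Return the index of the subset that `value` is assigned to."""
    digest = hashlib.md5(str(value).encode('utf-8')).hexdigest()
    position = int(digest[:8], 16) / 2.0 ** 32  # deterministic, in [0, 1)
    upper = 0.0
    for index, fraction in enumerate(fraction_list):
        upper += fraction
        if position < upper:
            return index
    return len(fraction_list) - 1  # guard against floating-point round-off


def include_row(row, fraction_list, subset_index, predicate_field):
    # Predicate-style check: keep the row only if it hashes into subset_index.
    return split_bucket(row[predicate_field], fraction_list) == subset_index


fractions = [0.5, 0.2, 0.3]
counts = [0, 0, 0]
for i in range(1000):
    counts[split_bucket('volume_%d' % i, fractions)] += 1
print(counts)  # approximately proportional to fractions; exact numbers depend on the hash
```

Because the assignment depends only on the hashed value, every row sharing a predicate_field value lands in the same subset, which is what makes the split repeatable across runs.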
Local cache
class petastorm.cache.CacheBase
get(key, fill_cache_func)
Gets an entry from the cache implementation.
If there is a cache miss, fill_cache_func() will be evaluated to get the value.
Parameters:
- key – A key identifying a cache entry
- fill_cache_func – This function will be evaluated (fill_cache_func()) to populate the cache, if no value is present in the cache.
Returns: A value from the cache
class petastorm.cache.NullCache
A pass-through cache implementation: the value-generating function is called every time.
get(key, fill_cache_func)
Gets an entry from the cache implementation.
If there is a cache miss, fill_cache_func() will be evaluated to get the value.
Parameters:
- key – A key identifying a cache entry
- fill_cache_func – This function will be evaluated (fill_cache_func()) to populate the cache, if no value is present in the cache.
Returns: A value from the cache
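The get(key, fill_cache_func) contract shared by the cache classes can be shown with two tiny stand-ins: a dict-backed cache that evaluates the fill function only on a miss, and a pass-through cache that, like NullCache as described above, calls it every time. Both classes are hypothetical illustrations, not petastorm code.

```python
class InMemoryCache:
    """Hypothetical dict-backed cache following the get(key, fill_cache_func) contract."""

    def __init__(self):
        self._store = {}

    def get(self, key, fill_cache_func):
        if key not in self._store:
            # Cache miss: evaluate the fill function and remember its value.
            self._store[key] = fill_cache_func()
        return self._store[key]


class PassThroughCache:
    """Behaves like the documented NullCache: always calls fill_cache_func."""

    def get(self, key, fill_cache_func):
        return fill_cache_func()


calls = []


def load_row_group():
    calls.append(1)  # count how often the expensive load actually runs
    return ['row0', 'row1']


cache = InMemoryCache()
cache.get('rowgroup-0', load_row_group)
cache.get('rowgroup-0', load_row_group)
print(len(calls))  # 1

PassThroughCache().get('rowgroup-0', load_row_group)
print(len(calls))  # 2
```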
class petastorm.local_disk_cache.LocalDiskCache(path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=False, **settings)
LocalDiskCache is an adapter to a diskcache implementation.
LocalDiskCache can be used by a petastorm Reader class to temporarily keep parts of the dataset on local file system storage.
Parameters:
- path – Path where the dataset cache is stored.
- size_limit_bytes – Maximal size of the disk space to be used by the cache. The size of the cache may actually grow somewhat above size_limit_bytes, so the limit is not very strict.
- expected_row_size_bytes – Approximate size of a single row. This argument is used to perform a sanity check on the capacity of individual shards.
- shards – The cache can be sharded. A larger number of shards improves writing parallelism.
- cleanup – If set to True, the cache directory is removed when the cleanup() method is called.
- settings – these parameters are passed through to the diskcache.Cache class. For details, see: http://www.grantjenks.com/docs/diskcache/tutorial.html#settings
get(key, fill_cache_func)
Gets an entry from the cache implementation.
If there is a cache miss, fill_cache_func() will be evaluated to get the value.
Parameters:
- key – A key identifying a cache entry
- fill_cache_func – This function will be evaluated (fill_cache_func()) to populate the cache, if no value is present in the cache.
Returns: A value from the cache
Codecs
A set of dataframe-column codecs complements the limited variety of spark-/pyarrow-supported data types, enabling storage of numpy multidimensional arrays, as well as compressed images, in spark dataframes, and transitively in parquet files.
NOTE: Due to the way the unischema is stored alongside the dataset (with pickling), changing the class names or fields of any of these codecs can break readers.
class petastorm.codecs.CompressedImageCodec(image_codec='png', quality=80)
CompressedImageCodec compresses/decompresses images.
Parameters:
- image_codec – any format string supported by opencv, e.g. png, jpeg
- quality – used when using jpeg lossy compression
class petastorm.codecs.NdarrayCodec
Encodes a numpy ndarray into, or decodes an ndarray from, a spark dataframe field.
class petastorm.codecs.CompressedNdarrayCodec
Encodes a numpy ndarray with compression into a spark dataframe field.
Dataset generation
-
class petastorm.etl.RowGroupIndexerBase
Base class for row group indexers.
-
index_name
Return the unique index name.
-
column_names
Return the list of column(s) required to build the index.
-
indexed_values
Return the list of values in the index.
-
exception petastorm.etl.dataset_metadata.PetastormMetadataError
Raised when the petastorm metadata does not exist, does not contain the necessary information, or is corrupt/invalid.
-
exception petastorm.etl.dataset_metadata.PetastormMetadataGenerationError
Raised when petastorm could not generate metadata properly. This error is usually accompanied by a message suggesting that the dataset metadata be regenerated.
-
petastorm.etl.dataset_metadata.materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_summary_metadata=False, filesystem_factory=None)
A context manager that handles all the initialization and finalization necessary to generate metadata for a petastorm dataset. Wrap the Spark logic that materializes your dataset (specifically, the writing of the Parquet output) in it.
Note: Any row group indexing should happen outside the materialize_dataset block.
Example:
>>> spark = SparkSession.builder...
>>> ds_url = 'hdfs:///path/to/my/dataset'
>>> with materialize_dataset(spark, ds_url, MyUnischema, 64):
...     spark.sparkContext.parallelize(range(0, 10)) \
...         ... \
...         .write.parquet(ds_url)
>>> indexer = [SingleFieldIndexer(...)]
>>> build_rowgroup_index(ds_url, spark.sparkContext, indexer)
A user may provide their own instance of a pyarrow filesystem object in the pyarrow_filesystem argument (otherwise, petastorm creates a default one based on the url). The following example shows how a custom pyarrow HDFS filesystem, instantiated using the libhdfs driver, can be used during Petastorm dataset generation:
>>> resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
...                               hdfs_driver='libhdfs')
>>> with materialize_dataset(..., pyarrow_filesystem=resolver.filesystem()):
...     ...
Parameters:
- spark – The spark session you are using.
- dataset_url – The dataset url to output your dataset to (e.g. hdfs:///path/to/dataset).
- schema – The petastorm.unischema.Unischema definition of your dataset.
- row_group_size_mb – The parquet row group size to use for your dataset.
- use_summary_metadata – Whether to use the parquet summary metadata for row group indexing or a custom indexing method. The custom indexing method is more scalable for very large datasets.
- pyarrow_filesystem – A pyarrow filesystem object to be used when saving Petastorm-specific metadata to the Parquet store.
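The initialization/finalization pattern described above can be sketched generically with contextlib. This is not petastorm's implementation: conf is a plain dict standing in for the Hadoop configuration, 'parquet.block.size' is assumed to be the setting that controls the row group size, and write_metadata is a hypothetical callback for the metadata step.

```python
from contextlib import contextmanager


@contextmanager
def materialize_sketch(conf, row_group_size_mb, write_metadata):
    """Hypothetical stand-in for a materialize_dataset-style context manager."""
    # Initialization: apply the requested row group size, remembering the old value.
    old_block_size = conf.get('parquet.block.size')
    if row_group_size_mb:
        conf['parquet.block.size'] = row_group_size_mb * 1024 * 1024
    try:
        yield  # the caller's Spark code writes the parquet output here
    finally:
        # Restore the configuration whether or not the write succeeded.
        if old_block_size is None:
            conf.pop('parquet.block.size', None)
        else:
            conf['parquet.block.size'] = old_block_size
    # Finalization: only reached when the with-block raised no exception.
    write_metadata()


conf = {}
log = []
with materialize_sketch(conf, 64, lambda: log.append('metadata written')):
    log.append(('writing with block size', conf['parquet.block.size']))
print(log)   # [('writing with block size', 67108864), 'metadata written']
print(conf)  # {}
```

Because write_metadata runs after the yield rather than in the finally clause, a failed write leaves no metadata behind, while the configuration is restored either way.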
-
petastorm.etl.dataset_metadata.get_schema(dataset)
Retrieves the schema object stored as part of the dataset metadata.
Parameters: dataset – an instance of a pyarrow.parquet.ParquetDataset object
Returns: A petastorm.unischema.Unischema object
-
petastorm.etl.dataset_metadata.get_schema_from_dataset_url(dataset_url, hdfs_driver='libhdfs3')
Returns a petastorm.unischema.Unischema object loaded from a dataset specified by a url.
Parameters:
- dataset_url – A dataset URL
- hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
Returns: A petastorm.unischema.Unischema object
-
petastorm.etl.dataset_metadata.infer_or_load_unischema(dataset)
Tries to recover the Unischema object stored by the materialize_dataset function. If it cannot be loaded, infers a Unischema from the native Parquet schema.
Script to add petastorm metadata to an existing parquet dataset.
-
petastorm.etl.petastorm_generate_metadata.generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_summary_metadata=False, hdfs_driver='libhdfs3')
Generates the metadata necessary to read an existing dataset with petastorm.
Parameters:
- spark – spark session
- dataset_url – url of the existing dataset
- unischema_class – (optional) fully qualified dataset unischema class (e.g. examples.hello_world.generate_hello_world_dataset.HelloWorldSchema). If not specified, an attempt is made to find one already stored in the dataset.
- hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
- user – String denoting username when connecting to HDFS
Row-group selectors
-
class petastorm.selectors.SingleIndexSelector(index_name, values_list)
Generic selector for a single-field indexer. Selects all row groups containing any of the given values.
-
class petastorm.selectors.IntersectIndexSelector(single_index_selectors)
Selector combining multiple single-field index selectors. Selects the row groups that every given selector selects, i.e. row groups containing at least one of the values of each selector.
Parameters: single_index_selectors – List of SingleIndexSelector
-
class petastorm.selectors.UnionIndexSelector(single_index_selectors)
Selector combining multiple single-field index selectors. Selects row groups containing any of the values of at least one given selector.
Parameters: single_index_selectors – List of SingleIndexSelector
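The selection semantics of the three selectors can be modeled with plain set operations. In this sketch an index is a dict mapping each indexed value to the set of row group ids that contain it, and a single-index selector is an (index_name, values_list) tuple; the index contents and function names are hypothetical, not petastorm's implementation.

```python
# Hypothetical index contents: index name -> {indexed value -> set of row group ids}.
INDEXES = {
    'camera_idx': {'front': {0, 1, 4}, 'rear': {2, 3}},
    'city_idx': {'sf': {1, 2}, 'nyc': {4, 5}},
}


def single_select(indexes, index_name, values_list):
    """Row groups containing ANY of the given values in one index."""
    index = indexes[index_name]
    selected = set()
    for value in values_list:
        selected |= index.get(value, set())
    return selected


def intersect_select(indexes, selectors):
    """Row groups selected by EVERY single-index selector."""
    results = [single_select(indexes, name, values) for name, values in selectors]
    return set.intersection(*results) if results else set()


def union_select(indexes, selectors):
    """Row groups selected by AT LEAST ONE single-index selector."""
    results = [single_select(indexes, name, values) for name, values in selectors]
    return set().union(*results)


selectors = [('camera_idx', ['front']), ('city_idx', ['sf', 'nyc'])]
print(sorted(intersect_select(INDEXES, selectors)))  # [1, 4]
print(sorted(union_select(INDEXES, selectors)))      # [0, 1, 2, 4, 5]
```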
-
class petastorm.etl.rowgroup_indexers.SingleFieldIndexer(index_name, index_field)
Indexes a single field in a parquet dataset.
This indexer only indexes numpy strings, numpy integers, or numpy arrays of strings.
-
index_name
Return the unique index name.
-
column_names
Return the list of column(s) required to build the index.
-
indexed_values
Return the list of values in the index.
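A hypothetical single-field indexer can be sketched as follows, assuming each row group (piece) is decoded into a list of row dicts and identified by an integer piece index. The class name and the build_index and get_row_group_indexes methods are illustrative assumptions, not a confirmed part of the petastorm interface; only index_name, column_names and indexed_values come from this reference.

```python
import numpy as np


class SingleFieldIndexSketch:
    """Hypothetical indexer mapping each value of one field to the pieces containing it."""

    def __init__(self, index_name, index_field):
        self._index_name = index_name
        self._column_name = index_field
        self._index = {}  # value -> set of piece indexes

    @property
    def index_name(self):
        return self._index_name

    @property
    def column_names(self):
        return [self._column_name]

    @property
    def indexed_values(self):
        return list(self._index.keys())

    def build_index(self, decoded_rows, piece_index):
        # Record piece_index under every value that appears in this row group.
        for row in decoded_rows:
            value = row[self._column_name]
            if value is None:
                continue
            # An array of strings contributes each element; a scalar contributes itself.
            values = value.tolist() if isinstance(value, np.ndarray) else [value]
            for v in values:
                self._index.setdefault(v, set()).add(piece_index)

    def get_row_group_indexes(self, value):
        return self._index.get(value, set())


indexer = SingleFieldIndexSketch('sensor_index', 'sensor')
indexer.build_index([{'sensor': 'lidar'}, {'sensor': 'radar'}], piece_index=0)
indexer.build_index([{'sensor': np.array(['lidar', 'camera'])}], piece_index=1)
print(sorted(indexer.indexed_values))                  # ['camera', 'lidar', 'radar']
print(sorted(indexer.get_row_group_indexes('lidar')))  # [0, 1]
```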
-
class petastorm.etl.rowgroup_indexers.FieldNotNullIndexer(index_name, index_field)
Indexes the 'Not Null' condition for a single field in a parquet dataset.
-
index_name
Return the unique index name.
-
column_names
Return the list of column(s) required to build the index.
-
indexed_values
Return the list of values in the index.
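Similarly, a 'not null' index can be sketched under the assumption that a row group qualifies when at least one of its rows has a non-null value in the indexed field. NotNullIndexSketch, its indexed value string and its methods are hypothetical and only illustrate the idea.

```python
class NotNullIndexSketch:
    """Hypothetical indexer recording which pieces hold a non-null value for one field."""

    INDEXED_VALUE = 'field_is_not_null'  # hypothetical single indexed value

    def __init__(self, index_name, index_field):
        self.index_name = index_name
        self.column_names = [index_field]
        self._pieces = set()

    @property
    def indexed_values(self):
        return [self.INDEXED_VALUE]

    def build_index(self, decoded_rows, piece_index):
        # A row group qualifies if any of its rows has a non-null value.
        field = self.column_names[0]
        if any(row[field] is not None for row in decoded_rows):
            self._pieces.add(piece_index)

    def get_row_group_indexes(self):
        return set(self._pieces)


index = NotNullIndexSketch('has_label', 'label')
index.build_index([{'label': None}, {'label': None}], piece_index=0)
index.build_index([{'label': None}, {'label': 3}], piece_index=1)
print(index.get_row_group_indexes())  # {1}
```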
-
class petastorm.etl.rowgroup_indexing.PieceInfo(piece_index, path, row_group, partition_keys)
Create new instance of PieceInfo(piece_index, path, row_group, partition_keys)
-
partition_keys
Alias for field number 3
-
path
Alias for field number 1
-
piece_index
Alias for field number 0
-
row_group
Alias for field number 2
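The "Alias for field number N" entries are what Python generates for a namedtuple's fields. The sketch below recreates an equivalent namedtuple from the documented field order; the path and values are made up for illustration.

```python
from collections import namedtuple

# Field order follows the aliases documented above (field numbers 0 to 3).
PieceInfo = namedtuple('PieceInfo', ['piece_index', 'path', 'row_group', 'partition_keys'])

piece = PieceInfo(piece_index=7, path='/data/part-00000.parquet', row_group=2, partition_keys=[])
print(piece.path == piece[1])  # True: 'path' aliases field number 1
print(piece.row_group)         # 2
print(PieceInfo._fields)       # ('piece_index', 'path', 'row_group', 'partition_keys')
```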
-
petastorm.etl.rowgroup_indexing.build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libhdfs3')
Builds an index for the given list of fields, used for fast row group selection.
Parameters:
- dataset_url – (str) the url for the dataset (or a path if you would like to use the default hdfs config)
- spark_context – (SparkContext) the Spark context to use
- indexers – list of objects used to build row group indexes. They should support the RowGroupIndexerBase interface.
- hdfs_driver – A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are libhdfs (java through JNI) or libhdfs3 (C++)
Returns: None; upon successful completion, the row group predicates are saved to the _metadata file.
Benchmarks
HDFS
-
class petastorm.hdfs.namenode.HdfsNamenodeResolver(hadoop_configuration=None)
Resolves HDFS namenodes: either the default namenode or the namenodes of a name service.
Uses the given HadoopConfiguration object if one is provided; otherwise, checks for and loads the Hadoop configuration from an environment variable, following a preferred order of variables.
Parameters: hadoop_configuration – an optional HadoopConfiguration
-
resolve_hdfs_name_service(namespace)
Given the namespace of a name service, resolves the configured list of name nodes and returns them as a list of URL strings.
Parameters: namespace – the HDFS name service to resolve
Returns: a list of URL strings of the name nodes for the given name service, or None if not properly configured.
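Name service resolution typically relies on the standard Hadoop HA configuration keys dfs.ha.namenodes.<nameservice> (a comma-separated list of name node ids) and dfs.namenode.rpc-address.<nameservice>.<namenode id>. The sketch below assumes those keys and uses a plain dict in place of a HadoopConfiguration; resolve_name_service is a hypothetical function, and it returns the configured host:port values as-is rather than any particular URL form petastorm may produce.

```python
def resolve_name_service(hadoop_conf, name_service):
    """Return the RPC addresses of the name nodes in an HA name service, or None.

    hadoop_conf is a plain dict standing in for a Hadoop configuration.
    """
    namenode_ids = hadoop_conf.get('dfs.ha.namenodes.' + name_service)
    if not namenode_ids:
        return None  # name service not configured
    addresses = []
    for nn_id in namenode_ids.split(','):
        key = 'dfs.namenode.rpc-address.{}.{}'.format(name_service, nn_id.strip())
        address = hadoop_conf.get(key)
        if address is None:
            return None  # partially configured: treat as not properly configured
        addresses.append(address)
    return addresses


conf = {
    'dfs.ha.namenodes.mycluster': 'nn1,nn2',
    'dfs.namenode.rpc-address.mycluster.nn1': 'namenode1.example.com:8020',
    'dfs.namenode.rpc-address.mycluster.nn2': 'namenode2.example.com:8020',
}
print(resolve_name_service(conf, 'mycluster'))  # ['namenode1.example.com:8020', 'namenode2.example.com:8020']
print(resolve_name_service(conf, 'other'))      # None
```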
-
exception petastorm.hdfs.namenode.MaxFailoversExceeded(failed_exceptions, max_failover_attempts, func_name)
-
class petastorm.hdfs.namenode.namenode_failover(func)
This decorator class provides seamless namenode failover and retry when an HDFS call fails due to a StandbyException, up to a maximum number of attempts (MAX_FAILOVER_ATTEMPTS).
-
MAX_FAILOVER_ATTEMPTS = 2
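The retry-with-failover idea behind namenode_failover can be sketched as a plain function decorator. Everything here is hypothetical: StandbyError stands in for HDFS's StandbyException, MaxFailoversExceededError and ToyClient are made up, switch_namenode is an assumed client hook, and MAX_FAILOVER_ATTEMPTS is interpreted as the number of failovers after the first attempt, which may differ from petastorm's counting.

```python
import functools

MAX_FAILOVER_ATTEMPTS = 2


class StandbyError(Exception):
    """Hypothetical stand-in for HDFS's StandbyException."""


class MaxFailoversExceededError(Exception):
    """Raised once every attempt has hit a standby name node."""

    def __init__(self, failed_exceptions, max_failover_attempts, func_name):
        super().__init__('{} failed after {} failovers'.format(func_name, max_failover_attempts))
        self.failed_exceptions = failed_exceptions


def failover(func):
    """Retry a client method on the next name node when the current one is in standby."""
    @functools.wraps(func)
    def wrapper(client, *args, **kwargs):
        failures = []
        # One initial attempt plus up to MAX_FAILOVER_ATTEMPTS failovers.
        for _ in range(MAX_FAILOVER_ATTEMPTS + 1):
            try:
                return func(client, *args, **kwargs)
            except StandbyError as exc:
                failures.append(exc)
                client.switch_namenode()
        raise MaxFailoversExceededError(failures, MAX_FAILOVER_ATTEMPTS, func.__name__)
    return wrapper


class ToyClient:
    """Hypothetical client: only the name node named `active` answers calls."""

    def __init__(self, namenodes, active):
        self.namenodes = namenodes
        self.current = 0
        self.active = active

    def switch_namenode(self):
        self.current = (self.current + 1) % len(self.namenodes)

    @failover
    def ls(self, path):
        if self.namenodes[self.current] != self.active:
            raise StandbyError(self.namenodes[self.current])
        return [path + '/part-0']


client = ToyClient(['nn1', 'nn2'], active='nn2')
print(client.ls('/data'), client.namenodes[client.current])  # ['/data/part-0'] nn2
```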
-
petastorm.hdfs.namenode.failover_all_class_methods(decorator)
This decorator function wraps an entire class, decorating each member method, including inherited ones.
Adapted from https://stackoverflow.com/a/6307868
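A class decorator that applies a method decorator to every method, including inherited ones, can be sketched with inspect.getmembers, which also returns members defined on base classes. This sketch is not petastorm's code: decorate_all_methods and logged are hypothetical, and skipping underscore-prefixed names is an assumption made so that __init__ and other special methods are left alone.

```python
import functools
import inspect


def decorate_all_methods(decorator):
    """Return a class decorator that wraps each public method, inherited ones included."""
    def apply(cls):
        # getmembers looks attributes up through the MRO, so base-class methods appear too.
        for name, member in inspect.getmembers(cls, inspect.isfunction):
            if not name.startswith('_'):
                setattr(cls, name, decorator(member))
        return cls
    return apply


CALLS = []


def logged(func):
    """Hypothetical decorator that records the name of each call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        CALLS.append(func.__name__)
        return func(*args, **kwargs)
    return wrapper


class BaseClient:
    def exists(self, path):
        return True


@decorate_all_methods(logged)
class Client(BaseClient):
    def ls(self, path):
        return [path]


client = Client()
client.ls('/a')
client.exists('/a')
print(CALLS)  # ['ls', 'exists']
```

The wrapped inherited method is set on the decorated class itself, so the base class stays unchanged.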
-
class petastorm.hdfs.namenode.HAHdfsClient(connector_cls, list_of_namenodes, user=None)
Attempts HDFS connection operations, storing the hdfs object for intercepted calls.
Parameters:
- connector_cls – HdfsConnector class, so that the connector logic resides in one place; this also facilitates testing.
- list_of_namenodes – List of name nodes to fail over between, cached to enable un-/pickling
- user – String denoting username when connecting to HDFS. None implies the login user.
Returns: None
-
The following HDFS methods are wrapped with namenode_failover (autodoc renders each as a functools.partial of namenode_failover.__call__): cat, chmod, chown, close, connect, delete, df, disk_usage, download, exists, get_capacity, get_space_used, info, isdir, isfile, ls, mkdir, mv, open, read_parquet, rename, rm, stat, upload, walk.
-
class petastorm.hdfs.namenode.HdfsConnector
HDFS connector class where the failover logic is implemented. Facilitates testing.
-
MAX_NAMENODES = 2
-
classmethod hdfs_connect_namenode(url, driver='libhdfs3', user=None)
Performs the HDFS connect in one place, facilitating easy change of driver and test mocking.
Parameters:
- url – A parsed URL object pointing to the HDFS end point
- driver – An optional driver identifier
- user – String denoting username when connecting to HDFS. None implies the login user.
Returns: Pyarrow HDFS connection object.
-
classmethod connect_to_either_namenode(list_of_namenodes, user=None)
Returns a wrapper HadoopFileSystem “high-availability client” object that enables name node failover.
Raises an HdfsConnectError if no successful connection can be established.
Parameters:
- list_of_namenodes – a required list of name node URLs to connect to.
- user – String denoting username when connecting to HDFS. None implies the login user.
Returns: the wrapped HDFS connection object
-