Source code for petastorm.etl.rowgroup_indexers
# Copyright (c) 2017-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from collections import defaultdict
from petastorm.etl import RowGroupIndexerBase
[docs]class SingleFieldIndexer(RowGroupIndexerBase):
"""
Class to index single field in parquet dataset.
This indexer only indexes numpty strings, numpty integers, or numpy arrays of strings.
"""
def __init__(self, index_name, index_field):
self._index_name = index_name
self._column_name = index_field
self._index_data = defaultdict(set)
def __add__(self, other):
if not isinstance(other, SingleFieldIndexer):
raise TypeError("Make sure Spark map function return the same indexer type")
if self._column_name != other._column_name:
raise ValueError("Make sure indexers in Spark map function index the same fields")
for value_key in other._index_data:
self._index_data[value_key].update(other._index_data[value_key])
return self
@property
def index_name(self):
return self._index_name
@property
def column_names(self):
return [self._column_name]
@property
def indexed_values(self):
return list(self._index_data.keys())
[docs] def get_row_group_indexes(self, value_key):
return self._index_data[value_key]
[docs] def build_index(self, decoded_rows, piece_index):
field_column = [row[self._column_name] for row in decoded_rows]
if not field_column:
raise ValueError("Cannot build index for empty rows, column '{}'"
.format(self._column_name))
for field_val in field_column:
if field_val is not None:
# check type of field, if it is array index each array value,
# otherwise index field value directly
if isinstance(field_val, np.ndarray):
for val in field_val:
self._index_data[val].add(piece_index)
else:
self._index_data[field_val].add(piece_index)
return self._index_data
[docs]class FieldNotNullIndexer(RowGroupIndexerBase):
"""
Class to index 'Not Null' condition forsingle field in parquet dataset
"""
def __init__(self, index_name, index_field):
self._index_name = index_name
self._column_name = index_field
self._index_data = set()
def __add__(self, other):
if not isinstance(other, FieldNotNullIndexer):
raise TypeError("Make sure Spark map function return the same indexer type")
if self._column_name != other._column_name:
raise ValueError("Make sure indexers in Spark map function index the same fields")
self._index_data.update(other._index_data)
return self
@property
def index_name(self):
return self._index_name
@property
def column_names(self):
return [self._column_name]
@property
def indexed_values(self):
return ['Field is Not Null']
[docs] def get_row_group_indexes(self, value_key=None):
return self._index_data
[docs] def build_index(self, decoded_rows, piece_index):
field_column = [row[self._column_name] for row in decoded_rows]
if not field_column:
raise ValueError("Cannot build index for empty rows, column '{}'"
.format(self._column_name))
for field_val in field_column:
if field_val is not None:
self._index_data.add(piece_index)
break
return self._index_data