Source code for splicemachine.features.feature_set

from splicemachine.features import Feature
from .constants import Columns
from splicemachine.spark import PySpliceContext
from typing import List, Dict

[docs]class FeatureSet: def __init__(self, *, splice_ctx: PySpliceContext = None, table_name, schema_name, description, primary_keys: Dict[str, str], feature_set_id=None, deployed: bool = False, **kwargs): self.splice_ctx = splice_ctx self.table_name = table_name.upper() self.schema_name = schema_name.upper() self.description = description self.primary_keys = primary_keys self.feature_set_id = feature_set_id self.deployed = deployed args = {k.lower(): kwargs[k] for k in kwargs} # Lowercase keys args = {k: args[k].split(',') if 'columns' in k else args[k] for k in args} # Make value a list for specific pkcolumns because Splice doesn't support Arrays self.__dict__.update(args) self.pk_columns = list(primary_keys.keys())
[docs] def is_deployed(self): """ Returns whether or not this Feature Set has been deployed (the schema.table has been created in the database) :return: (bool) True if the Feature Set is deployed """ return self.deployed
def __eq__(self, other): if isinstance(other, FeatureSet): return self.table_name.lower() == other.table_name.lower() and \ self.schema_name.lower() == other.schema_name.lower() return False def __repr__(self): return self.__str__() def __str__(self): return f'FeatureSet(FeatureSetID={self.__dict__.get("feature_set_id", "NA")}, SchemaName={self.schema_name}, ' \ f'TableName={self.table_name}, Description={self.description}, PKColumns={self.pk_columns}'