diff --git a/doc/api/prep_data/feature_store.rst b/doc/api/prep_data/feature_store.rst index 278574e400..50a10c5089 100644 --- a/doc/api/prep_data/feature_store.rst +++ b/doc/api/prep_data/feature_store.rst @@ -91,6 +91,10 @@ Inputs :members: :show-inheritance: +.. autoclass:: sagemaker.feature_store.inputs.TtlDuration + :members: + :show-inheritance: + .. autoclass:: sagemaker.feature_store.inputs.S3StorageConfig :members: :show-inheritance: diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index 9ffb0ea9da..39915b60dc 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -19,7 +19,7 @@ list feature groups APIs can be used to manage feature groups. """ -from __future__ import absolute_import +from __future__ import absolute_import, annotations import copy import logging @@ -28,14 +28,15 @@ import tempfile from concurrent.futures import as_completed from concurrent.futures import ThreadPoolExecutor -from typing import Optional, Sequence, List, Dict, Any, Union +from typing import Optional, Sequence, List, Dict, Any, Union, Iterable from urllib.parse import urlparse from multiprocessing.pool import AsyncResult import signal import attr import pandas as pd -from pandas import DataFrame +from pandas import DataFrame, Series +from pandas.api.types import is_list_like import boto3 from botocore.config import Config @@ -50,6 +51,7 @@ from sagemaker.feature_store.feature_definition import ( FeatureDefinition, FeatureTypeEnum, + ListCollectionType, ) from sagemaker.feature_store.inputs import ( OnlineStoreConfig, @@ -66,6 +68,7 @@ OnlineStoreStorageTypeEnum, ThroughputConfig, ThroughputConfigUpdate, + TargetStoreEnum, ) from sagemaker.utils import resolve_value_from_config, format_tags, Tags @@ -182,6 +185,9 @@ class IngestionManagerPandas: Attributes: feature_group_name (str): name of the Feature Group. 
+ feature_definitions (Dict[str, Dict[Any, Any]]): dictionary of feature definitions. + where the key is the feature name and the value is the FeatureDefinition. + The FeatureDefinition contains the data type of the feature. sagemaker_fs_runtime_client_config (Config): instance of the Config class for boto calls. sagemaker_session (Session): session instance to perform boto calls. @@ -194,6 +200,7 @@ class IngestionManagerPandas: """ feature_group_name: str = attr.ib() + feature_definitions: Dict[str, Dict[Any, Any]] = attr.ib() sagemaker_fs_runtime_client_config: Config = attr.ib(default=None) sagemaker_session: Session = attr.ib(default=None) max_workers: int = attr.ib(default=1) @@ -207,9 +214,11 @@ class IngestionManagerPandas: def _ingest_single_batch( data_frame: DataFrame, feature_group_name: str, + feature_definitions: Dict[str, Dict[Any, Any]], client_config: Config, start_index: int, end_index: int, + target_stores: Sequence[TargetStoreEnum] = None, profile_name: str = None, ) -> List[int]: """Ingest a single batch of DataFrame rows into FeatureStore. @@ -217,10 +226,14 @@ def _ingest_single_batch( Args: data_frame (DataFrame): source DataFrame to be ingested. feature_group_name (str): name of the Feature Group. + feature_definitions (Dict[str, Dict[Any, Any]]): dictionary of feature definitions. + where the key is the feature name and the value is the FeatureDefinition. + The FeatureDefinition contains the data type of the feature. client_config (Config): Configuration for the sagemaker feature store runtime client to perform boto calls. start_index (int): starting position to ingest in this batch. end_index (int): ending position to ingest in this batch. + target_stores (Sequence[TargetStoreEnum]): stores to be used for ingestion. profile_name (str): the profile credential should be used for ``PutRecord`` (default: None). 
@@ -240,8 +253,10 @@ def _ingest_single_batch( for row in data_frame[start_index:end_index].itertuples(): IngestionManagerPandas._ingest_row( data_frame=data_frame, + target_stores=target_stores, row=row, feature_group_name=feature_group_name, + feature_definitions=feature_definitions, sagemaker_fs_runtime_client=sagemaker_fs_runtime_client, failed_rows=failed_rows, ) @@ -289,46 +304,146 @@ def wait(self, timeout=None): @staticmethod def _ingest_row( data_frame: DataFrame, - row: int, + row: Iterable[tuple[Any, ...]], feature_group_name: str, + feature_definitions: Dict[str, Dict[Any, Any]], sagemaker_fs_runtime_client: Session, failed_rows: List[int], + target_stores: Sequence[TargetStoreEnum] = None, ): """Ingest a single Dataframe row into FeatureStore. Args: data_frame (DataFrame): source DataFrame to be ingested. - row (int): current row that is being ingested + row (Iterable[tuple[Any, ...]]): current row that is being ingested feature_group_name (str): name of the Feature Group. - sagemaker_featurestore_runtime_client (Session): session instance to perform boto calls. + feature_definitions (Dict[str, Dict[Any, Any]]): dictionary of feature definitions. + where the key is the feature name and the value is the FeatureDefinition. + The FeatureDefinition contains the data type of the feature. + sagemaker_fs_runtime_client (Session): session instance to perform boto calls. failed_rows (List[int]): list of indices from the data frame for which ingestion failed. + target_stores (Sequence[TargetStoreEnum]): stores to be used for ingestion. Returns: int of row indices that failed to be ingested. 
""" - record = [ - FeatureValue( - feature_name=data_frame.columns[index - 1], - value_as_string=str(row[index]), - ) - for index in range(1, len(row)) - if pd.notna(row[index]) - ] try: - sagemaker_fs_runtime_client.put_record( - FeatureGroupName=feature_group_name, - Record=[value.to_dict() for value in record], - ) + record = [ + ( + FeatureValue( + feature_name=data_frame.columns[index - 1], + value_as_string_list=IngestionManagerPandas._covert_feature_to_string_list( + row[index] + ), + ) + if IngestionManagerPandas._is_feature_collection_type( + feature_name=data_frame.columns[index - 1], + feature_definitions=feature_definitions, + ) + else FeatureValue( + feature_name=data_frame.columns[index - 1], value_as_string=str(row[index]) + ) + ) + for index in range(1, len(row)) + if IngestionManagerPandas._feature_value_is_not_none(feature_value=row[index]) + ] + + put_record_params = { + "FeatureGroupName": feature_group_name, + "Record": [value.to_dict() for value in record], + } + if target_stores: + put_record_params["TargetStores"] = [ + target_store.value for target_store in target_stores + ] + + sagemaker_fs_runtime_client.put_record(**put_record_params) except Exception as e: # pylint: disable=broad-except logger.error("Failed to ingest row %d: %s", row[0], e) failed_rows.append(row[0]) - def _run_single_process_single_thread(self, data_frame: DataFrame): - """Ingest a utilizing single process and single thread. + @staticmethod + def _is_feature_collection_type( + feature_name: str, feature_definitions: Dict[str, Dict[Any, Any]] + ): + """Check if the feature is a collection type. + + Args: + feature_name (str): name of the feature. + feature_definitions (Dict[str, Dict[Any, Any]]): dictionary of feature definitions. + where the key is the feature name and the value is the FeatureDefinition. + The FeatureDefinition contains the data type of the feature and + the type of collection. 
+ If the feature is not a collection type, the value of the CollectionType attribute + is None. + + Returns: + bool: True if the feature is a collection type, False otherwise. + """ + feature_definition = feature_definitions.get(feature_name) + if feature_definition is not None: + return feature_definition.get("CollectionType") is not None + return None + + @staticmethod + def _feature_value_is_not_none( + feature_value: Any, + ): + """Check if the feature value is not None. + + For Collection Type feature, we want to keep this check simple, + where if the value is not None, + we convert and pass it to PutRecord, instead of relying on Pandas.notna(obj).all(). + + Also, we don't want to skip the collection attribute with partial None values, + when calling PutRecord. Since, + vector value can have some dimensions as None. Instead, + we want to let PutRecord either accept or fail the + entire record based on the service side implementation. + As of this change the service fails any partial None + collection types. + + For the Scalar values (non Collection) we want to still use pd.notna() + to keep the behavior same. + + Args: + feature_value (Any): feature value. + + Returns: + bool: True if the feature value is not None, False otherwise. + """ + if not is_list_like(feature_value): + return pd.notna(feature_value) + return feature_value + + @staticmethod + def _covert_feature_to_string_list(feature_value: List[Any]): + """Convert a list of feature values to a list of strings. + + Args: + feature_value (List[Any]): list of feature values. + + Returns: + List[str]: list of strings. 
+ """ + if not is_list_like(feature_value): + raise ValueError( + f"Invalid feature value, feature value: {feature_value}" + f" for a collection type feature" + f" must be an Array, but instead was {type(feature_value)}" + ) + return [str(value) if value is not None else None for value in feature_value] + + def _run_single_process_single_thread( + self, data_frame: DataFrame, target_stores: Sequence[TargetStoreEnum] = None + ): + """Ingest utilizing a single process and a single thread. Args: data_frame (DataFrame): source DataFrame to be ingested. + target_stores (Sequence[TargetStoreEnum]): target stores to ingest to. + If not specified, ingest to both online and offline stores. """ logger.info("Started ingesting index %d to %d") failed_rows = list() @@ -336,8 +451,10 @@ def _run_single_process_single_thread(self, data_frame: DataFrame): for row in data_frame.itertuples(): IngestionManagerPandas._ingest_row( data_frame=data_frame, + target_stores=target_stores, row=row, feature_group_name=self.feature_group_name, + feature_definitions=self.feature_definitions, sagemaker_fs_runtime_client=sagemaker_fs_runtime_client, failed_rows=failed_rows, ) @@ -349,11 +466,19 @@ def _run_single_process_single_thread(self, data_frame: DataFrame): f"Failed to ingest some data into FeatureGroup {self.feature_group_name}", ) - def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None): + def _run_multi_process( + self, + data_frame: DataFrame, + target_stores: Sequence[TargetStoreEnum] = None, + wait=True, + timeout=None, + ): """Start the ingestion process with the specified number of processes. Args: data_frame (DataFrame): source DataFrame to be ingested. + target_stores (Sequence[TargetStoreEnum]): target stores to ingest to. + If not specified, ingest to both online and offline stores. wait (bool): whether to wait for the ingestion to finish or not. timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. 
@@ -370,8 +495,10 @@ def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None): ( self.max_workers, self.feature_group_name, + self.feature_definitions, self.sagemaker_fs_runtime_client_config, data_frame[start_index:end_index], + target_stores, start_index, timeout, self.profile_name, @@ -395,8 +522,10 @@ def init_worker(): def _run_multi_threaded( max_workers: int, feature_group_name: str, + feature_definitions: Dict[str, Dict[Any, Any]], sagemaker_fs_runtime_client_config: Config, data_frame: DataFrame, + target_stores: Sequence[TargetStoreEnum] = None, row_offset=0, timeout=None, profile_name=None, @@ -405,6 +534,8 @@ def _run_multi_threaded( Args: data_frame (DataFrame): source DataFrame to be ingested. + target_stores (Sequence[TargetStoreEnum]): target stores to ingest to. + If not specified, ingest to both online and offline stores. row_offset (int): if ``data_frame`` is a partition of a parent DataFrame, then the index of the parent where ``data_frame`` starts. Otherwise, 0. wait (bool): whether to wait for the ingestion to finish or not. @@ -429,7 +560,9 @@ def _run_multi_threaded( executor.submit( IngestionManagerPandas._ingest_single_batch, feature_group_name=feature_group_name, + feature_definitions=feature_definitions, data_frame=data_frame, + target_stores=target_stores, start_index=start_index, end_index=end_index, client_config=sagemaker_fs_runtime_client_config, @@ -449,19 +582,31 @@ def _run_multi_threaded( return failed_indices - def run(self, data_frame: DataFrame, wait=True, timeout=None): + def run( + self, + data_frame: DataFrame, + target_stores: Sequence[TargetStoreEnum] = None, + wait=True, + timeout=None, + ): """Start the ingestion process. Args: data_frame (DataFrame): source DataFrame to be ingested. + target_stores (Sequence[TargetStoreEnum]): list of target stores to be used for + the ingestion. If None, the default target store is used. wait (bool): whether to wait for the ingestion to finish or not. 
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised if timeout is reached. """ if self.max_workers == 1 and self.max_processes == 1 and self.profile_name is None: - self._run_single_process_single_thread(data_frame=data_frame) + self._run_single_process_single_thread( + data_frame=data_frame, target_stores=target_stores + ) else: - self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout) + self._run_multi_process( + data_frame=data_frame, target_stores=target_stores, wait=wait, timeout=timeout + ) class IngestionError(Exception): @@ -765,9 +910,88 @@ def list_parameters_for_feature_metadata(self, feature_name: str) -> Sequence[Di feature_group_name=self.name, feature_name=feature_name ).get("Parameters") + @staticmethod + def _check_list_type(value): + """Check if the value is a list or None. + + Args: + value: value to be checked. + + Returns: + True if value is a list or None, False otherwise. + """ + return is_list_like(value) or pd.isna(value) + + @staticmethod + def _determine_collection_list_type(series: Series) -> FeatureTypeEnum | None: + """Determine the collection type of the feature. + + Args: + series (Series): column from the data frame. + + Returns: + feature type. + """ + + if series.apply( + lambda lst: ( + all(isinstance(x, int) or pd.isna(x) for x in lst) if is_list_like(lst) else True + ) + ).all(): + return FeatureTypeEnum.INTEGRAL + if series.apply( + lambda lst: ( + all(isinstance(x, (float, int)) or pd.isna(x) for x in lst) + if is_list_like(lst) + else True + ) + ).all(): + return FeatureTypeEnum.FRACTIONAL + if series.apply( + lambda lst: ( + all(isinstance(x, str) or pd.isna(x) for x in lst) if is_list_like(lst) else True + ) + ).all(): + return FeatureTypeEnum.STRING + return None + + def _generate_feature_definition( + self, series: Series, online_storage_type: OnlineStoreStorageTypeEnum + ) -> FeatureDefinition: + """Generate feature definition from the Panda Series. 
+ + Args: + series (Series): column from the data frame. + + Returns: + feature definition. + """ + params = {"feature_name": series.name} + + dtype = str(series.dtype).lower() + if ( + online_storage_type + and online_storage_type == OnlineStoreStorageTypeEnum.IN_MEMORY + and dtype == "object" + and pd.notna(series.head(1000)).any() + and series.head(1000).apply(FeatureGroup._check_list_type).all() + ): + params["collection_type"] = ListCollectionType() + params["feature_type"] = FeatureGroup._determine_collection_list_type(series.head(1000)) + else: + params["feature_type"] = self.DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.get(dtype, None) + + if params["feature_type"] is None: + raise ValueError( + f"Failed to infer Feature type based on dtype {dtype} " f"for column {series.name}." + ) + + feature_definition = FeatureDefinition(**params) + + return feature_definition + def load_feature_definitions( - self, - data_frame: DataFrame, + self, data_frame: DataFrame, online_storage_type: OnlineStoreStorageTypeEnum = None ) -> Sequence[FeatureDefinition]: """Load feature definitions from a Pandas DataFrame. @@ -780,26 +1004,37 @@ def load_feature_definitions( No feature definitions will be loaded if the given data_frame contains unsupported dtypes. + For IN_MEMORY online_storage_type all collection type columns within DataFrame + will be inferred as a List, + instead of a String. Due to performance limitations, + only first 1,000 values of the column will be sampled, + when inferring collection Type. + Customers can manually update the inferred collection type as needed. + Args: - data_frame (DataFrame): + data_frame (DataFrame): A Pandas DataFrame containing features. + online_storage_type (OnlineStoreStorageTypeEnum): + Optional. Online storage type for the feature group. + The value can be either STANDARD or IN_MEMORY + If not specified,STANDARD will be used by default. 
+ If specified as IN_MEMORY, + we will infer any collection type column within DataFrame as a List instead of a + String. + All, collection types (List, Set and Vector) will be inferred as List. + We will only sample the first 1,000 values of the column when inferring + collection Type. + + Returns: list of FeatureDefinition """ feature_definitions = [] for column in data_frame: - feature_type = self.DTYPE_TO_FEATURE_DEFINITION_CLS_MAP.get( - str(data_frame[column].dtype).lower(), None + feature_definition = self._generate_feature_definition( + data_frame[column], online_storage_type ) - if feature_type: - feature_definitions.append( - FeatureDefinition(feature_name=column, feature_type=feature_type) - ) - else: - raise ValueError( - f"Failed to infer Feature type based on dtype {data_frame[column].dtype} " - f"for column {column}." - ) + feature_definitions.append(feature_definition) self.feature_definitions = feature_definitions return self.feature_definitions @@ -822,24 +1057,27 @@ def get_record( feature_names=feature_names, ).get("Record") - def put_record(self, record: Sequence[FeatureValue], ttl_duration: TtlDuration = None): + def put_record( + self, + record: Sequence[FeatureValue], + target_stores: Sequence[TargetStoreEnum] = None, + ttl_duration: TtlDuration = None, + ): """Put a single record in the FeatureGroup. Args: record (Sequence[FeatureValue]): a list contains feature values. + target_stores (Sequence[str]): a list of target stores. ttl_duration (TtlDuration): customer specified ttl duration. 
""" - if ttl_duration is not None: - return self.sagemaker_session.put_record( - feature_group_name=self.name, - record=[value.to_dict() for value in record], - ttl_duration=ttl_duration.to_dict(), - ) - return self.sagemaker_session.put_record( feature_group_name=self.name, record=[value.to_dict() for value in record], + target_stores=( + [target_store.value for target_store in target_stores] if target_stores else None + ), + ttl_duration=ttl_duration.to_dict() if ttl_duration is not None else None, ) def delete_record( @@ -869,6 +1107,7 @@ def delete_record( def ingest( self, data_frame: DataFrame, + target_stores: Sequence[TargetStoreEnum] = None, max_workers: int = 1, max_processes: int = 1, wait: bool = True, @@ -895,7 +1134,7 @@ def ingest( the ``ingest`` function synchronously. To access the rows that failed to ingest, set ``wait`` to ``False``. The - ``IngestionError.failed_rows`` object saves all of the rows that failed to ingest. + ``IngestionError.failed_rows`` object saves all the rows that failed to ingest. `profile_name` argument is an optional one. It will use the default credential if None is passed. This `profile_name` is used in the sagemaker_featurestore_runtime client only. See @@ -904,6 +1143,8 @@ def ingest( Args: data_frame (DataFrame): data_frame to be ingested to feature store. + target_stores (Sequence[TargetStoreEnum]): target stores to be used for + ingestion. (default: None). max_workers (int): number of threads to be created. max_processes (int): number of processes to be created. Each process spawns ``max_worker`` number of threads. 
@@ -925,19 +1166,38 @@
         if profile_name is None and self.sagemaker_session.boto_session.profile_name != "default":
             profile_name = self.sagemaker_session.boto_session.profile_name
 
+        feature_definition_dict = self._get_feature_definition_dict()
+
         manager = IngestionManagerPandas(
             feature_group_name=self.name,
+            feature_definitions=feature_definition_dict,
             sagemaker_session=self.sagemaker_session,
-            sagemaker_fs_runtime_client_config=self.sagemaker_session.sagemaker_featurestore_runtime_client.meta.config,
+            sagemaker_fs_runtime_client_config=(
+                self.sagemaker_session.sagemaker_featurestore_runtime_client.meta.config
+            ),
             max_workers=max_workers,
             max_processes=max_processes,
             profile_name=profile_name,
         )
 
-        manager.run(data_frame=data_frame, wait=wait, timeout=timeout)
+        manager.run(data_frame=data_frame, target_stores=target_stores, wait=wait, timeout=timeout)
 
         return manager
 
+    def _get_feature_definition_dict(self) -> Dict[str, Dict[Any, Any]]:
+        """Get a dictionary of feature definitions with Feature Name as Key.
+
+        We are converting the FeatureDefinitions into a Dict for faster lookups.
+
+        Returns:
+            Dictionary of feature definitions with Key being the Feature Name.
+        """
+        feature_definitions = self.describe()["FeatureDefinitions"]
+        feature_definition_dict = {}
+        for feature_definition in feature_definitions:
+            feature_definition_dict[feature_definition["FeatureName"]] = feature_definition
+        return feature_definition_dict
+
     def athena_query(self) -> AthenaQuery:
         """Create an AthenaQuery instance.
 
diff --git a/src/sagemaker/feature_store/inputs.py b/src/sagemaker/feature_store/inputs.py
index aaff977d3c..78c26fe026 100644
--- a/src/sagemaker/feature_store/inputs.py
+++ b/src/sagemaker/feature_store/inputs.py
@@ -108,6 +108,16 @@ def to_dict(self) -> Dict[str, Any]:
         )
 
 
+class TargetStoreEnum(Enum):
+    """Enum of store types for put record.
+
+    The store types can be OnlineStore or OfflineStore.
+    
+ """ + + ONLINE_STORE = "OnlineStore" + OFFLINE_STORE = "OfflineStore" + + class OnlineStoreStorageTypeEnum(Enum): """Enum of storage types for online store. diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 0cad3ab0da..ff5a82a902 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -5992,27 +5992,34 @@ def put_record( self, feature_group_name: str, record: Sequence[Dict[str, str]], - ttl_duration: Dict[str, str] = None, + target_stores: Sequence[str] = None, + ttl_duration: Dict[str, Any] = None, ): """Puts a single record in the FeatureGroup. Args: - feature_group_name (str): name of the FeatureGroup. - record (Sequence[Dict[str, str]]): list of FeatureValue dicts to be ingested + feature_group_name (str): Name of the FeatureGroup. + record (Sequence[Dict[str, str]]): List of FeatureValue dicts to be ingested into FeatureStore. + target_stores (Sequence[str]): Optional. List of target stores to put the record. + ttl_duration (Dict[str, str]): Optional. Time-to-Live (TTL) duration for the record. + + Returns: + Response dict from service. 
""" + params = { + "FeatureGroupName": feature_group_name, + "Record": record, + } + if ttl_duration: - return self.sagemaker_featurestore_runtime_client.put_record( - FeatureGroupName=feature_group_name, - Record=record, - TtlDuration=ttl_duration, - ) + params["TtlDuration"] = ttl_duration - return self.sagemaker_featurestore_runtime_client.put_record( - FeatureGroupName=feature_group_name, - Record=record, - ) + if target_stores: + params["TargetStores"] = target_stores + + return self.sagemaker_featurestore_runtime_client.put_record(**params) def delete_record( self, diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 319d492e83..949c41e79c 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -30,7 +30,7 @@ StringFeatureDefinition, ListCollectionType, ) -from sagemaker.feature_store.feature_group import FeatureGroup +from sagemaker.feature_store.feature_group import FeatureGroup, IngestionError from sagemaker.feature_store.feature_store import FeatureStore from sagemaker.feature_store.inputs import ( FeatureValue, @@ -46,6 +46,7 @@ ThroughputConfig, ThroughputModeEnum, ThroughputConfigUpdate, + TargetStoreEnum, ) from sagemaker.feature_store.dataset_builder import ( JoinTypeEnum, @@ -138,6 +139,264 @@ def pandas_data_frame(): return df +@pytest.fixture +def get_record_results_for_data_frame(): + return { + "0.0": [ + {"FeatureName": "feature1", "ValueAsString": "0.0"}, + {"FeatureName": "feature2", "ValueAsString": "0"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "0.0"}, + ], + "1.0": [ + {"FeatureName": "feature1", "ValueAsString": "1.0"}, + {"FeatureName": "feature2", "ValueAsString": "1"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "1.0"}, + ], + "2.0": [ + {"FeatureName": "feature1", "ValueAsString": "2.0"}, + {"FeatureName": 
"feature2", "ValueAsString": "2"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "2.0"}, + ], + "3.0": [ + {"FeatureName": "feature1", "ValueAsString": "3.0"}, + {"FeatureName": "feature2", "ValueAsString": "3"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "3.0"}, + ], + "4.0": [ + {"FeatureName": "feature1", "ValueAsString": "4.0"}, + {"FeatureName": "feature2", "ValueAsString": "4"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "4.0"}, + ], + "5.0": [ + {"FeatureName": "feature1", "ValueAsString": "5.0"}, + {"FeatureName": "feature2", "ValueAsString": "5"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "6.0": [ + {"FeatureName": "feature1", "ValueAsString": "6.0"}, + {"FeatureName": "feature2", "ValueAsString": "6"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "7.0": [ + {"FeatureName": "feature1", "ValueAsString": "7.0"}, + {"FeatureName": "feature2", "ValueAsString": "7"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "8.0": [ + {"FeatureName": "feature1", "ValueAsString": "8.0"}, + {"FeatureName": "feature2", "ValueAsString": "8"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "9.0": [ + {"FeatureName": "feature1", "ValueAsString": "9.0"}, + {"FeatureName": "feature2", "ValueAsString": "9"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + } + + +@pytest.fixture +def pandas_data_frame_with_collection_type(): + df = pd.DataFrame( + { + "feature1": pd.Series(np.arange(10.0), dtype="float64"), + "feature2": pd.Series(np.arange(10), dtype="int64"), + "feature3": pd.Series(["2020-10-30T03:43:21Z"] * 10, dtype="string"), + "feature4": pd.Series(np.arange(5.0), dtype="float64"), # 
contains nan + "feature5": pd.Series( + [["a", "abc"], ["b", "c"], ["c", "f"], ["d"], []], dtype="object" + ), + "feature6": pd.Series([[1, 2], [1, 2, 3], [1, 5], [1], []], dtype="object"), + "feature7": pd.Series( + [[1.1, 2.3], [1.4, 2.5, 3.2, 25], [1.0, 5.3], [1.2], []], dtype="object" + ), + "feature8": pd.Series([[1, 2], [1, 2, 3], [1, 5], [1], [], []], dtype="object"), + "feature9": pd.Series( + [[1.1, 2.3], [1.4, 25, 3.2], [1.0, 3, 4], [1.2], []], dtype="object" + ), + "feature10": pd.Series( + [["a", "abc"], ["b", "c"], ["c", "None"], ["d"], []], dtype="object" + ), + } + ) + return df + + +@pytest.fixture +def get_record_results_for_data_frame_with_collection_type(): + return { + "0.0": [ + {"FeatureName": "feature1", "ValueAsString": "0.0"}, + {"FeatureName": "feature2", "ValueAsString": "0"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "0.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["a", "abc"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1", "2"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.1", "2.3"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1", "2"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.1", "2.3"]}, + {"FeatureName": "feature10", "ValueAsStringList": ["a", "abc"]}, + ], + "1.0": [ + {"FeatureName": "feature1", "ValueAsString": "1.0"}, + {"FeatureName": "feature2", "ValueAsString": "1"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "1.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["b", "c"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1", "2", "3"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.4", "2.5", "3.2", "25"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1", "2", "3"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.4", "25", "3.2"]}, + {"FeatureName": "feature10", "ValueAsStringList": 
["b", "c"]}, + ], + "2.0": [ + {"FeatureName": "feature1", "ValueAsString": "2.0"}, + {"FeatureName": "feature2", "ValueAsString": "2"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "2.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["c", "f"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1", "5"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.0", "5.3"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1", "5"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.0", "3", "4"]}, + {"FeatureName": "feature10", "ValueAsStringList": ["c", "None"]}, + ], + "3.0": [ + {"FeatureName": "feature1", "ValueAsString": "3.0"}, + {"FeatureName": "feature2", "ValueAsString": "3"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "3.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["d"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.2"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.2"]}, + {"FeatureName": "feature10", "ValueAsStringList": ["d"]}, + ], + "4.0": [ + {"FeatureName": "feature1", "ValueAsString": "4.0"}, + {"FeatureName": "feature2", "ValueAsString": "4"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "4.0"}, + ], + "5.0": [ + {"FeatureName": "feature1", "ValueAsString": "5.0"}, + {"FeatureName": "feature2", "ValueAsString": "5"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "6.0": [ + {"FeatureName": "feature1", "ValueAsString": "6.0"}, + {"FeatureName": "feature2", "ValueAsString": "6"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "7.0": [ + {"FeatureName": "feature1", "ValueAsString": "7.0"}, + 
{"FeatureName": "feature2", "ValueAsString": "7"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "8.0": [ + {"FeatureName": "feature1", "ValueAsString": "8.0"}, + {"FeatureName": "feature2", "ValueAsString": "8"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "9.0": [ + {"FeatureName": "feature1", "ValueAsString": "9.0"}, + {"FeatureName": "feature2", "ValueAsString": "9"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + } + + +@pytest.fixture +def get_record_results_for_data_frame_without_collection_type(): + return { + "0.0": [ + {"FeatureName": "feature1", "ValueAsString": "0.0"}, + {"FeatureName": "feature2", "ValueAsString": "0"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "0.0"}, + {"FeatureName": "feature5", "ValueAsString": "['a', 'abc']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 2]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.1, 2.3]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 2]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.1, 2.3]"}, + {"FeatureName": "feature10", "ValueAsString": "['a', 'abc']"}, + ], + "1.0": [ + {"FeatureName": "feature1", "ValueAsString": "1.0"}, + {"FeatureName": "feature2", "ValueAsString": "1"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "1.0"}, + {"FeatureName": "feature5", "ValueAsString": "['b', 'c']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 2, 3]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.4, 2.5, 3.2, 25]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 2, 3]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.4, 25, 3.2]"}, + {"FeatureName": "feature10", "ValueAsString": "['b', 'c']"}, + ], + "2.0": [ + {"FeatureName": "feature1", "ValueAsString": "2.0"}, + {"FeatureName": "feature2", "ValueAsString": "2"}, + 
{"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "2.0"}, + {"FeatureName": "feature5", "ValueAsString": "['c', 'f']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 5]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.0, 5.3]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 5]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.0, 3, 4]"}, + {"FeatureName": "feature10", "ValueAsString": "['c', 'None']"}, + ], + "3.0": [ + {"FeatureName": "feature1", "ValueAsString": "3.0"}, + {"FeatureName": "feature2", "ValueAsString": "3"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "3.0"}, + {"FeatureName": "feature5", "ValueAsString": "['d']"}, + {"FeatureName": "feature6", "ValueAsString": "[1]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.2]"}, + {"FeatureName": "feature8", "ValueAsString": "[1]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.2]"}, + {"FeatureName": "feature10", "ValueAsString": "['d']"}, + ], + "4.0": [ + {"FeatureName": "feature1", "ValueAsString": "4.0"}, + {"FeatureName": "feature2", "ValueAsString": "4"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "4.0"}, + ], + "5.0": [ + {"FeatureName": "feature1", "ValueAsString": "5.0"}, + {"FeatureName": "feature2", "ValueAsString": "5"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "6.0": [ + {"FeatureName": "feature1", "ValueAsString": "6.0"}, + {"FeatureName": "feature2", "ValueAsString": "6"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "7.0": [ + {"FeatureName": "feature1", "ValueAsString": "7.0"}, + {"FeatureName": "feature2", "ValueAsString": "7"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "8.0": [ + {"FeatureName": "feature1", "ValueAsString": "8.0"}, + 
{"FeatureName": "feature2", "ValueAsString": "8"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + "9.0": [ + {"FeatureName": "feature1", "ValueAsString": "9.0"}, + {"FeatureName": "feature2", "ValueAsString": "9"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + } + + @pytest.fixture def base_dataframe(): base_data = [ @@ -315,6 +574,285 @@ def test_create_feature_store( assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") +def test_create_feature_store_ingest_with_offline_target_stores( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, + record, + create_table_ddl, + get_record_results_for_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + resolved_output_s3_uri = ( + feature_group.describe() + .get("OfflineStoreConfig") + .get("S3StorageConfig") + .get("ResolvedOutputS3Uri") + ) + # Ingest data + feature_group.put_record(record=record) + ingestion_manager = feature_group.ingest( + data_frame=pandas_data_frame, + target_stores=[TargetStoreEnum.OFFLINE_STORE], + max_workers=3, + max_processes=2, + wait=False, + ) + ingestion_manager.wait() + assert 0 == len(ingestion_manager.failed_rows) + + for index, value in pandas_data_frame["feature1"].items(): + assert feature_group.get_record(record_identifier_value_as_string=str(value)) is None + + # Query the integrated Glue table. 
+ athena_query = feature_group.athena_query() + df = DataFrame() + with timeout(minutes=10): + while df.shape[0] < 11: + athena_query.run( + query_string=f'SELECT * FROM "{athena_query.table_name}"', + output_location=f"{offline_store_s3_uri}/query_results", + ) + athena_query.wait() + assert "SUCCEEDED" == athena_query.get_query_execution().get("QueryExecution").get( + "Status" + ).get("State") + df = athena_query.as_dataframe() + print(f"Found {df.shape[0]} records.") + time.sleep(60) + + assert df.shape[0] == 11 + nans = pd.isna(df.loc[df["feature1"].isin([5, 6, 7, 8, 9])]["feature4"]) + for _, is_na in nans.items(): + assert is_na + assert ( + create_table_ddl.format( + feature_group_name=feature_group_name, + region=feature_store_session.boto_session.region_name, + account=feature_store_session.account_id(), + resolved_output_s3_uri=resolved_output_s3_uri, + ) + == feature_group.as_hive_ddl() + ) + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + + +def test_create_feature_store_ingest_with_online_offline_target_stores( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, + record, + create_table_ddl, + get_record_results_for_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + resolved_output_s3_uri = ( + feature_group.describe() + .get("OfflineStoreConfig") + .get("S3StorageConfig") + .get("ResolvedOutputS3Uri") + ) + # Ingest data + feature_group.put_record(record=record) + ingestion_manager = feature_group.ingest( + data_frame=pandas_data_frame, + 
target_stores=[TargetStoreEnum.ONLINE_STORE, TargetStoreEnum.OFFLINE_STORE], + max_workers=3, + max_processes=2, + wait=False, + ) + ingestion_manager.wait() + assert 0 == len(ingestion_manager.failed_rows) + + for index, value in pandas_data_frame["feature1"].items(): + assert ( + feature_group.get_record(record_identifier_value_as_string=str(value)) + == get_record_results_for_data_frame[str(value)] + ) + + # Query the integrated Glue table. + athena_query = feature_group.athena_query() + df = DataFrame() + with timeout(minutes=10): + while df.shape[0] < 11: + athena_query.run( + query_string=f'SELECT * FROM "{athena_query.table_name}"', + output_location=f"{offline_store_s3_uri}/query_results", + ) + athena_query.wait() + assert "SUCCEEDED" == athena_query.get_query_execution().get("QueryExecution").get( + "Status" + ).get("State") + df = athena_query.as_dataframe() + print(f"Found {df.shape[0]} records.") + time.sleep(60) + + assert df.shape[0] == 11 + nans = pd.isna(df.loc[df["feature1"].isin([5, 6, 7, 8, 9])]["feature4"]) + for _, is_na in nans.items(): + assert is_na + assert ( + create_table_ddl.format( + feature_group_name=feature_group_name, + region=feature_store_session.boto_session.region_name, + account=feature_store_session.account_id(), + resolved_output_s3_uri=resolved_output_s3_uri, + ) + == feature_group.as_hive_ddl() + ) + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + + +def test_create_feature_store_ingest_with_online_target_stores( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, + record, + create_table_ddl, + get_record_results_for_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + 
event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + # Ingest data + ingestion_manager = feature_group.ingest( + data_frame=pandas_data_frame, + target_stores=[TargetStoreEnum.ONLINE_STORE], + max_workers=3, + max_processes=2, + wait=False, + ) + ingestion_manager.wait() + assert 0 == len(ingestion_manager.failed_rows) + + for index, value in pandas_data_frame["feature1"].items(): + assert ( + feature_group.get_record(record_identifier_value_as_string=str(value)) + == get_record_results_for_data_frame[str(value)] + ) + + feature_group.put_record( + record=[ + FeatureValue(feature_name="feature1", value_as_string="100.0"), + FeatureValue(feature_name="feature2", value_as_string="100"), + FeatureValue(feature_name="feature3", value_as_string="2020-10-30T03:43:21Z"), + FeatureValue(feature_name="feature4", value_as_string="100.0"), + ], + target_stores=[TargetStoreEnum.OFFLINE_STORE], + ) + assert feature_group.get_record(record_identifier_value_as_string="100.0") is None + + # Query the integrated Glue table. 
+ athena_query = feature_group.athena_query() + df = DataFrame() + with timeout(minutes=10): + while df.shape[0] < 1: + athena_query.run( + query_string=f'SELECT * FROM "{athena_query.table_name}"', + output_location=f"{offline_store_s3_uri}/query_results", + ) + athena_query.wait() + assert "SUCCEEDED" == athena_query.get_query_execution().get("QueryExecution").get( + "Status" + ).get("State") + df = athena_query.as_dataframe() + print(f"Found {df.shape[0]} records.") + time.sleep(60) + + assert df.shape[0] == 1 + assert df.loc[0, "feature1"] == 100.0 + assert df.loc[0, "feature2"] == 100 + assert df.loc[0, "feature3"] == "2020-10-30T03:43:21Z" + assert df.loc[0, "feature4"] == 100.0 + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + + +def test_put_record_with_target_stores( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, + record, + create_table_ddl, + get_record_results_for_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + with cleanup_feature_group(feature_group): + feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + feature_group.put_record( + record=[ + FeatureValue(feature_name="feature1", value_as_string="100.0"), + FeatureValue(feature_name="feature2", value_as_string="100"), + FeatureValue(feature_name="feature3", value_as_string="2020-10-30T03:43:21Z"), + FeatureValue(feature_name="feature4", value_as_string="100.0"), + ], + target_stores=[TargetStoreEnum.OFFLINE_STORE], + ) + assert feature_group.get_record(record_identifier_value_as_string="100.0") is None + + feature_group.put_record( + record=[ + FeatureValue(feature_name="feature1", value_as_string="100.0"), 
+ FeatureValue(feature_name="feature2", value_as_string="100"), + FeatureValue(feature_name="feature3", value_as_string="2020-10-30T03:43:21Z"), + FeatureValue(feature_name="feature4", value_as_string="100.0"), + ], + target_stores=[TargetStoreEnum.ONLINE_STORE], + ) + assert feature_group.get_record(record_identifier_value_as_string="100.0") == [ + {"FeatureName": "feature1", "ValueAsString": "100.0"}, + {"FeatureName": "feature2", "ValueAsString": "100"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "100.0"}, + ] + + def test_create_feature_group_iceberg_table_format( feature_store_session, role, @@ -1630,6 +2168,202 @@ def test_get_feature_group_with_session( assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") +def test_ingest_in_memory_multi_process_with_collection_types( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame_with_collection_type, + get_record_results_for_data_frame_with_collection_type, +): + feature_group = FeatureGroup(feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions( + data_frame=pandas_data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.IN_MEMORY, + ) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + online_store_storage_type=OnlineStoreStorageTypeEnum.IN_MEMORY, + s3_uri=False, + ) + _wait_for_feature_group_create(feature_group) + + ingestion_manager = feature_group.ingest( + data_frame=pandas_data_frame_with_collection_type, + max_workers=3, + max_processes=2, + wait=True, + ) + ingestion_manager.wait() + assert 0 == len(ingestion_manager.failed_rows) + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + for index, value in 
pandas_data_frame_with_collection_type["feature1"].items(): + assert ( + feature_group.get_record(record_identifier_value_as_string=str(value)) + == get_record_results_for_data_frame_with_collection_type[str(value)] + ) + + new_row_data = [ + 10.0, + 10, + "2020-10-30T03:43:21Z", + 5.0, + ["a", "b"], + [1, 2, None], + [3.0, 4.0], + [1, 2], + [3.0, 4.0], + ["a", "b"], + ] + pandas_data_frame_with_collection_type.loc[ + len(pandas_data_frame_with_collection_type) + ] = new_row_data + with pytest.raises(IngestionError): + feature_group.ingest( + data_frame=pandas_data_frame_with_collection_type, + max_workers=1, + max_processes=1, + wait=True, + ) + + +def test_ingest_in_memory_single_process_with_collection_types( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame_with_collection_type, + get_record_results_for_data_frame_with_collection_type, +): + feature_group = FeatureGroup(feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions( + data_frame=pandas_data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.IN_MEMORY, + ) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + online_store_storage_type=OnlineStoreStorageTypeEnum.IN_MEMORY, + s3_uri=False, + ) + _wait_for_feature_group_create(feature_group) + + ingestion_manager = feature_group.ingest( + data_frame=pandas_data_frame_with_collection_type, + max_workers=1, + max_processes=1, + wait=True, + ) + assert 0 == len(ingestion_manager.failed_rows) + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + for index, value in pandas_data_frame_with_collection_type["feature1"].items(): + assert ( + feature_group.get_record(record_identifier_value_as_string=str(value)) + == 
get_record_results_for_data_frame_with_collection_type[str(value)] + ) + + new_row_data = [ + 10.0, + 10, + "2020-10-30T03:43:21Z", + 5.0, + ["a", "b"], + [1, 2, None], + [3.0, 4.0], + [1, 2], + [3.0, 4.0], + ["a", "b"], + ] + pandas_data_frame_with_collection_type.loc[ + len(pandas_data_frame_with_collection_type) + ] = new_row_data + with pytest.raises(IngestionError): + feature_group.ingest( + data_frame=pandas_data_frame_with_collection_type, + max_workers=1, + max_processes=1, + wait=True, + ) + + +def test_ingest_standard_multi_process_with_collection_types( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame_with_collection_type, + get_record_results_for_data_frame_without_collection_type, +): + feature_group = FeatureGroup(feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions( + data_frame=pandas_data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.STANDARD, + ) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + online_store_storage_type=OnlineStoreStorageTypeEnum.STANDARD, + s3_uri=False, + ) + _wait_for_feature_group_create(feature_group) + + new_row_data = [ + 10.0, + 10, + "2020-10-30T03:43:21Z", + 5.0, + ["a", "b"], + [1, 2, None], + [3.0, 4.0], + [1, 2], + [3.0, 4.0], + ["a", "b"], + ] + pandas_data_frame_with_collection_type.loc[ + len(pandas_data_frame_with_collection_type) + ] = new_row_data + + ingestion_manager = feature_group.ingest( + data_frame=pandas_data_frame_with_collection_type, + max_workers=3, + max_processes=2, + wait=True, + ) + ingestion_manager.wait() + assert 0 == len(ingestion_manager.failed_rows) + get_record_results_for_data_frame_without_collection_type["10.0"] = [ + {"FeatureName": "feature1", "ValueAsString": "10.0"}, + {"FeatureName": "feature2", 
"ValueAsString": "10"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "5.0"}, + {"FeatureName": "feature5", "ValueAsString": "['a', 'b']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 2, None]"}, + {"FeatureName": "feature7", "ValueAsString": "[3.0, 4.0]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 2]"}, + {"FeatureName": "feature9", "ValueAsString": "[3.0, 4.0]"}, + {"FeatureName": "feature10", "ValueAsString": "['a', 'b']"}, + ] + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + for index, value in pandas_data_frame_with_collection_type["feature1"].items(): + assert ( + feature_group.get_record(record_identifier_value_as_string=str(value)) + == get_record_results_for_data_frame_without_collection_type[str(value)] + ) + + @contextmanager def cleanup_feature_group(feature_group: FeatureGroup): try: diff --git a/tests/unit/sagemaker/feature_store/test_feature_group.py b/tests/unit/sagemaker/feature_store/test_feature_group.py index 394ecb25b3..5155a802d4 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_group.py +++ b/tests/unit/sagemaker/feature_store/test_feature_group.py @@ -13,10 +13,10 @@ # language governing permissions and limitations under the License. 
from __future__ import absolute_import - import pandas as pd +import numpy as np import pytest -from mock import Mock, patch, MagicMock +from mock import Mock, patch, MagicMock, call from botocore.exceptions import ProfileNotFound from sagemaker.feature_store.feature_definition import ( @@ -27,6 +27,7 @@ VectorCollectionType, SetCollectionType, ListCollectionType, + FeatureDefinition, ) from sagemaker.feature_store.feature_group import ( FeatureGroup, @@ -43,6 +44,7 @@ ThroughputModeEnum, ThroughputConfig, ThroughputConfigUpdate, + TargetStoreEnum, ) from tests.unit import SAGEMAKER_CONFIG_FEATURE_GROUP @@ -84,6 +86,163 @@ def feature_group_dummy_definitions(): ] +@pytest.fixture +def feature_group_describe_dummy_definitions(): + return [ + {"FeatureName": "feature1", "FeatureType": "Fractional"}, + {"FeatureName": "feature2", "FeatureType": "Integral"}, + {"FeatureName": "feature3", "FeatureType": "String"}, + ] + + +@pytest.fixture +def feature_group_dummy_definition_dict(): + return { + "feature1": {"FeatureName": "feature1", "FeatureType": "Fractional"}, + "feature2": {"FeatureName": "feature2", "FeatureType": "Integral"}, + "feature3": {"FeatureName": "feature3", "FeatureType": "String"}, + } + + +@pytest.fixture +def data_frame_with_collection_type(): + df = pd.DataFrame( + { + "feature1": pd.Series(np.arange(10.0), dtype="float64"), + "feature2": pd.Series(np.arange(10), dtype="int64"), + "feature3": pd.Series(["2020-10-30T03:43:21Z"] * 10, dtype="string"), + "feature4": pd.Series(np.arange(5.0), dtype="float64"), # contains nan + "feature5": pd.Series( + [["a", "abc"], ["b", "c"], ["c", "f"], ["d"], []], dtype="object" + ), + "feature6": pd.Series([[1, 2], [1, 2, 3], [1, 5], [1], []], dtype="object"), + "feature7": pd.Series( + [[1.1, 2.3], [1.4, 2.5, 3.2, 25], [1.0, 5.3], [1.2], []], dtype="object" + ), + "feature8": pd.Series([[1, 2], [1, 2, None], [1, 5], [1], [], [None]], dtype="object"), + "feature9": pd.Series( + [[1.1, 2.3], [1.4, 25, 3.2], [1.0, 3, 
None], [1.2], [], [None]], dtype="object" + ), + "feature10": pd.Series( + [["a", "abc"], ["b", "c"], ["c", None], ["d"], [], [None]], dtype="object" + ), + } + ) + return df + + +@pytest.fixture +def expected_standard_feature_definitions(): + return [ + FeatureDefinition(feature_name="feature1", feature_type=FeatureTypeEnum.FRACTIONAL), + FeatureDefinition(feature_name="feature2", feature_type=FeatureTypeEnum.INTEGRAL), + FeatureDefinition(feature_name="feature3", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature4", feature_type=FeatureTypeEnum.FRACTIONAL), + FeatureDefinition(feature_name="feature5", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature6", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature7", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature8", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature9", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature10", feature_type=FeatureTypeEnum.STRING), + ] + + +@pytest.fixture +def expected_standard_feature_definition_dict(): + return { + "feature1": {"FeatureName": "feature1", "FeatureType": "Fractional"}, + "feature2": {"FeatureName": "feature2", "FeatureType": "Integral"}, + "feature3": {"FeatureName": "feature3", "FeatureType": "String"}, + "feature4": { + "FeatureName": "feature4", + "FeatureType": "Fractional", + "CollectionType": None, + }, + "feature5": {"FeatureName": "feature5", "FeatureType": "String"}, + "feature6": {"FeatureName": "feature6", "FeatureType": "Integral"}, + "feature7": {"FeatureName": "feature7", "FeatureType": "Fractional"}, + "feature8": {"FeatureName": "feature8", "FeatureType": "Integral"}, + "feature9": {"FeatureName": "feature9", "FeatureType": "Fractional"}, + "feature10": {"FeatureName": "feature10", "FeatureType": "String"}, + } + + +@pytest.fixture +def expected_in_memory_feature_definitions(): + return 
[ + FeatureDefinition(feature_name="feature1", feature_type=FeatureTypeEnum.FRACTIONAL), + FeatureDefinition(feature_name="feature2", feature_type=FeatureTypeEnum.INTEGRAL), + FeatureDefinition(feature_name="feature3", feature_type=FeatureTypeEnum.STRING), + FeatureDefinition(feature_name="feature4", feature_type=FeatureTypeEnum.FRACTIONAL), + FeatureDefinition( + feature_name="feature5", + feature_type=FeatureTypeEnum.STRING, + collection_type=ListCollectionType(), + ), + FeatureDefinition( + feature_name="feature6", + feature_type=FeatureTypeEnum.INTEGRAL, + collection_type=ListCollectionType(), + ), + FeatureDefinition( + feature_name="feature7", + feature_type=FeatureTypeEnum.FRACTIONAL, + collection_type=ListCollectionType(), + ), + FeatureDefinition( + feature_name="feature8", + feature_type=FeatureTypeEnum.INTEGRAL, + collection_type=ListCollectionType(), + ), + FeatureDefinition( + feature_name="feature9", + feature_type=FeatureTypeEnum.FRACTIONAL, + collection_type=ListCollectionType(), + ), + FeatureDefinition( + feature_name="feature10", + feature_type=FeatureTypeEnum.STRING, + collection_type=ListCollectionType(), + ), + ] + + +@pytest.fixture +def expected_in_memory_feature_definition_dict(): + return { + "feature1": {"FeatureName": "feature1", "FeatureType": "Fractional"}, + "feature2": {"FeatureName": "feature2", "FeatureType": "Integral"}, + "feature3": {"FeatureName": "feature3", "FeatureType": "String"}, + "feature4": {"FeatureName": "feature4", "FeatureType": "Fractional"}, + "feature5": {"FeatureName": "feature5", "FeatureType": "String", "CollectionType": "List"}, + "feature6": { + "FeatureName": "feature6", + "FeatureType": "Integral", + "CollectionType": "List", + }, + "feature7": { + "FeatureName": "feature7", + "FeatureType": "Fractional", + "CollectionType": "List", + }, + "feature8": { + "FeatureName": "feature8", + "FeatureType": "Integral", + "CollectionType": "List", + }, + "feature9": { + "FeatureName": "feature9", + "FeatureType": 
"Fractional", + "CollectionType": "List", + }, + "feature10": { + "FeatureName": "feature10", + "FeatureType": "String", + "CollectionType": "List", + }, + } + + @pytest.fixture def create_table_ddl(): return ( @@ -120,7 +279,6 @@ def test_feature_group_create_without_role( def test_feature_store_create_with_config_injection( sagemaker_session, role_arn, feature_group_dummy_definitions, s3_uri ): - sagemaker_session.sagemaker_config = SAGEMAKER_CONFIG_FEATURE_GROUP sagemaker_session.create_feature_group = Mock() @@ -161,6 +319,49 @@ def test_feature_store_create_with_config_injection( ) +def test_feature_group_load_definition( + sagemaker_session_mock, + data_frame_with_collection_type, + expected_standard_feature_definitions, + expected_in_memory_feature_definitions, +): + feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) + + feature_group.load_feature_definitions(data_frame=data_frame_with_collection_type) + assert feature_group.feature_definitions == expected_standard_feature_definitions + + feature_group.load_feature_definitions( + data_frame=data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.STANDARD, + ) + assert feature_group.feature_definitions == expected_standard_feature_definitions + + feature_group.load_feature_definitions( + data_frame=data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.IN_MEMORY, + ) + assert feature_group.feature_definitions == expected_in_memory_feature_definitions + + data_frame_with_collection_type["feature11"] = pd.Series( + [[1.1, "2.3"], [1.4, 2.5, 3.2, 25], [1.0, 5.3], [1.2], []], dtype="object" + ) + + feature_group.load_feature_definitions( + data_frame=data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.STANDARD, + ) + expected_standard_feature_definitions.append( + FeatureDefinition(feature_name="feature11", feature_type=FeatureTypeEnum.STRING) + ) + assert feature_group.feature_definitions 
== expected_standard_feature_definitions + + with pytest.raises(ValueError): + feature_group.load_feature_definitions( + data_frame=data_frame_with_collection_type, + online_storage_type=OnlineStoreStorageTypeEnum.IN_MEMORY, + ) + + def test_feature_store_create( sagemaker_session_mock, role_arn, feature_group_dummy_definitions, s3_uri ): @@ -491,7 +692,7 @@ def test_put_record(sagemaker_session_mock): feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) feature_group.put_record(record=[]) sagemaker_session_mock.put_record.assert_called_with( - feature_group_name="MyFeatureGroup", record=[] + feature_group_name="MyFeatureGroup", record=[], target_stores=None, ttl_duration=None ) @@ -502,6 +703,23 @@ def test_put_record_ttl_duration(sagemaker_session_mock): sagemaker_session_mock.put_record.assert_called_with( feature_group_name="MyFeatureGroup", record=[], + target_stores=None, + ttl_duration=ttl_duration.to_dict(), + ) + + +def test_put_record_target_stores(sagemaker_session_mock): + feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) + ttl_duration = TtlDuration(unit="Minutes", value=123) + feature_group.put_record( + record=[], + target_stores=[TargetStoreEnum.ONLINE_STORE, TargetStoreEnum.OFFLINE_STORE], + ttl_duration=ttl_duration, + ) + sagemaker_session_mock.put_record.assert_called_with( + feature_group_name="MyFeatureGroup", + record=[], + target_stores=[TargetStoreEnum.ONLINE_STORE.value, TargetStoreEnum.OFFLINE_STORE.value], ttl_duration=ttl_duration.to_dict(), ) @@ -628,10 +846,19 @@ def test_ingest_zero_workers(): @patch("sagemaker.feature_store.feature_group.IngestionManagerPandas") -def test_ingest(ingestion_manager_init, sagemaker_session_mock, fs_runtime_client_config_mock): +def test_ingest( + ingestion_manager_init, + sagemaker_session_mock, + fs_runtime_client_config_mock, + feature_group_describe_dummy_definitions, + feature_group_dummy_definition_dict, +): 
sagemaker_session_mock.sagemaker_featurestore_runtime_client.meta.config = ( fs_runtime_client_config_mock ) + sagemaker_session_mock.describe_feature_group.return_value = { + "FeatureDefinitions": feature_group_describe_dummy_definitions + } feature_group = FeatureGroup(name="MyGroup", sagemaker_session=sagemaker_session_mock) df = pd.DataFrame(dict((f"float{i}", pd.Series([2.0], dtype="float64")) for i in range(300))) @@ -642,6 +869,7 @@ def test_ingest(ingestion_manager_init, sagemaker_session_mock, fs_runtime_clien ingestion_manager_init.assert_called_once_with( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=10, @@ -649,16 +877,25 @@ def test_ingest(ingestion_manager_init, sagemaker_session_mock, fs_runtime_clien profile_name=sagemaker_session_mock.boto_session.profile_name, ) mock_ingestion_manager_instance.run.assert_called_once_with( - data_frame=df, wait=True, timeout=None + data_frame=df, target_stores=None, wait=True, timeout=None ) @patch("sagemaker.feature_store.feature_group.IngestionManagerPandas") -def test_ingest_default(ingestion_manager_init, sagemaker_session_mock): +def test_ingest_default( + ingestion_manager_init, + sagemaker_session_mock, + fs_runtime_client_config_mock, + feature_group_describe_dummy_definitions, + feature_group_dummy_definition_dict, +): sagemaker_session_mock.sagemaker_featurestore_runtime_client.meta.config = ( fs_runtime_client_config_mock ) sagemaker_session_mock.boto_session.profile_name = "default" + sagemaker_session_mock.describe_feature_group.return_value = { + "FeatureDefinitions": feature_group_describe_dummy_definitions + } feature_group = FeatureGroup(name="MyGroup", sagemaker_session=sagemaker_session_mock) df = pd.DataFrame(dict((f"float{i}", pd.Series([2.0], dtype="float64")) for i in range(300))) @@ -669,6 +906,7 @@ def 
test_ingest_default(ingestion_manager_init, sagemaker_session_mock): ingestion_manager_init.assert_called_once_with( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=1, @@ -676,17 +914,103 @@ def test_ingest_default(ingestion_manager_init, sagemaker_session_mock): profile_name=None, ) mock_ingestion_manager_instance.run.assert_called_once_with( - data_frame=df, wait=True, timeout=None + data_frame=df, target_stores=None, wait=True, timeout=None ) +@patch("sagemaker.feature_store.feature_group.IngestionManagerPandas") +def test_ingest_with_target_stores( + ingestion_manager_init, + sagemaker_session_mock, + fs_runtime_client_config_mock, + feature_group_describe_dummy_definitions, + feature_group_dummy_definition_dict, +): + sagemaker_session_mock.sagemaker_featurestore_runtime_client.meta.config = ( + fs_runtime_client_config_mock + ) + sagemaker_session_mock.describe_feature_group.return_value = { + "FeatureDefinitions": feature_group_describe_dummy_definitions + } + + feature_group = FeatureGroup(name="MyGroup", sagemaker_session=sagemaker_session_mock) + df = pd.DataFrame(dict((f"float{i}", pd.Series([2.0], dtype="float64")) for i in range(300))) + + mock_ingestion_manager_instance = Mock() + ingestion_manager_init.return_value = mock_ingestion_manager_instance + feature_group.ingest( + data_frame=df, max_workers=10, target_stores=[TargetStoreEnum.ONLINE_STORE] + ) + feature_group.ingest( + data_frame=df, max_workers=10, target_stores=[TargetStoreEnum.OFFLINE_STORE] + ) + feature_group.ingest( + data_frame=df, + max_workers=10, + target_stores=[TargetStoreEnum.ONLINE_STORE, TargetStoreEnum.OFFLINE_STORE], + ) + + actual_ingestion_manager_init_calls = ingestion_manager_init.mock_calls + expected_ingestion_manager_init_calls = [ + call( + feature_group_name="MyGroup", + 
feature_definitions=feature_group_dummy_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + max_workers=10, + max_processes=1, + profile_name=sagemaker_session_mock.boto_session.profile_name, + ), + call().run( + data_frame=df, target_stores=[TargetStoreEnum.ONLINE_STORE], wait=True, timeout=None + ), + call( + feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + max_workers=10, + max_processes=1, + profile_name=sagemaker_session_mock.boto_session.profile_name, + ), + call().run( + data_frame=df, target_stores=[TargetStoreEnum.OFFLINE_STORE], wait=True, timeout=None + ), + call( + feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + max_workers=10, + max_processes=1, + profile_name=sagemaker_session_mock.boto_session.profile_name, + ), + call().run( + data_frame=df, + target_stores=[TargetStoreEnum.ONLINE_STORE, TargetStoreEnum.OFFLINE_STORE], + wait=True, + timeout=None, + ), + ] + assert ( + actual_ingestion_manager_init_calls == expected_ingestion_manager_init_calls + ), f"Expected {expected_ingestion_manager_init_calls} calls, but got {actual_ingestion_manager_init_calls}" + + @patch("sagemaker.feature_store.feature_group.IngestionManagerPandas") def test_ingest_with_profile_name( - ingestion_manager_init, sagemaker_session_mock, fs_runtime_client_config_mock + ingestion_manager_init, + sagemaker_session_mock, + fs_runtime_client_config_mock, + feature_group_describe_dummy_definitions, + feature_group_dummy_definition_dict, ): sagemaker_session_mock.sagemaker_featurestore_runtime_client.meta.config = ( fs_runtime_client_config_mock ) + 
sagemaker_session_mock.describe_feature_group.return_value = { + "FeatureDefinitions": feature_group_describe_dummy_definitions + } feature_group = FeatureGroup(name="MyGroup", sagemaker_session=sagemaker_session_mock) df = pd.DataFrame(dict((f"float{i}", pd.Series([2.0], dtype="float64")) for i in range(300))) @@ -697,6 +1021,7 @@ def test_ingest_with_profile_name( ingestion_manager_init.assert_called_once_with( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=10, @@ -704,7 +1029,7 @@ def test_ingest_with_profile_name( profile_name="profile_name", ) mock_ingestion_manager_instance.run.assert_called_once_with( - data_frame=df, wait=True, timeout=None + data_frame=df, target_stores=None, wait=True, timeout=None ) @@ -763,17 +1088,20 @@ def test_as_hive_ddl(create_table_ddl, feature_group_dummy_definitions, sagemake "sagemaker.feature_store.feature_group.IngestionManagerPandas._run_multi_process", MagicMock(), ) -def test_ingestion_manager_run_success(): +def test_ingestion_manager__run_multi_process_success(): df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) manager = IngestionManagerPandas( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=10, ) manager.run(df) - manager._run_multi_process.assert_called_once_with(data_frame=df, wait=True, timeout=None) + manager._run_multi_process.assert_called_once_with( + data_frame=df, target_stores=None, wait=True, timeout=None + ) @patch( @@ -786,6 +1114,7 @@ def test_ingestion_manager_run_multi_process_with_multi_thread_success( df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) manager = IngestionManagerPandas( feature_group_name="MyGroup", + 
feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=2, @@ -802,6 +1131,7 @@ def test_ingestion_manager_run_failure(): df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) manager = IngestionManagerPandas( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=2, @@ -815,6 +1145,353 @@ def test_ingestion_manager_run_failure(): assert manager.failed_rows == [1, 1] +@patch( + "sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_row", + MagicMock(return_value=[1]), +) +def test_ingestion_manager_run_success(sagemaker_session_mock, fs_runtime_client_config_mock): + sagemaker_session_mock.sagemaker_featurestore_runtime_client = fs_runtime_client_config_mock + df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) + manager = IngestionManagerPandas( + feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + ) + + manager.run(df) + for row in df.itertuples(): + manager._ingest_row.assert_called_with( + data_frame=df, + target_stores=None, + row=row, + feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, + sagemaker_fs_runtime_client=fs_runtime_client_config_mock, + failed_rows=[], + ) + + expected_invocation_count = 1 # Set your expected count + actual_invocation_count = len(manager._ingest_row.mock_calls) + assert ( + actual_invocation_count == expected_invocation_count + ), f"Expected {expected_invocation_count} calls, but got {actual_invocation_count}" + + +def test_ingestion_manager_run_standard( + sagemaker_session_mock, fs_runtime_client_config_mock, 
feature_group_dummy_definition_dict +): + sagemaker_session_mock.sagemaker_featurestore_runtime_client = fs_runtime_client_config_mock + df = pd.DataFrame(data={"feature1": [2.0, 3.0], "feature2": [3, 4], "feature3": ["abc", "edf"]}) + + manager = IngestionManagerPandas( + feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + ) + + manager.run(df) + + actual_put_record_calls = fs_runtime_client_config_mock.put_record.mock_calls + expected_put_record_calls = [ + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "2.0"}, + {"FeatureName": "feature2", "ValueAsString": "3"}, + {"FeatureName": "feature3", "ValueAsString": "abc"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "3.0"}, + {"FeatureName": "feature2", "ValueAsString": "4"}, + {"FeatureName": "feature3", "ValueAsString": "edf"}, + ], + ), + ] + assert ( + actual_put_record_calls == expected_put_record_calls + ), f"Expected {expected_put_record_calls} calls, but got {actual_put_record_calls}" + + +def test_ingestion_manager_run_non_collection_type( + sagemaker_session_mock, + fs_runtime_client_config_mock, + feature_group_dummy_definition_dict, + data_frame_with_collection_type, + expected_standard_feature_definition_dict, +): + sagemaker_session_mock.sagemaker_featurestore_runtime_client = fs_runtime_client_config_mock + manager = IngestionManagerPandas( + feature_group_name="MyGroup", + feature_definitions=expected_standard_feature_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + ) + + manager.run(data_frame_with_collection_type) + + actual_put_record_calls = fs_runtime_client_config_mock.put_record.mock_calls + expected_put_record_calls = [ + call( + FeatureGroupName="MyGroup", + 
Record=[ + {"FeatureName": "feature1", "ValueAsString": "0.0"}, + {"FeatureName": "feature2", "ValueAsString": "0"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "0.0"}, + {"FeatureName": "feature5", "ValueAsString": "['a', 'abc']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 2]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.1, 2.3]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 2]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.1, 2.3]"}, + {"FeatureName": "feature10", "ValueAsString": "['a', 'abc']"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "1.0"}, + {"FeatureName": "feature2", "ValueAsString": "1"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "1.0"}, + {"FeatureName": "feature5", "ValueAsString": "['b', 'c']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 2, 3]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.4, 2.5, 3.2, 25]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 2, None]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.4, 25, 3.2]"}, + {"FeatureName": "feature10", "ValueAsString": "['b', 'c']"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "2.0"}, + {"FeatureName": "feature2", "ValueAsString": "2"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "2.0"}, + {"FeatureName": "feature5", "ValueAsString": "['c', 'f']"}, + {"FeatureName": "feature6", "ValueAsString": "[1, 5]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.0, 5.3]"}, + {"FeatureName": "feature8", "ValueAsString": "[1, 5]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.0, 3, None]"}, + {"FeatureName": "feature10", "ValueAsString": "['c', None]"}, + ], + ), + call( + 
FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "3.0"}, + {"FeatureName": "feature2", "ValueAsString": "3"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "3.0"}, + {"FeatureName": "feature5", "ValueAsString": "['d']"}, + {"FeatureName": "feature6", "ValueAsString": "[1]"}, + {"FeatureName": "feature7", "ValueAsString": "[1.2]"}, + {"FeatureName": "feature8", "ValueAsString": "[1]"}, + {"FeatureName": "feature9", "ValueAsString": "[1.2]"}, + {"FeatureName": "feature10", "ValueAsString": "['d']"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "4.0"}, + {"FeatureName": "feature2", "ValueAsString": "4"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "4.0"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "5.0"}, + {"FeatureName": "feature2", "ValueAsString": "5"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature8", "ValueAsString": "[None]"}, + {"FeatureName": "feature9", "ValueAsString": "[None]"}, + {"FeatureName": "feature10", "ValueAsString": "[None]"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "6.0"}, + {"FeatureName": "feature2", "ValueAsString": "6"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "7.0"}, + {"FeatureName": "feature2", "ValueAsString": "7"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "8.0"}, + {"FeatureName": "feature2", "ValueAsString": "8"}, + {"FeatureName": "feature3", 
"ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "9.0"}, + {"FeatureName": "feature2", "ValueAsString": "9"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + ] + assert ( + actual_put_record_calls == expected_put_record_calls + ), f"Expected {expected_put_record_calls} calls, but got {actual_put_record_calls}" + + +def test_ingestion_manager_run_collection_type( + sagemaker_session_mock, + fs_runtime_client_config_mock, + feature_group_dummy_definition_dict, + data_frame_with_collection_type, + expected_in_memory_feature_definition_dict, +): + sagemaker_session_mock.sagemaker_featurestore_runtime_client = fs_runtime_client_config_mock + + manager = IngestionManagerPandas( + feature_group_name="MyGroup", + feature_definitions=expected_in_memory_feature_definition_dict, + sagemaker_session=sagemaker_session_mock, + sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, + ) + + manager.run(data_frame_with_collection_type) + + actual_put_record_calls = fs_runtime_client_config_mock.put_record.mock_calls + expected_put_record_calls = [ + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "0.0"}, + {"FeatureName": "feature2", "ValueAsString": "0"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "0.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["a", "abc"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1", "2"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.1", "2.3"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1", "2"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.1", "2.3"]}, + {"FeatureName": "feature10", "ValueAsStringList": ["a", "abc"]}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "1.0"}, + 
{"FeatureName": "feature2", "ValueAsString": "1"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "1.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["b", "c"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1", "2", "3"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.4", "2.5", "3.2", "25"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1", "2", None]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.4", "25", "3.2"]}, + {"FeatureName": "feature10", "ValueAsStringList": ["b", "c"]}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "2.0"}, + {"FeatureName": "feature2", "ValueAsString": "2"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "2.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["c", "f"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1", "5"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.0", "5.3"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1", "5"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.0", "3", None]}, + {"FeatureName": "feature10", "ValueAsStringList": ["c", None]}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "3.0"}, + {"FeatureName": "feature2", "ValueAsString": "3"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "3.0"}, + {"FeatureName": "feature5", "ValueAsStringList": ["d"]}, + {"FeatureName": "feature6", "ValueAsStringList": ["1"]}, + {"FeatureName": "feature7", "ValueAsStringList": ["1.2"]}, + {"FeatureName": "feature8", "ValueAsStringList": ["1"]}, + {"FeatureName": "feature9", "ValueAsStringList": ["1.2"]}, + {"FeatureName": "feature10", "ValueAsStringList": ["d"]}, + ], + ), + call( + FeatureGroupName="MyGroup", + 
Record=[ + {"FeatureName": "feature1", "ValueAsString": "4.0"}, + {"FeatureName": "feature2", "ValueAsString": "4"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature4", "ValueAsString": "4.0"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "5.0"}, + {"FeatureName": "feature2", "ValueAsString": "5"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + {"FeatureName": "feature8", "ValueAsStringList": [None]}, + {"FeatureName": "feature9", "ValueAsStringList": [None]}, + {"FeatureName": "feature10", "ValueAsStringList": [None]}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "6.0"}, + {"FeatureName": "feature2", "ValueAsString": "6"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "7.0"}, + {"FeatureName": "feature2", "ValueAsString": "7"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "8.0"}, + {"FeatureName": "feature2", "ValueAsString": "8"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + call( + FeatureGroupName="MyGroup", + Record=[ + {"FeatureName": "feature1", "ValueAsString": "9.0"}, + {"FeatureName": "feature2", "ValueAsString": "9"}, + {"FeatureName": "feature3", "ValueAsString": "2020-10-30T03:43:21Z"}, + ], + ), + ] + assert ( + actual_put_record_calls == expected_put_record_calls + ), f"Expected {expected_put_record_calls} calls, but got {actual_put_record_calls}" + + @patch( "sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_single_batch", MagicMock(side_effect=ProfileNotFound(profile="non_exist")), @@ -823,6 +1500,7 @@ def 
test_ingestion_manager_with_profile_name_run_failure(): df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) manager = IngestionManagerPandas( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=sagemaker_session_mock, sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock, max_workers=1, @@ -843,6 +1521,7 @@ def test_ingestion_manager_run_multi_process_failure(): df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) manager = IngestionManagerPandas( feature_group_name="MyGroup", + feature_definitions=feature_group_dummy_definition_dict, sagemaker_session=None, sagemaker_fs_runtime_client_config=None, max_workers=2, diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 43768031b3..de543b6f53 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -5261,6 +5261,46 @@ def test_list_feature_groups(sagemaker_session): ) + +@pytest.fixture() +def sagemaker_session_with_fs_runtime_client(): + boto_mock = MagicMock(name="boto_session") + sagemaker_session = sagemaker.Session( + boto_session=boto_mock, sagemaker_featurestore_runtime_client=MagicMock() + ) + return sagemaker_session + + +def test_feature_group_put_record(sagemaker_session_with_fs_runtime_client): + sagemaker_session_with_fs_runtime_client.put_record( + feature_group_name="MyFeatureGroup", + record=[{"FeatureName": "feature1", "ValueAsString": "value1"}], + ) + fs_client_mock = sagemaker_session_with_fs_runtime_client.sagemaker_featurestore_runtime_client + + fs_client_mock.put_record.assert_called_with( + FeatureGroupName="MyFeatureGroup", + Record=[{"FeatureName": "feature1", "ValueAsString": "value1"}], + ) + + +def test_feature_group_put_record_with_ttl_and_target_stores( + sagemaker_session_with_fs_runtime_client, +): + sagemaker_session_with_fs_runtime_client.put_record( + feature_group_name="MyFeatureGroup", + record=[{"FeatureName": "feature1", "ValueAsString": "value1"}], + 
ttl_duration={"Unit": "Seconds", "Value": 123}, + target_stores=["OnlineStore", "OfflineStore"], + ) + fs_client_mock = sagemaker_session_with_fs_runtime_client.sagemaker_featurestore_runtime_client + fs_client_mock.put_record.assert_called_with( + FeatureGroupName="MyFeatureGroup", + Record=[{"FeatureName": "feature1", "ValueAsString": "value1"}], + TargetStores=["OnlineStore", "OfflineStore"], + TtlDuration={"Unit": "Seconds", "Value": 123}, + ) + + def test_start_query_execution(sagemaker_session): athena_mock = Mock() sagemaker_session.boto_session.client(