Skip to content

Commit d4fb214

Browse files
author
Nilesh PS
committed
feat: add throughput management support for feature group
1 parent 08c8a3a commit d4fb214

File tree

6 files changed

+287
-13
lines changed

6 files changed

+287
-13
lines changed

src/sagemaker/feature_store/feature_group.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
TtlDuration,
6565
OnlineStoreConfigUpdate,
6666
OnlineStoreStorageTypeEnum,
67+
ThroughputConfig,
68+
ThroughputConfigUpdate,
6769
)
6870
from sagemaker.utils import resolve_value_from_config, format_tags, Tags
6971

@@ -541,6 +543,7 @@ def create(
541543
tags: Optional[Tags] = None,
542544
table_format: TableFormatEnum = None,
543545
online_store_storage_type: OnlineStoreStorageTypeEnum = None,
546+
throughput_config: ThroughputConfig = None,
544547
) -> Dict[str, Any]:
545548
"""Create a SageMaker FeatureStore FeatureGroup.
546549
@@ -570,6 +573,8 @@ def create(
570573
table_format (TableFormatEnum): format of the offline store table (default: None).
571574
online_store_storage_type (OnlineStoreStorageTypeEnum): storage type for the
572575
online store (default: None).
576+
throughput_config (ThroughputConfig): throughput configuration of the
577+
feature group (default: None).
573578
574579
Returns:
575580
Response dict from service.
@@ -618,6 +623,9 @@ def create(
618623
)
619624
create_feature_store_args.update({"online_store_config": online_store_config.to_dict()})
620625

626+
if throughput_config:
627+
create_feature_store_args.update({"throughput_config": throughput_config.to_dict()})
628+
621629
# offline store configuration
622630
if s3_uri:
623631
s3_storage_config = S3StorageConfig(s3_uri=s3_uri)
@@ -656,17 +664,17 @@ def update(
656664
self,
657665
feature_additions: Sequence[FeatureDefinition] = None,
658666
online_store_config: OnlineStoreConfigUpdate = None,
667+
throughput_config: ThroughputConfigUpdate = None,
659668
) -> Dict[str, Any]:
660669
"""Update a FeatureGroup and add new features from the given feature definitions.
661670
662671
Args:
663672
feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated.
664673
online_store_config (OnlineStoreConfigUpdate): online store config to be updated.
665-
674+
throughput_config (ThroughputConfigUpdate): target throughput configuration
666675
Returns:
667676
Response dict from service.
668677
"""
669-
670678
if feature_additions is None:
671679
feature_additions_parameter = None
672680
else:
@@ -679,10 +687,15 @@ def update(
679687
else:
680688
online_store_config_parameter = online_store_config.to_dict()
681689

690+
throughput_config_parameter = (
691+
None if throughput_config is None else throughput_config.to_dict()
692+
)
693+
682694
return self.sagemaker_session.update_feature_group(
683695
feature_group_name=self.name,
684696
feature_additions=feature_additions_parameter,
685697
online_store_config=online_store_config_parameter,
698+
throughput_config=throughput_config_parameter,
686699
)
687700

688701
def update_feature_metadata(

src/sagemaker/feature_store/inputs.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,3 +453,65 @@ class ExpirationTimeResponseEnum(Enum):
453453

454454
DISABLED = "Disabled"
455455
ENABLED = "Enabled"
456+
457+
458+
class ThroughputModeEnum(Enum):
459+
"""Enum of throughput modes supported by feature group.
460+
461+
Throughput mode of feature group can be ON_DEMAND or PROVISIONED.
462+
"""
463+
464+
ON_DEMAND = "OnDemand"
465+
PROVISIONED = "Provisioned"
466+
467+
468+
@attr.s
469+
class ThroughputConfig(Config):
470+
"""Throughput configuration of the feature group.
471+
472+
Throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for
473+
read and write capacity units. ON_DEMAND works best for less predictable traffic,
474+
while PROVISIONED works best for consistent and predictable traffic.
475+
"""
476+
477+
mode: ThroughputModeEnum = attr.ib(default=None)
478+
provisioned_read_capacity_units: int = attr.ib(default=None)
479+
provisioned_write_capacity_units: int = attr.ib(default=None)
480+
481+
def to_dict(self) -> Dict[str, Any]:
482+
"""Construct a dictionary based on the attributes provided.
483+
484+
Returns:
485+
dict represents the attributes.
486+
"""
487+
return Config.construct_dict(
488+
ThroughputMode=self.mode.value if self.mode else None,
489+
ProvisionedReadCapacityUnits=self.provisioned_read_capacity_units,
490+
ProvisionedWriteCapacityUnits=self.provisioned_write_capacity_units,
491+
)
492+
493+
494+
@attr.s
495+
class ThroughputConfigUpdate(Config):
496+
"""Target throughput configuration of the feature group being updated.
497+
498+
Target throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for
499+
read and write capacity units. ON_DEMAND works best for less predictable traffic,
500+
while PROVISIONED works best for consistent and predictable traffic.
501+
"""
502+
503+
mode: ThroughputModeEnum = attr.ib(default=None)
504+
provisioned_read_capacity_units: int = attr.ib(default=None)
505+
provisioned_write_capacity_units: int = attr.ib(default=None)
506+
507+
def to_dict(self) -> Dict[str, Any]:
508+
"""Construct a dictionary based on the attributes provided.
509+
510+
Returns:
511+
dict represents the attributes.
512+
"""
513+
return Config.construct_dict(
514+
ThroughputMode=self.mode.value if self.mode else None,
515+
ProvisionedReadCapacityUnits=self.provisioned_read_capacity_units,
516+
ProvisionedWriteCapacityUnits=self.provisioned_write_capacity_units,
517+
)

src/sagemaker/session.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5681,6 +5681,7 @@ def create_feature_group(
56815681
role_arn: str = None,
56825682
online_store_config: Dict[str, str] = None,
56835683
offline_store_config: Dict[str, str] = None,
5684+
throughput_config: Dict[str, Any] = None,
56845685
description: str = None,
56855686
tags: Optional[Tags] = None,
56865687
) -> Dict[str, Any]:
@@ -5696,6 +5697,8 @@ def create_feature_group(
56965697
feature online store.
56975698
offline_store_config (Dict[str, str]): dict contains configuration of the
56985699
feature offline store.
5700+
throughput_config (Dict[str, str]): dict contains throughput configuration
5701+
for the feature group.
56995702
description (str): description of the FeatureGroup.
57005703
tags (Optional[Tags]): tags for labeling a FeatureGroup.
57015704
@@ -5731,6 +5734,7 @@ def create_feature_group(
57315734
kwargs,
57325735
OnlineStoreConfig=inferred_online_store_from_config,
57335736
OfflineStoreConfig=inferred_offline_store_from_config,
5737+
ThroughputConfig=throughput_config,
57345738
Description=description,
57355739
Tags=tags,
57365740
)
@@ -5759,28 +5763,32 @@ def update_feature_group(
57595763
feature_group_name: str,
57605764
feature_additions: Sequence[Dict[str, str]] = None,
57615765
online_store_config: Dict[str, any] = None,
5766+
throughput_config: Dict[str, Any] = None,
57625767
) -> Dict[str, Any]:
57635768
"""Update a FeatureGroup
57645769
5765-
either adding new features from the given feature definitions
5766-
or updating online store config
5770+
Supports modifications like adding new features from the given feature definitions,
5771+
updating online store and throughput configurations.
57675772
57685773
Args:
57695774
feature_group_name (str): name of the FeatureGroup to update.
57705775
feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated.
5776+
online_store_config (Dict[str, Any]): updates to online store config
5777+
throughput_config (Dict[str, Any]): target throughput configuration of the feature group
57715778
Returns:
57725779
Response dict from service.
57735780
"""
5781+
update_req = {"FeatureGroupName": feature_group_name}
5782+
if online_store_config is not None:
5783+
update_req["OnlineStoreConfig"] = online_store_config
57745784

5775-
if feature_additions is None:
5776-
return self.sagemaker_client.update_feature_group(
5777-
FeatureGroupName=feature_group_name,
5778-
OnlineStoreConfig=online_store_config,
5779-
)
5785+
if throughput_config is not None:
5786+
update_req["ThroughputConfig"] = throughput_config
57805787

5781-
return self.sagemaker_client.update_feature_group(
5782-
FeatureGroupName=feature_group_name, FeatureAdditions=feature_additions
5783-
)
5788+
if feature_additions is not None:
5789+
update_req["FeatureAdditions"] = feature_additions
5790+
5791+
return self.sagemaker_client.update_feature_group(**update_req)
57845792

57855793
def list_feature_groups(
57865794
self,

tests/integ/test_feature_store.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
TtlDuration,
4444
OnlineStoreConfigUpdate,
4545
OnlineStoreStorageTypeEnum,
46+
ThroughputConfig,
47+
ThroughputModeEnum,
48+
ThroughputConfigUpdate,
4649
)
4750
from sagemaker.feature_store.dataset_builder import (
4851
JoinTypeEnum,
@@ -410,6 +413,78 @@ def test_create_feature_group_standard_storage_type(
410413
assert storage_type == "Standard"
411414

412415

416+
def test_throughput_create_as_provisioned_and_update_to_ondemand(
417+
feature_store_session,
418+
role,
419+
feature_group_name,
420+
pandas_data_frame,
421+
):
422+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
423+
feature_group.load_feature_definitions(data_frame=pandas_data_frame)
424+
with cleanup_feature_group(feature_group):
425+
feature_group.create(
426+
s3_uri=False,
427+
record_identifier_name="feature1",
428+
event_time_feature_name="feature3",
429+
role_arn=role,
430+
enable_online_store=True,
431+
throughput_config=ThroughputConfig(ThroughputModeEnum.PROVISIONED, 4000, 4000),
432+
)
433+
_wait_for_feature_group_create(feature_group)
434+
435+
tp_config = feature_group.describe().get("ThroughputConfig")
436+
mode = tp_config.get("ThroughputMode")
437+
rcu = tp_config.get("ProvisionedReadCapacityUnits")
438+
wcu = tp_config.get("ProvisionedWriteCapacityUnits")
439+
assert mode == ThroughputModeEnum.PROVISIONED.value
440+
assert rcu == 4000
441+
assert wcu == 4000
442+
443+
feature_group.update(throughput_config=ThroughputConfigUpdate(ThroughputModeEnum.ON_DEMAND))
444+
_wait_for_feature_group_update(feature_group)
445+
446+
tp_config = feature_group.describe().get("ThroughputConfig")
447+
mode = tp_config.get("ThroughputMode")
448+
assert mode == ThroughputModeEnum.ON_DEMAND.value
449+
450+
451+
def test_throughput_create_as_ondemand_and_update_to_provisioned(
452+
feature_store_session,
453+
role,
454+
feature_group_name,
455+
pandas_data_frame,
456+
):
457+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
458+
feature_group.load_feature_definitions(data_frame=pandas_data_frame)
459+
with cleanup_feature_group(feature_group):
460+
feature_group.create(
461+
s3_uri=False,
462+
record_identifier_name="feature1",
463+
event_time_feature_name="feature3",
464+
role_arn=role,
465+
enable_online_store=True,
466+
throughput_config=ThroughputConfig(ThroughputModeEnum.ON_DEMAND),
467+
)
468+
_wait_for_feature_group_create(feature_group)
469+
470+
tp_config = feature_group.describe().get("ThroughputConfig")
471+
mode = tp_config.get("ThroughputMode")
472+
assert mode == ThroughputModeEnum.ON_DEMAND.value
473+
474+
feature_group.update(
475+
throughput_config=ThroughputConfigUpdate(ThroughputModeEnum.PROVISIONED, 100, 200)
476+
)
477+
_wait_for_feature_group_update(feature_group)
478+
479+
tp_config = feature_group.describe().get("ThroughputConfig")
480+
mode = tp_config.get("ThroughputMode")
481+
rcu = tp_config.get("ProvisionedReadCapacityUnits")
482+
wcu = tp_config.get("ProvisionedWriteCapacityUnits")
483+
assert mode == ThroughputModeEnum.PROVISIONED.value
484+
assert rcu == 100
485+
assert wcu == 200
486+
487+
413488
def test_ttl_duration(
414489
feature_store_session,
415490
role,

0 commit comments

Comments
 (0)