Skip to content
6 changes: 6 additions & 0 deletions doc/services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ This module provides classes to build steps that integrate with Amazon DynamoDB,

- `Amazon EventBridge <#amazon-eventbridge>`__

- `Amazon Glue DataBrew <#amazon-glue-databrew>`__

- `Amazon SNS <#amazon-sns>`__

- `Amazon SQS <#amazon-sqs>`__
Expand Down Expand Up @@ -47,6 +49,10 @@ Amazon EventBridge
------------------
.. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep

Amazon Glue DataBrew
--------------------
.. autoclass:: stepfunctions.steps.service.DataBrewStartJobRunStep

Amazon SNS
-----------
.. autoclass:: stepfunctions.steps.service.SnsPublishStep
Expand Down
1 change: 1 addition & 0 deletions src/stepfunctions/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from stepfunctions.steps.states import Graph, FrozenGraph
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep, TuningStep, ProcessingStep
from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep
from stepfunctions.steps.service import DataBrewStartJobRunStep
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
from stepfunctions.steps.service import EventBridgePutEventsStep
Expand Down
45 changes: 45 additions & 0 deletions src/stepfunctions/steps/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from stepfunctions.steps.fields import Field
from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn

# Service name constants. DATABREW_SERVICE_NAME is passed as the service
# argument of get_service_integration_arn when building a Task resource ARN
# (e.g. arn:aws:states:::databrew:startJobRun).
# NOTE(review): presumably the other names below are used the same way by
# their respective step classes - confirm against the rest of the module.
DATABREW_SERVICE_NAME = "databrew"
DYNAMODB_SERVICE_NAME = "dynamodb"
ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce"
EVENTBRIDGE_SERVICE_NAME = "events"
Expand All @@ -32,6 +33,10 @@ class DynamoDBApi(Enum):
UpdateItem = "updateItem"


class DataBrewApi(Enum):
    """API actions of the DataBrew service integration.

    Each member's value is the action name as it appears in the Task
    resource ARN (e.g. ``arn:aws:states:::databrew:startJobRun``).
    """

    # Starts a run of a DataBrew job.
    StartJobRun = "startJobRun"


class SnsApi(Enum):
Publish = "publish"

Expand All @@ -54,6 +59,46 @@ class EventBridgeApi(Enum):
PutEvents = "putEvents"


class DataBrewStartJobRunStep(Task):
    """
    Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        Args:
            state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
            wait_for_completion (bool, optional): Set to `True` if the Task state should wait for the job run to complete before proceeding to the next step in the workflow. (default: True)
            comment (str, optional): Human-readable comment or description. (default: None)
            timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
            timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
            heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            input_path (str, optional): Path applied to the state's raw input to select some or all of it; that selection is used by the state. (default: '$')
            parameters (dict, optional): The value of this field becomes the effective input for the state.
            result_path (str, optional): Path specifying the raw input's combination with or replacement by the state's result. (default: '$')
            output_path (str, optional): Path applied to the state's output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
        """
        # Arguments common to both integration patterns; the pattern is only
        # passed explicitly when the caller asks to wait for completion, so
        # the helper's own default applies otherwise.
        arn_args = [DATABREW_SERVICE_NAME, DataBrewApi.StartJobRun]
        if wait_for_completion:
            # Example resource arn: arn:aws:states:::databrew:startJobRun.sync
            arn_args.append(IntegrationPattern.WaitForCompletion)
        # Without waiting, example resource arn: arn:aws:states:::databrew:startJobRun

        kwargs[Field.Resource.value] = get_service_integration_arn(*arn_args)

        super().__init__(state_id, **kwargs)


class DynamoDBGetItemStep(Task):
"""
Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/test_service_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import boto3

from unittest.mock import patch
from stepfunctions.steps.service import DataBrewStartJobRunStep
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
Expand Down Expand Up @@ -661,3 +662,31 @@ def test_emr_modify_instance_group_by_name_step_creation():
'End': True
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation():
    """DataBrewStartJobRunStep renders the expected Task definition for both integration patterns."""

    def expected_definition(resource_arn):
        # Everything except the resource ARN is the same in both scenarios.
        return {
            'Type': 'Task',
            'Resource': resource_arn,
            'Parameters': {
                'Name': 'MyWorkflowJobRun'
            },
            'End': True
        }

    # Default construction waits for the job run to complete (".sync" ARN).
    sync_step = DataBrewStartJobRunStep(
        'Start Databrew Job Run',
        parameters={"Name": "MyWorkflowJobRun"},
    )
    assert sync_step.to_dict() == expected_definition('arn:aws:states:::databrew:startJobRun.sync')

    # Opting out of waiting yields the plain request/response ARN.
    async_step = DataBrewStartJobRunStep(
        'Start Databrew Job Run',
        wait_for_completion=False,
        parameters={"Name": "MyWorkflowJobRun"},
    )
    assert async_step.to_dict() == expected_definition('arn:aws:states:::databrew:startJobRun')