-
Notifications
You must be signed in to change notification settings - Fork 3
service to service events #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
c7576fd
ruff: 0.5.7 -> 0.9.4
Lance-Drane 4281540
#20 - allow services to listen for events from other services
Lance-Drane 3f33ad3
fix docs build
Lance-Drane 294d794
remove redundant logging statement
Lance-Drane 74934f5
make sure to handle uncaught capability exceptions when handling svc2…
Lance-Drane File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| """Client for service to service example. | ||
|
|
||
| Listens for events from the exposed_service, and prints each one out. | ||
| Once it gets two events, it terminates itself. | ||
| """ | ||
|
|
||
| import logging | ||
|
|
||
| from intersect_sdk import ( | ||
| INTERSECT_JSON_VALUE, | ||
| IntersectClient, | ||
| IntersectClientCallback, | ||
| IntersectClientConfig, | ||
| default_intersect_lifecycle_loop, | ||
| ) | ||
|
|
||
| logging.basicConfig(level=logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class SampleOrchestrator: | ||
| """Simply contains an event callback for events from the exposed service. | ||
|
|
||
| In this example, we just want to receive two events from the exposed service before killing the client. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| """Straightforward constructor, just initializes global variable which counts events.""" | ||
| self.got_first_event = False | ||
|
|
||
| def event_callback( | ||
| self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE | ||
| ) -> None: | ||
| """This simply prints the event from the exposed service to your console. | ||
|
|
||
| Params: | ||
| source: the source of the response message. | ||
| operation: the name of the function we called in the original message. | ||
| _has_error: Boolean value which represents an error. | ||
| payload: Value of the response from the Service. | ||
| """ | ||
| print(payload) | ||
| if self.got_first_event: | ||
| # break out of pubsub loop | ||
| raise Exception | ||
| self.got_first_event = True | ||
| # empty return, don't send any additional messages or modify the events listened to | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| from_config_file = { | ||
| 'brokers': [ | ||
| { | ||
| 'username': 'intersect_username', | ||
| 'password': 'intersect_password', | ||
| 'port': 5672, | ||
| 'protocol': 'amqp0.9.1', | ||
| }, | ||
| ], | ||
| } | ||
|
|
||
| # Listen for an event on the exposed service | ||
| config = IntersectClientConfig( | ||
| initial_message_event_config=IntersectClientCallback( | ||
| services_to_start_listening_for_events=[ | ||
| 'example-organization.example-facility.example-system.example-subsystem.exposed-service' | ||
| ], | ||
| ), | ||
| **from_config_file, | ||
| ) | ||
| orchestrator = SampleOrchestrator() | ||
| client = IntersectClient( | ||
| config=config, | ||
| event_callback=orchestrator.event_callback, | ||
| ) | ||
| default_intersect_lifecycle_loop( | ||
| client, | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| """Exposed service. | ||
|
|
||
| This service listens for events from the internal service, and then emits its own | ||
| 'exposed_service_event' event. The client listens for the 'exposed_service_event'. | ||
| """ | ||
|
|
||
| import logging | ||
|
|
||
| from intersect_sdk import ( | ||
| HierarchyConfig, | ||
| IntersectBaseCapabilityImplementation, | ||
| IntersectEventDefinition, | ||
| IntersectService, | ||
| IntersectServiceConfig, | ||
| default_intersect_lifecycle_loop, | ||
| intersect_event, | ||
| ) | ||
|
|
||
| logging.basicConfig(level=logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class ExposedServiceCapabilityImplementation(IntersectBaseCapabilityImplementation): | ||
| """Exposed service capability.""" | ||
|
|
||
| intersect_sdk_capability_name = 'ExposedService' | ||
|
|
||
| def on_service_startup(self) -> None: | ||
| """This function will get called when starting up the Service. | ||
|
|
||
| Note that while we could call this function earlier, we should not call this function in the constructor. The order of things which need to happen: | ||
|
|
||
| 1) Capability constructor is called | ||
| 2) Service constructor is called (which will include this capability) | ||
| 3) We can now start listening for events. | ||
|
|
||
| Note that you do not have to explicitly start the Service, you only need to follow steps one and two. | ||
| """ | ||
| self.intersect_sdk_listen_for_service_event( | ||
| HierarchyConfig( | ||
| organization='example-organization', | ||
| facility='example-facility', | ||
| system='example-system', | ||
| subsystem='example-subsystem', | ||
| service='internal-service', | ||
| ), | ||
| 'internal_service_event', | ||
| self.on_internal_service_event, | ||
| ) | ||
|
|
||
| @intersect_event(events={'exposed_service_event': IntersectEventDefinition(event_type=str)}) | ||
| def on_internal_service_event( | ||
| self, source: str, _operation: str, event_name: str, payload: str | ||
| ) -> None: | ||
| """When we get an event back from the internal_service, we will emit our own event.""" | ||
| self.intersect_sdk_emit_event( | ||
| 'exposed_service_event', | ||
| f'From event "{event_name}", received message "{payload}" from "{source}"', | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| from_config_file = { | ||
| 'brokers': [ | ||
| { | ||
| 'username': 'intersect_username', | ||
| 'password': 'intersect_password', | ||
| 'port': 5672, | ||
| 'protocol': 'amqp0.9.1', | ||
| }, | ||
| ], | ||
| } | ||
| config = IntersectServiceConfig( | ||
| hierarchy=HierarchyConfig( | ||
| organization='example-organization', | ||
| facility='example-facility', | ||
| system='example-system', | ||
| subsystem='example-subsystem', | ||
| service='exposed-service', | ||
| ), | ||
| status_interval=30.0, | ||
| **from_config_file, | ||
| ) | ||
| capability = ExposedServiceCapabilityImplementation() | ||
| service = IntersectService([capability], config) | ||
| logger.info('Starting Service 1, use Ctrl+C to exit.') | ||
| default_intersect_lifecycle_loop(service, post_startup_callback=capability.on_service_startup) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| """Internal service. | ||
|
|
||
| This service periodically emits an 'internal_service_event' string, as an event. | ||
| The exposed service listens to the events of this service. | ||
| """ | ||
|
|
||
| import logging | ||
| import threading | ||
| import time | ||
|
|
||
| from intersect_sdk import ( | ||
| HierarchyConfig, | ||
| IntersectBaseCapabilityImplementation, | ||
| IntersectEventDefinition, | ||
| IntersectService, | ||
| IntersectServiceConfig, | ||
| default_intersect_lifecycle_loop, | ||
| intersect_event, | ||
| ) | ||
|
|
||
| logging.basicConfig(level=logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class InternalServiceCapabilityImplementation(IntersectBaseCapabilityImplementation): | ||
| """Internal service capability.""" | ||
|
|
||
| intersect_sdk_capability_name = 'InternalService' | ||
|
|
||
| def after_service_startup(self) -> None: | ||
| """Called after service startup.""" | ||
| self.thread = threading.Thread( | ||
| target=self.internal_service_event_generator, daemon=True, name='event_thread' | ||
| ) | ||
| self.thread.start() | ||
|
|
||
| @intersect_event(events={'internal_service_event': IntersectEventDefinition(event_type=str)}) | ||
| def internal_service_event_generator(self) -> str: | ||
| """Emits a periodic internal_service_event event.""" | ||
| while True: | ||
| time.sleep(2.0) | ||
| self.intersect_sdk_emit_event('internal_service_event', 'not_feeling_creative') | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| from_config_file = { | ||
| 'brokers': [ | ||
| { | ||
| 'username': 'intersect_username', | ||
| 'password': 'intersect_password', | ||
| 'port': 5672, | ||
| 'protocol': 'amqp0.9.1', | ||
| }, | ||
| ], | ||
| } | ||
| config = IntersectServiceConfig( | ||
| hierarchy=HierarchyConfig( | ||
| organization='example-organization', | ||
| facility='example-facility', | ||
| system='example-system', | ||
| subsystem='example-subsystem', | ||
| service='internal-service', | ||
| ), | ||
| status_interval=30.0, | ||
| **from_config_file, | ||
| ) | ||
| capability = InternalServiceCapabilityImplementation() | ||
| service = IntersectService([capability], config) | ||
| logger.info('Starting Service 2, use Ctrl+C to exit.') | ||
| default_intersect_lifecycle_loop( | ||
| service, | ||
| post_startup_callback=capability.after_service_startup, | ||
| ) |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.