diff --git a/.travis.yml b/.travis.yml index a529541..9142a52 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,6 +36,10 @@ script: - RAYVENS_TEST_MODE=local RAYVENS_TRANSPORT=kafka python ./tests/source.py # - ray submit ./scripts/cluster.yaml ./tests/stream.py - ray submit ./scripts/cluster.yaml ./tests/source.py + - RAYVENS_TEST_MODE=local python ./tests/kafka_transport.py + - ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py local + - RAYVENS_TEST_MODE=local python ./tests/kafka_scaling_transport.py + - ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py local # Test operator mode # - ray submit ./scripts/cluster.yaml ./tests/source_operator.py @@ -47,6 +51,8 @@ script: # - ray submit ./scripts/cluster.yaml ./tests/generic_sink.py # - RAYVENS_TEST_MODE=local python ./tests/generic_source.py # - ray submit ./scripts/cluster.yaml ./tests/generic_source.py + - ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py operator + - ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py operator # Test mixed mode - RAYVENS_TEST_MODE=mixed python ./tests/sink.py diff --git a/examples/cloud_object_storage/cos_sink_from_directory.py b/examples/cloud_object_storage/cos_sink_from_directory.py index b1baa64..5630b75 100644 --- a/examples/cloud_object_storage/cos_sink_from_directory.py +++ b/examples/cloud_object_storage/cos_sink_from_directory.py @@ -17,7 +17,6 @@ import ray import rayvens import sys -import time # This example demonstrates how to send objects to the AWS S3 or # IBM Cloud Object Storage using multi-part uploads. @@ -69,7 +68,5 @@ # the monitored file system directory. dir_to_sink = stream.add_sink(dir_to_sink_config) -# stream._meta('verify_log', dir_to_sink, "BLA") - # Run for a while to give a chance for files to be dropped inside the directory -time.sleep(100) +stream.disconnect_all(after_idle_for=20) diff --git a/examples/cloud_object_storage/cos_sink_from_file.py b/examples/cloud_object_storage/cos_sink_from_file.py index b7f298d..1b32bab 100644 --- a/examples/cloud_object_storage/cos_sink_from_file.py +++ b/examples/cloud_object_storage/cos_sink_from_file.py @@ -17,7 +17,6 @@ import ray import rayvens import sys -import time from pathlib import Path # This example demonstrates how to send objects to the AWS S3 or @@ -103,4 +102,4 @@ stream << "Some other input which is invalid." # Run for a while -time.sleep(30) +stream.disconnect_all(after_idle_for=5) diff --git a/examples/cloud_object_storage/cos_sink_multi_part.py b/examples/cloud_object_storage/cos_sink_multi_part.py index 8a7b561..4d91edc 100644 --- a/examples/cloud_object_storage/cos_sink_multi_part.py +++ b/examples/cloud_object_storage/cos_sink_multi_part.py @@ -17,7 +17,6 @@ import ray import rayvens import sys -import time from pathlib import Path # This example demonstrates how to send objects to the AWS S3 or @@ -73,4 +72,4 @@ stream << Path("test_files/test.txt") # Run for a while -time.sleep(20) +stream.disconnect_all(after_idle_for=5) diff --git a/rayvens/api.py b/rayvens/api.py index 56b78de..1444f92 100644 --- a/rayvens/api.py +++ b/rayvens/api.py @@ -80,6 +80,9 @@ def disconnect_all(self, self._wait_for_timeout(after_idle_for, after) return ray.get(self.actor.disconnect_all.remote(stream_drain_timeout)) + def event_count(self): + return ray.get(self.actor.event_count.remote()) + def _meta(self, action, *args, **kwargs): return ray.get(self.actor._meta.remote(action, *args, **kwargs)) @@ -119,6 +122,7 @@ def __init__(self, name, operator=None): self._sinks = {} self._latest_sent_event_timestamp = None self._limit_subscribers = False + self._event_counter = 0 def send_to(self, subscriber, name=None): if self._limit_subscribers: @@ -142,6 +146,7 @@ def append(self, data): continue _eval(subscriber, data) self._latest_sent_event_timestamp = time.time() + self._event_counter += 1 def add_operator(self, operator): self._operator = operator @@ -195,6 +200,9 @@ def disconnect_all(self, stream_drain_timeout): for sink_name in dict(self._sinks): self.disconnect_sink(sink_name) + def event_count(self): + return self.event_count + def _meta(self, action, *args, **kwargs): return verify_do(self, _global_camel, action, *args, **kwargs) diff --git a/rayvens/core/verify.py b/rayvens/core/verify.py index a6d60e2..0e1decc 100644 --- a/rayvens/core/verify.py +++ b/rayvens/core/verify.py @@ -14,6 +14,7 @@ # limitations under the License. # +import time from rayvens.core import kamel @@ -23,7 +24,11 @@ def verify_do(stream, _global_camel, action, *args, **kwargs): raise RuntimeError('invalid meta action') -def _verify_log(stream, _global_camel, sink_source_name, message): +def _verify_log(stream, + _global_camel, + sink_source_name, + message, + wait_for_events=False): # Get integration: integration = None if sink_source_name in stream._sinks: @@ -34,10 +39,25 @@ def _verify_log(stream, _global_camel, sink_source_name, message): raise RuntimeError( f'{sink_source_name} not found on stream {stream.name}') + log = "FAIL" + + # Wait for at least one event to happen. + if wait_for_events: + event_count = 0 + countdown = 20 + while event_count == 0: + event_count = stream.event_count() + time.sleep(1) + countdown -= 1 + if countdown == 0: + break + if event_count == 0: + print("[LOG CHECK]:", log) + return False + if _global_camel.mode.is_local(): # In the local case the integration run is ongoing and we can # access the logs directly. - # TODO: make this work for local implementation. outcome = integration.invocation.invoke(message) else: # When running using the operator then the integration run command @@ -48,7 +68,6 @@ def _verify_log(stream, _global_camel, sink_source_name, message): outcome = invocation is not None invocation.kill() - log = "FAIL" if outcome: log = "SUCCESS" print("[LOG CHECK]:", log) diff --git a/tests/generic_sink.py b/tests/generic_sink.py index ca6248c..8920885 100644 --- a/tests/generic_sink.py +++ b/tests/generic_sink.py @@ -16,7 +16,6 @@ import ray import rayvens -import time import os # Initialize ray based on where ray will run inside the cluster using the @@ -59,10 +58,8 @@ output_message = f'Sending message to Slack sink in run mode {run_mode}.' stream << output_message -time.sleep(10) - # Verify outcome. -stream._meta('verify_log', sink, output_message) +stream._meta('verify_log', sink, output_message, wait_for_events=True) # Delete all integrations from stream. -stream.disconnect_all() +stream.disconnect_all(after_idle_for=5) diff --git a/tests/kafka_scaling_transport.py b/tests/kafka_scaling_transport.py new file mode 100644 index 0000000..25af080 --- /dev/null +++ b/tests/kafka_scaling_transport.py @@ -0,0 +1,63 @@ +# +# Copyright IBM Corporation 2021 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import ray +import rayvens + +# Initialize run mode. +if len(sys.argv) < 2: + run_mode = 'local' +else: + run_mode = sys.argv[1] + +if os.getenv('RAYVENS_TEST_MODE') == 'local': + ray.init(object_store_memory=78643200) +else: + ray.init(address='auto') + +# The Kafka topic used for communication. +topic = "testTopic" + +rayvens.init(mode=run_mode, transport='kafka') + +# Create source stream and configuration. +source_stream = rayvens.Stream('kafka-source-stream') + +# Event sink config. +test_sink_config = dict(kind='test-sink') + +# Add sink to stream. +test_sink = source_stream.add_sink(test_sink_config) + +source_config = dict( + kind='http-source', + url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL', + route='/from-http', + period=2000, + kafka_transport_topic=topic, + kafka_transport_partitions=3) +source = source_stream.add_source(source_config) + +# Verify outcome. +source_stream._meta('verify_log', + test_sink, + "quoteResponse", + wait_for_events=True) + +# Disconnect source and sink. +source_stream.disconnect_all(after=10) diff --git a/tests/kafka_transport.py b/tests/kafka_transport.py new file mode 100644 index 0000000..e2eb1df --- /dev/null +++ b/tests/kafka_transport.py @@ -0,0 +1,61 @@ +# +# Copyright IBM Corporation 2021 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import ray +import rayvens + +# Initialize run mode. +if len(sys.argv) < 2: + run_mode = 'local' +else: + run_mode = sys.argv[1] + +if os.getenv('RAYVENS_TEST_MODE') == 'local': + ray.init(object_store_memory=78643200) +else: + ray.init(address='auto') + +# The Kafka topic used for communication. +topic = "testTopic" + +rayvens.init(mode=run_mode, transport='kafka') + +# Create source stream and configuration. +source_stream = rayvens.Stream('kafka-source-stream') + +# Event sink config. +test_sink_config = dict(kind='test-sink') + +# Add sink to stream. +test_sink = source_stream.add_sink(test_sink_config) + +source_config = dict( + kind='http-source', + url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL', + route='/from-http', + period=2000) +source = source_stream.add_source(source_config) + +# Verify outcome. +source_stream._meta('verify_log', + test_sink, + "quoteResponse", + wait_for_events=True) + +# Disconnect source and sink. +source_stream.disconnect_all(after=5) diff --git a/tests/sink.py b/tests/sink.py index f2a9324..f76396b 100644 --- a/tests/sink.py +++ b/tests/sink.py @@ -16,7 +16,6 @@ import ray import rayvens -import time import os # Initialize ray based on where ray will run inside the cluster using the @@ -52,10 +51,8 @@ output_message = f'Sending message to Slack sink in run mode {run_mode}.' stream << output_message -time.sleep(10) - # Verify outcome. -stream._meta('verify_log', sink, output_message) +stream._meta('verify_log', sink, output_message, wait_for_events=True) # Delete all integrations from stream. -stream.disconnect_all() +stream.disconnect_all(after_idle_for=5)