From 9074bb48ad65c11d92d7bd9382c930bb0178e78a Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Fri, 3 Sep 2021 15:15:57 -0400 Subject: [PATCH 1/5] Add new kafka transport tests. --- rayvens/core/verify.py | 1 - tests/kafka_scaling_transport.py | 59 ++++++++++++++++++++++++++++++++ tests/kafka_transport.py | 57 ++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 tests/kafka_scaling_transport.py create mode 100644 tests/kafka_transport.py diff --git a/rayvens/core/verify.py b/rayvens/core/verify.py index a6d60e2..ee207e2 100644 --- a/rayvens/core/verify.py +++ b/rayvens/core/verify.py @@ -37,7 +37,6 @@ def _verify_log(stream, _global_camel, sink_source_name, message): if _global_camel.mode.is_local(): # In the local case the integration run is ongoing and we can # access the logs directly. - # TODO: make this work for local implementation. outcome = integration.invocation.invoke(message) else: # When running using the operator then the integration run command diff --git a/tests/kafka_scaling_transport.py b/tests/kafka_scaling_transport.py new file mode 100644 index 0000000..fbd725e --- /dev/null +++ b/tests/kafka_scaling_transport.py @@ -0,0 +1,59 @@ +# +# Copyright IBM Corporation 2021 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import ray +import rayvens + +# Initialize run mode. +run_mode = 'operator' +env_run_mode = os.getenv('RAYVENS_TEST_MODE') +if env_run_mode is not None: + run_mode = env_run_mode + +if run_mode == 'operator': + ray.init(address='auto') +else: + ray.init(object_store_memory=78643200) + +# The Kafka topic used for communication. +topic = "testTopic" + +rayvens.init(mode=run_mode, transport='kafka') + +# Create source stream and configuration. +source_stream = rayvens.Stream('kafka-source-stream') + +# Event sink config. +test_sink_config = dict(kind='test-sink') + +# Add sink to stream. +test_sink = source_stream.add_sink(test_sink_config) + +source_config = dict( + kind='http-source', + url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL', + route='/from-http', + period=2000, + kafka_transport_topic=topic, + kafka_transport_partitions=3) +source = source_stream.add_source(source_config) + +# Verify outcome. +source_stream._meta('verify_log', test_sink, "quoteResponse") + +# Disconnect source and sink. +source_stream.disconnect_all(after=10) diff --git a/tests/kafka_transport.py b/tests/kafka_transport.py new file mode 100644 index 0000000..03cc04d --- /dev/null +++ b/tests/kafka_transport.py @@ -0,0 +1,57 @@ +# +# Copyright IBM Corporation 2021 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import ray +import rayvens + +# Initialize run mode. +run_mode = 'operator' +env_run_mode = os.getenv('RAYVENS_TEST_MODE') +if env_run_mode is not None: + run_mode = env_run_mode + +if run_mode == 'operator': + ray.init(address='auto') +else: + ray.init(object_store_memory=78643200) + +# The Kafka topic used for communication. +topic = "testTopic" + +rayvens.init(mode=run_mode, transport='kafka') + +# Create source stream and configuration. +source_stream = rayvens.Stream('kafka-source-stream') + +# Event sink config. +test_sink_config = dict(kind='test-sink') + +# Add sink to stream. +test_sink = source_stream.add_sink(test_sink_config) + +source_config = dict( + kind='http-source', + url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL', + route='/from-http', + period=2000) +source = source_stream.add_source(source_config) + +# Verify outcome. +source_stream._meta('verify_log', test_sink, "quoteResponse") + +# Disconnect source and sink. +source_stream.disconnect_all(after=10) From 869e3eda8834bd494b0cc504b361f74fd4643699 Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Tue, 7 Sep 2021 10:40:21 -0400 Subject: [PATCH 2/5] Wait for events flag when using verify logs. --- rayvens/api.py | 8 ++++++++ rayvens/core/verify.py | 24 ++++++++++++++++++++++-- tests/kafka_scaling_transport.py | 5 ++++- tests/kafka_transport.py | 7 +++++-- 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/rayvens/api.py b/rayvens/api.py index 56b78de..1444f92 100644 --- a/rayvens/api.py +++ b/rayvens/api.py @@ -80,6 +80,9 @@ def disconnect_all(self, self._wait_for_timeout(after_idle_for, after) return ray.get(self.actor.disconnect_all.remote(stream_drain_timeout)) + def event_count(self): + return ray.get(self.actor.event_count.remote()) + def _meta(self, action, *args, **kwargs): return ray.get(self.actor._meta.remote(action, *args, **kwargs)) @@ -119,6 +122,7 @@ def __init__(self, name, operator=None): self._sinks = {} self._latest_sent_event_timestamp = None self._limit_subscribers = False + self._event_counter = 0 def send_to(self, subscriber, name=None): if self._limit_subscribers: @@ -142,6 +146,7 @@ def append(self, data): continue _eval(subscriber, data) self._latest_sent_event_timestamp = time.time() + self._event_counter += 1 def add_operator(self, operator): self._operator = operator @@ -195,6 +200,9 @@ def disconnect_all(self, stream_drain_timeout): for sink_name in dict(self._sinks): self.disconnect_sink(sink_name) + def event_count(self): + return self.event_count + def _meta(self, action, *args, **kwargs): return verify_do(self, _global_camel, action, *args, **kwargs) diff --git a/rayvens/core/verify.py b/rayvens/core/verify.py index ee207e2..0e1decc 100644 --- a/rayvens/core/verify.py +++ b/rayvens/core/verify.py @@ -14,6 +14,7 @@ # limitations under the License. # +import time from rayvens.core import kamel @@ -23,7 +24,11 @@ def verify_do(stream, _global_camel, action, *args, **kwargs): raise RuntimeError('invalid meta action') -def _verify_log(stream, _global_camel, sink_source_name, message): +def _verify_log(stream, + _global_camel, + sink_source_name, + message, + wait_for_events=False): # Get integration: integration = None if sink_source_name in stream._sinks: @@ -34,6 +39,22 @@ def _verify_log(stream, _global_camel, sink_source_name, message): raise RuntimeError( f'{sink_source_name} not found on stream {stream.name}') + log = "FAIL" + + # Wait for at least one event to happen. + if wait_for_events: + event_count = 0 + countdown = 20 + while event_count == 0: + event_count = stream.event_count() + time.sleep(1) + countdown -= 1 + if countdown == 0: + break + if event_count == 0: + print("[LOG CHECK]:", log) + return False + if _global_camel.mode.is_local(): # In the local case the integration run is ongoing and we can # access the logs directly. @@ -47,7 +68,6 @@ def _verify_log(stream, _global_camel, sink_source_name, message): outcome = invocation is not None invocation.kill() - log = "FAIL" if outcome: log = "SUCCESS" print("[LOG CHECK]:", log) diff --git a/tests/kafka_scaling_transport.py b/tests/kafka_scaling_transport.py index fbd725e..6d5151d 100644 --- a/tests/kafka_scaling_transport.py +++ b/tests/kafka_scaling_transport.py @@ -53,7 +53,10 @@ source = source_stream.add_source(source_config) # Verify outcome. -source_stream._meta('verify_log', test_sink, "quoteResponse") +source_stream._meta('verify_log', + test_sink, + "quoteResponse", + wait_for_events=True) # Disconnect source and sink. source_stream.disconnect_all(after=10) diff --git a/tests/kafka_transport.py b/tests/kafka_transport.py index 03cc04d..91ed13b 100644 --- a/tests/kafka_transport.py +++ b/tests/kafka_transport.py @@ -51,7 +51,10 @@ source = source_stream.add_source(source_config) # Verify outcome. -source_stream._meta('verify_log', test_sink, "quoteResponse") +source_stream._meta('verify_log', + test_sink, + "quoteResponse", + wait_for_events=True) # Disconnect source and sink. -source_stream.disconnect_all(after=10) +source_stream.disconnect_all(after=5) From 0dec3bc509f3a8ff88a1c496682993b0410099da Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Tue, 7 Sep 2021 10:49:01 -0400 Subject: [PATCH 3/5] Wait for at least one event to be propagated. --- examples/cloud_object_storage/cos_sink_from_directory.py | 5 +---- examples/cloud_object_storage/cos_sink_from_file.py | 3 +-- examples/cloud_object_storage/cos_sink_multi_part.py | 3 +-- tests/generic_sink.py | 7 ++----- tests/sink.py | 7 ++----- 5 files changed, 7 insertions(+), 18 deletions(-) diff --git a/examples/cloud_object_storage/cos_sink_from_directory.py b/examples/cloud_object_storage/cos_sink_from_directory.py index b1baa64..5630b75 100644 --- a/examples/cloud_object_storage/cos_sink_from_directory.py +++ b/examples/cloud_object_storage/cos_sink_from_directory.py @@ -17,7 +17,6 @@ import ray import rayvens import sys -import time # This example demonstrates how to send objects to the AWS S3 or # IBM Cloud Object Storage using multi-part uploads. @@ -69,7 +68,5 @@ # the monitored file system directory. dir_to_sink = stream.add_sink(dir_to_sink_config) -# stream._meta('verify_log', dir_to_sink, "BLA") - # Run for a while to give a chance for files to be dropped inside the directory -time.sleep(100) +stream.disconnect_all(after_idle_for=20) diff --git a/examples/cloud_object_storage/cos_sink_from_file.py b/examples/cloud_object_storage/cos_sink_from_file.py index b7f298d..1b32bab 100644 --- a/examples/cloud_object_storage/cos_sink_from_file.py +++ b/examples/cloud_object_storage/cos_sink_from_file.py @@ -17,7 +17,6 @@ import ray import rayvens import sys -import time from pathlib import Path # This example demonstrates how to send objects to the AWS S3 or @@ -103,4 +102,4 @@ stream << "Some other input which is invalid." # Run for a while -time.sleep(30) +stream.disconnect_all(after_idle_for=5) diff --git a/examples/cloud_object_storage/cos_sink_multi_part.py b/examples/cloud_object_storage/cos_sink_multi_part.py index 8a7b561..4d91edc 100644 --- a/examples/cloud_object_storage/cos_sink_multi_part.py +++ b/examples/cloud_object_storage/cos_sink_multi_part.py @@ -17,7 +17,6 @@ import ray import rayvens import sys -import time from pathlib import Path # This example demonstrates how to send objects to the AWS S3 or @@ -73,4 +72,4 @@ stream << Path("test_files/test.txt") # Run for a while -time.sleep(20) +stream.disconnect_all(after_idle_for=5) diff --git a/tests/generic_sink.py b/tests/generic_sink.py index ca6248c..8920885 100644 --- a/tests/generic_sink.py +++ b/tests/generic_sink.py @@ -16,7 +16,6 @@ import ray import rayvens -import time import os # Initialize ray based on where ray will run inside the cluster using the @@ -59,10 +58,8 @@ output_message = f'Sending message to Slack sink in run mode {run_mode}.' stream << output_message -time.sleep(10) - # Verify outcome. -stream._meta('verify_log', sink, output_message) +stream._meta('verify_log', sink, output_message, wait_for_events=True) # Delete all integrations from stream. -stream.disconnect_all() +stream.disconnect_all(after_idle_for=5) diff --git a/tests/sink.py b/tests/sink.py index f2a9324..f76396b 100644 --- a/tests/sink.py +++ b/tests/sink.py @@ -16,7 +16,6 @@ import ray import rayvens -import time import os # Initialize ray based on where ray will run inside the cluster using the @@ -52,10 +51,8 @@ output_message = f'Sending message to Slack sink in run mode {run_mode}.' stream << output_message -time.sleep(10) - # Verify outcome. -stream._meta('verify_log', sink, output_message) +stream._meta('verify_log', sink, output_message, wait_for_events=True) # Delete all integrations from stream. -stream.disconnect_all() +stream.disconnect_all(after_idle_for=5) From dd39f4d1b208184450a7a4cd7bab94905ef5d07d Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Tue, 7 Sep 2021 12:26:58 -0400 Subject: [PATCH 4/5] Add tests to Travis. --- .travis.yml | 6 ++++++ tests/kafka_scaling_transport.py | 14 +++++++------- tests/kafka_transport.py | 14 +++++++------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/.travis.yml b/.travis.yml index a529541..9142a52 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,6 +36,10 @@ script: - RAYVENS_TEST_MODE=local RAYVENS_TRANSPORT=kafka python ./tests/source.py # - ray submit ./scripts/cluster.yaml ./tests/stream.py - ray submit ./scripts/cluster.yaml ./tests/source.py + - RAYVENS_TEST_MODE=local python ./tests/kafka_transport.py + - ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py local + - RAYVENS_TEST_MODE=local python ./tests/kafka_scaling_transport.py + - ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py local # Test operator mode # - ray submit ./scripts/cluster.yaml ./tests/source_operator.py @@ -47,6 +51,8 @@ script: # - ray submit ./scripts/cluster.yaml ./tests/generic_sink.py # - RAYVENS_TEST_MODE=local python ./tests/generic_source.py # - ray submit ./scripts/cluster.yaml ./tests/generic_source.py + - ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py operator + - ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py operator # Test mixed mode - RAYVENS_TEST_MODE=mixed python ./tests/sink.py diff --git a/tests/kafka_scaling_transport.py b/tests/kafka_scaling_transport.py index 6d5151d..63ae1ed 100644 --- a/tests/kafka_scaling_transport.py +++ b/tests/kafka_scaling_transport.py @@ -15,19 +15,19 @@ # import os +import sys import ray import rayvens # Initialize run mode. -run_mode = 'operator' -env_run_mode = os.getenv('RAYVENS_TEST_MODE') -if env_run_mode is not None: - run_mode = env_run_mode +if len(sys.argv) < 2: + run_mode = 'local' +run_mode = sys.argv[1] -if run_mode == 'operator': - ray.init(address='auto') -else: +if os.getenv('RAYVENS_TEST_MODE') == 'local': ray.init(object_store_memory=78643200) +else: + ray.init(address='auto') # The Kafka topic used for communication. topic = "testTopic" diff --git a/tests/kafka_transport.py b/tests/kafka_transport.py index 91ed13b..6a96f6b 100644 --- a/tests/kafka_transport.py +++ b/tests/kafka_transport.py @@ -15,19 +15,19 @@ # import os +import sys import ray import rayvens # Initialize run mode. -run_mode = 'operator' -env_run_mode = os.getenv('RAYVENS_TEST_MODE') -if env_run_mode is not None: - run_mode = env_run_mode +if len(sys.argv) < 2: + run_mode = 'local' +run_mode = sys.argv[1] -if run_mode == 'operator': - ray.init(address='auto') -else: +if os.getenv('RAYVENS_TEST_MODE') == 'local': ray.init(object_store_memory=78643200) +else: + ray.init(address='auto') # The Kafka topic used for communication. topic = "testTopic" From 256e92f617e3dfbb0b8ffde7c7e5623f10cd979e Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Tue, 7 Sep 2021 13:05:48 -0400 Subject: [PATCH 5/5] Fix tests. --- tests/kafka_scaling_transport.py | 3 ++- tests/kafka_transport.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/kafka_scaling_transport.py b/tests/kafka_scaling_transport.py index 63ae1ed..25af080 100644 --- a/tests/kafka_scaling_transport.py +++ b/tests/kafka_scaling_transport.py @@ -22,7 +22,8 @@ # Initialize run mode. if len(sys.argv) < 2: run_mode = 'local' -run_mode = sys.argv[1] +else: + run_mode = sys.argv[1] if os.getenv('RAYVENS_TEST_MODE') == 'local': ray.init(object_store_memory=78643200) diff --git a/tests/kafka_transport.py b/tests/kafka_transport.py index 6a96f6b..e2eb1df 100644 --- a/tests/kafka_transport.py +++ b/tests/kafka_transport.py @@ -22,7 +22,8 @@ # Initialize run mode. if len(sys.argv) < 2: run_mode = 'local' -run_mode = sys.argv[1] +else: + run_mode = sys.argv[1] if os.getenv('RAYVENS_TEST_MODE') == 'local': ray.init(object_store_memory=78643200)