Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ script:
- RAYVENS_TEST_MODE=local RAYVENS_TRANSPORT=kafka python ./tests/source.py
# - ray submit ./scripts/cluster.yaml ./tests/stream.py
- ray submit ./scripts/cluster.yaml ./tests/source.py
- RAYVENS_TEST_MODE=local python ./tests/kafka_transport.py
- ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py local
- RAYVENS_TEST_MODE=local python ./tests/kafka_scaling_transport.py
- ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py local

# Test operator mode
# - ray submit ./scripts/cluster.yaml ./tests/source_operator.py
Expand All @@ -47,6 +51,8 @@ script:
# - ray submit ./scripts/cluster.yaml ./tests/generic_sink.py
# - RAYVENS_TEST_MODE=local python ./tests/generic_source.py
# - ray submit ./scripts/cluster.yaml ./tests/generic_source.py
- ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py operator
- ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py operator

# Test mixed mode
- RAYVENS_TEST_MODE=mixed python ./tests/sink.py
5 changes: 1 addition & 4 deletions examples/cloud_object_storage/cos_sink_from_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import ray
import rayvens
import sys
import time

# This example demonstrates how to send objects to the AWS S3 or
# IBM Cloud Object Storage using multi-part uploads.
Expand Down Expand Up @@ -69,7 +68,5 @@
# the monitored file system directory.
dir_to_sink = stream.add_sink(dir_to_sink_config)

# stream._meta('verify_log', dir_to_sink, "BLA")

# Run for a while to give a chance for files to be dropped inside the directory
time.sleep(100)
stream.disconnect_all(after_idle_for=20)
3 changes: 1 addition & 2 deletions examples/cloud_object_storage/cos_sink_from_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import ray
import rayvens
import sys
import time
from pathlib import Path

# This example demonstrates how to send objects to the AWS S3 or
Expand Down Expand Up @@ -103,4 +102,4 @@
stream << "Some other input which is invalid."

# Run for a while
time.sleep(30)
stream.disconnect_all(after_idle_for=5)
3 changes: 1 addition & 2 deletions examples/cloud_object_storage/cos_sink_multi_part.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import ray
import rayvens
import sys
import time
from pathlib import Path

# This example demonstrates how to send objects to the AWS S3 or
Expand Down Expand Up @@ -73,4 +72,4 @@
stream << Path("test_files/test.txt")

# Run for a while
time.sleep(20)
stream.disconnect_all(after_idle_for=5)
8 changes: 8 additions & 0 deletions rayvens/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def disconnect_all(self,
self._wait_for_timeout(after_idle_for, after)
return ray.get(self.actor.disconnect_all.remote(stream_drain_timeout))

def event_count(self):
return ray.get(self.actor.event_count.remote())

def _meta(self, action, *args, **kwargs):
return ray.get(self.actor._meta.remote(action, *args, **kwargs))

Expand Down Expand Up @@ -119,6 +122,7 @@ def __init__(self, name, operator=None):
self._sinks = {}
self._latest_sent_event_timestamp = None
self._limit_subscribers = False
self._event_counter = 0

def send_to(self, subscriber, name=None):
if self._limit_subscribers:
Expand All @@ -142,6 +146,7 @@ def append(self, data):
continue
_eval(subscriber, data)
self._latest_sent_event_timestamp = time.time()
self._event_counter += 1

def add_operator(self, operator):
self._operator = operator
Expand Down Expand Up @@ -195,6 +200,9 @@ def disconnect_all(self, stream_drain_timeout):
for sink_name in dict(self._sinks):
self.disconnect_sink(sink_name)

def event_count(self):
return self.event_count

def _meta(self, action, *args, **kwargs):
return verify_do(self, _global_camel, action, *args, **kwargs)

Expand Down
25 changes: 22 additions & 3 deletions rayvens/core/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

import time
from rayvens.core import kamel


Expand All @@ -23,7 +24,11 @@ def verify_do(stream, _global_camel, action, *args, **kwargs):
raise RuntimeError('invalid meta action')


def _verify_log(stream, _global_camel, sink_source_name, message):
def _verify_log(stream,
_global_camel,
sink_source_name,
message,
wait_for_events=False):
# Get integration:
integration = None
if sink_source_name in stream._sinks:
Expand All @@ -34,10 +39,25 @@ def _verify_log(stream, _global_camel, sink_source_name, message):
raise RuntimeError(
f'{sink_source_name} not found on stream {stream.name}')

log = "FAIL"

# Wait for at least one event to happen.
if wait_for_events:
event_count = 0
countdown = 20
while event_count == 0:
event_count = stream.event_count()
time.sleep(1)
countdown -= 1
if countdown == 0:
break
if event_count == 0:
print("[LOG CHECK]:", log)
return False

if _global_camel.mode.is_local():
# In the local case the integration run is ongoing and we can
# access the logs directly.
# TODO: make this work for local implementation.
outcome = integration.invocation.invoke(message)
else:
# When running using the operator then the integration run command
Expand All @@ -48,7 +68,6 @@ def _verify_log(stream, _global_camel, sink_source_name, message):
outcome = invocation is not None
invocation.kill()

log = "FAIL"
if outcome:
log = "SUCCESS"
print("[LOG CHECK]:", log)
Expand Down
7 changes: 2 additions & 5 deletions tests/generic_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import ray
import rayvens
import time
import os

# Initialize ray based on where ray will run inside the cluster using the
Expand Down Expand Up @@ -59,10 +58,8 @@
output_message = f'Sending message to Slack sink in run mode {run_mode}.'
stream << output_message

time.sleep(10)

# Verify outcome.
stream._meta('verify_log', sink, output_message)
stream._meta('verify_log', sink, output_message, wait_for_events=True)

# Delete all integrations from stream.
stream.disconnect_all()
stream.disconnect_all(after_idle_for=5)
63 changes: 63 additions & 0 deletions tests/kafka_scaling_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys
import ray
import rayvens

# Initialize run mode.
if len(sys.argv) < 2:
run_mode = 'local'
else:
run_mode = sys.argv[1]

if os.getenv('RAYVENS_TEST_MODE') == 'local':
ray.init(object_store_memory=78643200)
else:
ray.init(address='auto')

# The Kafka topic used for communication.
topic = "testTopic"

rayvens.init(mode=run_mode, transport='kafka')

# Create source stream and configuration.
source_stream = rayvens.Stream('kafka-source-stream')

# Event sink config.
test_sink_config = dict(kind='test-sink')

# Add sink to stream.
test_sink = source_stream.add_sink(test_sink_config)

source_config = dict(
kind='http-source',
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
route='/from-http',
period=2000,
kafka_transport_topic=topic,
kafka_transport_partitions=3)
source = source_stream.add_source(source_config)

# Verify outcome.
source_stream._meta('verify_log',
test_sink,
"quoteResponse",
wait_for_events=True)

# Disconnect source and sink.
source_stream.disconnect_all(after=10)
61 changes: 61 additions & 0 deletions tests/kafka_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys
import ray
import rayvens

# Initialize run mode.
if len(sys.argv) < 2:
run_mode = 'local'
else:
run_mode = sys.argv[1]

if os.getenv('RAYVENS_TEST_MODE') == 'local':
ray.init(object_store_memory=78643200)
else:
ray.init(address='auto')

# The Kafka topic used for communication.
topic = "testTopic"

rayvens.init(mode=run_mode, transport='kafka')

# Create source stream and configuration.
source_stream = rayvens.Stream('kafka-source-stream')

# Event sink config.
test_sink_config = dict(kind='test-sink')

# Add sink to stream.
test_sink = source_stream.add_sink(test_sink_config)

source_config = dict(
kind='http-source',
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
route='/from-http',
period=2000)
source = source_stream.add_source(source_config)

# Verify outcome.
source_stream._meta('verify_log',
test_sink,
"quoteResponse",
wait_for_events=True)

# Disconnect source and sink.
source_stream.disconnect_all(after=5)
7 changes: 2 additions & 5 deletions tests/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import ray
import rayvens
import time
import os

# Initialize ray based on where ray will run inside the cluster using the
Expand Down Expand Up @@ -52,10 +51,8 @@
output_message = f'Sending message to Slack sink in run mode {run_mode}.'
stream << output_message

time.sleep(10)

# Verify outcome.
stream._meta('verify_log', sink, output_message)
stream._meta('verify_log', sink, output_message, wait_for_events=True)

# Delete all integrations from stream.
stream.disconnect_all()
stream.disconnect_all(after_idle_for=5)