-
Notifications
You must be signed in to change notification settings - Fork 44
Qos mqtt compatibility #238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@@ -153,6 +158,14 @@ def content_tests(settings, vcopy): | |||
vcopy["WM_GW_BUFFERING_MINIMAL_SINK_COST"] | |||
== settings.buffering_minimal_sink_cost | |||
) | |||
assert ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Black would make changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor changes with naming and comments to make but otherwise looks good.
} | ||
self.last_update_time = current_time | ||
|
||
def is_in_cache(self, msg_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this take into account situation that all the messages are sent with same message id? I think that It has not been previously required that message id needs to distinct in some time range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that message id were supposed to be unique. Anyway, it is better to hash the payload and the topics and to compare in the cache if it is already present instead of taking the message id. It will therefore handle this kind of issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there some time stamp in the incoming message? In some cases you might send broadcast RemoteAPI message that is exactly the same as previous message after one or two minutes (or even faster in LL network) to be sure that the message goes through. In this case the hash would match also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The uniqueness of request_id is specified here: https://github.com/wirepas/backend-apis/blob/master/gateway_to_backend/README.md#request-unique-identifier
So it is written that at least 48 bits must be unique.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good. Is the some check in gateway for req id? If I remember correctly, at least earlier, it was possible to use any constant req id (e.g. always zero).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment there is no check, it is just copied to the response. So if you use same id, you cannot match the response to an answer.
But with this proposed changed, this id wooul now be used as a way to detect duplicate messages. We could change it as a hash of full message, but will still filter twos messages with same id and same content sent very close in time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our case we are not so interested about the id as we cannot rely that the message reaches the target, so we need to resend it until we get response from the node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The best is then to resend it with a new request_id as it is definitely not the same request
d1b0bb9
to
a3d6971
Compare
assert message_cache.add_msg(REQ_ID) is True | ||
|
||
# Test if the cache has reset eventually | ||
sleep(CACHE_TIME_WINDOW_S/2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Black would make changes.
missing whitespace around arithmetic operator
self.cache_update_s = min(cache_update_s, cache_time_window_s) | ||
|
||
# Start a thread that cleans periodically the cache. | ||
self._clean_cache_thread = Thread(target=self._clean_cache_thread, daemon=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (87 > 79 characters)
): | ||
""" | ||
Args: | ||
cache_time_window_s: time in seconds after which a message is removed from the cache. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (97 > 79 characters)
""" | ||
Class to cache all recent messages to avoid sending duplicates in MQTT. | ||
""" | ||
def __init__( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Black would make changes.
# Add the message to the cache, if it can't be added, | ||
# the function should not send back any message | ||
if not self.message_cache.add_msg(request.req_id): | ||
logging.warning(f"_on_otap_set_target_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (149 > 79 characters)
@@ -559,6 +575,10 @@ def _on_set_config_cmd_received(self, client, userdata, message): | |||
logging.error(str(e)) | |||
return | |||
|
|||
if not self.message_cache.add_msg(request.req_id): | |||
logging.warning(f"_on_set_config_cmd_received: A message with id={request.req_id} is already present in the cache !") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (129 > 79 characters)
@@ -536,6 +548,10 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): | |||
logging.error(str(e)) | |||
return | |||
|
|||
if not self.message_cache.add_msg(request.req_id): | |||
logging.warning(f"_on_get_gateway_info_cmd_received: A message with id={request.req_id} is already present in the cache !") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (135 > 79 characters)
# Add the message to the cache, if it can't be added, | ||
# the function should not send back any response to the request | ||
if not self.message_cache.add_msg(request.req_id): | ||
logging.warning(f"_on_get_configs_cmd_received: A message with id={request.req_id} is already present in the cache !") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (130 > 79 characters)
# Add the message to the cache, if it can't be added, | ||
# the function should not send back any response to the request | ||
if not self.message_cache.add_msg(request.req_id): | ||
logging.warning(f"_on_send_data_cmd_received: A message with id={request.req_id} is already present in the cache !") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (128 > 79 characters)
|
||
self.cache.add_argument( | ||
"--cache_time_window_s", | ||
default=os.environ.get("WM_CACHE_TIME_WINDOW_S", cleaning_time_window_s_default), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (93 > 79 characters)
a3d6971
to
6cff0bf
Compare
Some MQTT brokers do not support qos=2. Changing transport service to publish with qos=1. Adding a cache in transport service to avoid sending redundant messages. Fixes on PR #238
6cff0bf
to
20f7828
Compare
Some MQTT brokers do not support qos=2. Changing transport service to publish with qos=1. Adding a cache in transport service to avoid sending redundant messages.
Some MQTT brokers do not support qos=2. Changing transport service to publish with qos=1. Adding a cache in transport service to avoid sending redundant messages.
Changing transport service to publish with qos=1. Adding a cache in transport service to avoid sending redundant messages. Fixes on PR #238
20f7828
to
f1fd6a9
Compare
Renaming class to MessageCache + normalizing variable names for better visibility. Cache is being cleaned with a thread and not with messages addition. QoS has been removed from mqtt option on publish and subscription, as only qos=1 should be used with the cache. Warnings have been added when redundancy is dectected at transport service level.
f1fd6a9
to
307c3a2
Compare
MQTT compatibility with QoS
Some MQTT brokers do not support qos=2.
Changing transport service to publish with qos=1.
Adding a cache in transport service to avoid sending redundant messages.