@@ -46,27 +46,28 @@ def initialized(self):
46
46
47
47
@staticmethod
48
48
def process_message (store , requester , msg , ready ):
49
- payload = json .loads (msg .data )
50
- log .debug ("Received stream event {}" .format (msg .event ))
49
+ log .debug ("Received stream event {} with data: {}" .format (msg .event , msg .data ))
51
50
if msg .event == 'put' :
51
+ payload = json .loads (msg .data )
52
52
store .init (payload )
53
53
if not ready .is_set () and store .initialized :
54
54
log .info ("StreamingUpdateProcessor initialized ok" )
55
55
return True
56
56
elif msg .event == 'patch' :
57
+ payload = json .loads (msg .data )
57
58
key = payload ['path' ][1 :]
58
59
feature = payload ['data' ]
59
- log .debug ("Updating feature {}" .format (key ))
60
60
store .upsert (key , feature )
61
61
elif msg .event == "indirect/patch" :
62
- key = payload [ ' data' ]
62
+ key = msg . data
63
63
store .upsert (key , requester .get_one (key ))
64
64
elif msg .event == "indirect/put" :
65
65
store .init (requester .get_all ())
66
66
if not ready .is_set () and store .initialized :
67
67
log .info ("StreamingUpdateProcessor initialized ok" )
68
68
return True
69
69
elif msg .event == 'delete' :
70
+ payload = json .loads (msg .data )
70
71
key = payload ['path' ][1 :]
71
72
# noinspection PyShadowingNames
72
73
version = payload ['version' ]
0 commit comments