@@ -148,7 +148,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
 	e = &rb->events[rb->ring_cnt];
 	memset(e, 0, sizeof(*e));

-	e->events = EPOLLIN;
+	e->events = EPOLLIN | EPOLLET;
 	e->data.fd = rb->ring_cnt;
 	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
 		err = -errno;
@@ -260,7 +260,18 @@ static int64_t ringbuf_process_ring(struct ring *r)
 				cnt++;
 			}

-			smp_store_release(r->consumer_pos, cons_pos);
+			/* This ordering is critical to ensure that an epoll notification
+			 * gets sent in the case where the next iteration of this loop
+			 * discovers that the consumer is caught up. If this store were
+			 * performed using RELEASE, it'd be possible for the consumer
+			 * to fail to see an updated producer position (because, perhaps,
+			 * the producer position has not yet been updated), and for that
+			 * producer to fail to see this write. By making this write SEQ_CST,
+			 * we know that either the newly produced message will be visible
+			 * to the consumer, or the producer will discover that the consumer
+			 * is caught up.
+			 */
+			__atomic_store_n(r->consumer_pos, cons_pos, __ATOMIC_SEQ_CST);
 		}
 	} while (got_new_data);
 done:
0 commit comments