@@ -101,9 +101,14 @@ type AtomicTargetSize = atomic::AtomicUsize;
101101type AtomicTargetSize = atomic:: AtomicU8 ;
102102
103103#[ cfg( feature = "mpmc_large" ) ]
104- type IntSize = usize ;
104+ type UintSize = usize ;
105105#[ cfg( not( feature = "mpmc_large" ) ) ]
106- type IntSize = u8 ;
106+ type UintSize = u8 ;
107+
108+ #[ cfg( feature = "mpmc_large" ) ]
109+ type IntSize = isize ;
110+ #[ cfg( not( feature = "mpmc_large" ) ) ]
111+ type IntSize = i8 ;
107112
108113/// MPMC queue with a capability for 2 elements.
109114pub type Q2 < T > = MpMcQueue < T , 2 > ;
@@ -133,7 +138,7 @@ pub struct MpMcQueue<T, const N: usize> {
133138}
134139
135140impl < T , const N : usize > MpMcQueue < T , N > {
136- const MASK : IntSize = ( N - 1 ) as IntSize ;
141+ const MASK : UintSize = ( N - 1 ) as UintSize ;
137142 const EMPTY_CELL : Cell < T > = Cell :: new ( 0 ) ;
138143
139144 const ASSERT : [ ( ) ; 1 ] = [ ( ) ] ;
@@ -146,7 +151,7 @@ impl<T, const N: usize> MpMcQueue<T, N> {
146151
147152 // Const assert on size.
148153 #[ allow( clippy:: no_effect) ]
149- Self :: ASSERT [ ( N >= ( IntSize :: MAX as usize ) ) as usize ] ;
154+ Self :: ASSERT [ ( N >= ( UintSize :: MAX as usize ) ) as usize ] ;
150155
151156 let mut cell_count = 0 ;
152157
@@ -200,23 +205,23 @@ impl<T> Cell<T> {
200205 const fn new ( seq : usize ) -> Self {
201206 Self {
202207 data : MaybeUninit :: uninit ( ) ,
203- sequence : AtomicTargetSize :: new ( seq as IntSize ) ,
208+ sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
204209 }
205210 }
206211}
207212
208213unsafe fn dequeue < T > (
209214 buffer : * mut Cell < T > ,
210215 dequeue_pos : & AtomicTargetSize ,
211- mask : IntSize ,
216+ mask : UintSize ,
212217) -> Option < T > {
213218 let mut pos = dequeue_pos. load ( Ordering :: Relaxed ) ;
214219
215220 let mut cell;
216221 loop {
217222 cell = buffer. add ( usize:: from ( pos & mask) ) ;
218223 let seq = ( * cell) . sequence . load ( Ordering :: Acquire ) ;
219- let dif = ( seq as i8 ) . wrapping_sub ( ( pos. wrapping_add ( 1 ) ) as i8 ) ;
224+ let dif = ( seq as IntSize ) . wrapping_sub ( ( pos. wrapping_add ( 1 ) ) as IntSize ) ;
220225
221226 match dif. cmp ( & 0 ) {
222227 core:: cmp:: Ordering :: Equal => {
@@ -251,7 +256,7 @@ unsafe fn dequeue<T>(
251256unsafe fn enqueue < T > (
252257 buffer : * mut Cell < T > ,
253258 enqueue_pos : & AtomicTargetSize ,
254- mask : IntSize ,
259+ mask : UintSize ,
255260 item : T ,
256261) -> Result < ( ) , T > {
257262 let mut pos = enqueue_pos. load ( Ordering :: Relaxed ) ;
@@ -260,7 +265,7 @@ unsafe fn enqueue<T>(
260265 loop {
261266 cell = buffer. add ( usize:: from ( pos & mask) ) ;
262267 let seq = ( * cell) . sequence . load ( Ordering :: Acquire ) ;
263- let dif = ( seq as i8 ) . wrapping_sub ( pos as i8 ) ;
268+ let dif = ( seq as IntSize ) . wrapping_sub ( pos as IntSize ) ;
264269
265270 match dif. cmp ( & 0 ) {
266271 core:: cmp:: Ordering :: Equal => {
@@ -320,7 +325,8 @@ mod tests {
320325 assert ! ( q. enqueue( 0 ) . is_ok( ) ) ;
321326 assert_eq ! ( q. dequeue( ) , Some ( 0 ) ) ;
322327 }
323- // this should not block forever
328+
329+ // Queue is empty, this should not block forever.
324330 assert_eq ! ( q. dequeue( ) , None ) ;
325331 }
326332
@@ -336,4 +342,22 @@ mod tests {
336342 // this should not block forever
337343 assert ! ( q. enqueue( 0 ) . is_err( ) ) ;
338344 }
345+
346+ #[ test]
347+ fn enqueue_full ( ) {
348+ #[ cfg( not( feature = "mpmc_large" ) ) ]
349+ const CAPACITY : usize = 128 ;
350+
351+ #[ cfg( feature = "mpmc_large" ) ]
352+ const CAPACITY : usize = 256 ;
353+
354+ let q: MpMcQueue < u8 , CAPACITY > = MpMcQueue :: new ( ) ;
355+
356+ for _ in 0 ..CAPACITY {
357+ q. enqueue ( 0xAA ) . unwrap ( ) ;
358+ }
359+
360+ // Queue is full, this should not block forever.
361+ q. enqueue ( 0x55 ) . unwrap_err ( ) ;
362+ }
339363}
0 commit comments