@@ -372,6 +372,228 @@ def test_send_buffer_nowait(self):
372372 obj [4 :8 ] = b'ham.'
373373 self .assertEqual (obj , buf )
374374
375+ def test_send_cleared_with_subinterpreter (self ):
376+ def common (rch , sch , unbound = None , presize = 0 ):
377+ if not unbound :
378+ extraargs = ''
379+ elif unbound is channels .UNBOUND :
380+ extraargs = ', unbound=channels.UNBOUND'
381+ elif unbound is channels .UNBOUND_ERROR :
382+ extraargs = ', unbound=channels.UNBOUND_ERROR'
383+ elif unbound is channels .UNBOUND_REMOVE :
384+ extraargs = ', unbound=channels.UNBOUND_REMOVE'
385+ else :
386+ raise NotImplementedError (repr (unbound ))
387+ interp = interpreters .create ()
388+
389+ _run_output (interp , dedent (f"""
390+ from test.support.interpreters import channels
391+ sch = channels.SendChannel({ sch .id } )
392+ obj1 = b'spam'
393+ obj2 = b'eggs'
394+ sch.send_nowait(obj1{ extraargs } )
395+ sch.send_nowait(obj2{ extraargs } )
396+ """ ))
397+ self .assertEqual (
398+ _channels .get_count (rch .id ),
399+ presize + 2 ,
400+ )
401+
402+ if presize == 0 :
403+ obj1 = rch .recv ()
404+ self .assertEqual (obj1 , b'spam' )
405+ self .assertEqual (
406+ _channels .get_count (rch .id ),
407+ presize + 1 ,
408+ )
409+
410+ return interp
411+
412+ with self .subTest ('default' ): # UNBOUND
413+ rch , sch = channels .create ()
414+ interp = common (rch , sch )
415+ del interp
416+ self .assertEqual (_channels .get_count (rch .id ), 1 )
417+ obj1 = rch .recv ()
418+ self .assertEqual (_channels .get_count (rch .id ), 0 )
419+ self .assertIs (obj1 , channels .UNBOUND )
420+ self .assertEqual (_channels .get_count (rch .id ), 0 )
421+ with self .assertRaises (channels .ChannelEmptyError ):
422+ rch .recv_nowait ()
423+
424+ with self .subTest ('UNBOUND' ):
425+ rch , sch = channels .create ()
426+ interp = common (rch , sch , channels .UNBOUND )
427+ del interp
428+ self .assertEqual (_channels .get_count (rch .id ), 1 )
429+ obj1 = rch .recv ()
430+ self .assertIs (obj1 , channels .UNBOUND )
431+ self .assertEqual (_channels .get_count (rch .id ), 0 )
432+ with self .assertRaises (channels .ChannelEmptyError ):
433+ rch .recv_nowait ()
434+
435+ with self .subTest ('UNBOUND_ERROR' ):
436+ rch , sch = channels .create ()
437+ interp = common (rch , sch , channels .UNBOUND_ERROR )
438+
439+ del interp
440+ self .assertEqual (_channels .get_count (rch .id ), 1 )
441+ with self .assertRaises (channels .ItemInterpreterDestroyed ):
442+ rch .recv ()
443+
444+ self .assertEqual (_channels .get_count (rch .id ), 0 )
445+ with self .assertRaises (channels .ChannelEmptyError ):
446+ rch .recv_nowait ()
447+
448+ with self .subTest ('UNBOUND_REMOVE' ):
449+ rch , sch = channels .create ()
450+
451+ interp = common (rch , sch , channels .UNBOUND_REMOVE )
452+ del interp
453+ self .assertEqual (_channels .get_count (rch .id ), 0 )
454+ with self .assertRaises (channels .ChannelEmptyError ):
455+ rch .recv_nowait ()
456+
457+ sch .send_nowait (b'ham' , unbound = channels .UNBOUND_REMOVE )
458+ self .assertEqual (_channels .get_count (rch .id ), 1 )
459+ interp = common (rch , sch , channels .UNBOUND_REMOVE , 1 )
460+ self .assertEqual (_channels .get_count (rch .id ), 3 )
461+ sch .send_nowait (42 , unbound = channels .UNBOUND_REMOVE )
462+ self .assertEqual (_channels .get_count (rch .id ), 4 )
463+ del interp
464+ self .assertEqual (_channels .get_count (rch .id ), 2 )
465+ obj1 = rch .recv ()
466+ obj2 = rch .recv ()
467+ self .assertEqual (obj1 , b'ham' )
468+ self .assertEqual (obj2 , 42 )
469+ self .assertEqual (_channels .get_count (rch .id ), 0 )
470+ with self .assertRaises (channels .ChannelEmptyError ):
471+ rch .recv_nowait ()
472+
473+ def test_send_cleared_with_subinterpreter_mixed (self ):
474+ rch , sch = channels .create ()
475+ interp = interpreters .create ()
476+
477+ # If we don't associate the main interpreter with the channel
478+ # then the channel will be automatically closed when interp
479+ # is destroyed.
480+ sch .send_nowait (None )
481+ rch .recv ()
482+ self .assertEqual (_channels .get_count (rch .id ), 0 )
483+
484+ _run_output (interp , dedent (f"""
485+ from test.support.interpreters import channels
486+ sch = channels.SendChannel({ sch .id } )
487+ sch.send_nowait(1, unbound=channels.UNBOUND)
488+ sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
489+ sch.send_nowait(3)
490+ sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
491+ sch.send_nowait(5, unbound=channels.UNBOUND)
492+ """ ))
493+ self .assertEqual (_channels .get_count (rch .id ), 5 )
494+
495+ del interp
496+ self .assertEqual (_channels .get_count (rch .id ), 4 )
497+
498+ obj1 = rch .recv ()
499+ self .assertIs (obj1 , channels .UNBOUND )
500+ self .assertEqual (_channels .get_count (rch .id ), 3 )
501+
502+ with self .assertRaises (channels .ItemInterpreterDestroyed ):
503+ rch .recv ()
504+ self .assertEqual (_channels .get_count (rch .id ), 2 )
505+
506+ obj2 = rch .recv ()
507+ self .assertIs (obj2 , channels .UNBOUND )
508+ self .assertEqual (_channels .get_count (rch .id ), 1 )
509+
510+ obj3 = rch .recv ()
511+ self .assertIs (obj3 , channels .UNBOUND )
512+ self .assertEqual (_channels .get_count (rch .id ), 0 )
513+
514+ def test_send_cleared_with_subinterpreter_multiple (self ):
515+ rch , sch = channels .create ()
516+ interp1 = interpreters .create ()
517+ interp2 = interpreters .create ()
518+
519+ sch .send_nowait (1 )
520+ _run_output (interp1 , dedent (f"""
521+ from test.support.interpreters import channels
522+ rch = channels.RecvChannel({ rch .id } )
523+ sch = channels.SendChannel({ sch .id } )
524+ obj1 = rch.recv()
525+ sch.send_nowait(2, unbound=channels.UNBOUND)
526+ sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
527+ """ ))
528+ _run_output (interp2 , dedent (f"""
529+ from test.support.interpreters import channels
530+ rch = channels.RecvChannel({ rch .id } )
531+ sch = channels.SendChannel({ sch .id } )
532+ obj2 = rch.recv()
533+ obj1 = rch.recv()
534+ """ ))
535+ self .assertEqual (_channels .get_count (rch .id ), 0 )
536+ sch .send_nowait (3 )
537+ _run_output (interp1 , dedent ("""
538+ sch.send_nowait(4, unbound=channels.UNBOUND)
539+ # interp closed here
540+ sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
541+ sch.send_nowait(6, unbound=channels.UNBOUND)
542+ """ ))
543+ _run_output (interp2 , dedent ("""
544+ sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
545+ # interp closed here
546+ sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
547+ sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
548+ sch.send_nowait(8, unbound=channels.UNBOUND)
549+ """ ))
550+ _run_output (interp1 , dedent ("""
551+ sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
552+ sch.send_nowait(10, unbound=channels.UNBOUND)
553+ """ ))
554+ self .assertEqual (_channels .get_count (rch .id ), 10 )
555+
556+ obj3 = rch .recv ()
557+ self .assertEqual (obj3 , 3 )
558+ self .assertEqual (_channels .get_count (rch .id ), 9 )
559+
560+ obj4 = rch .recv ()
561+ self .assertEqual (obj4 , 4 )
562+ self .assertEqual (_channels .get_count (rch .id ), 8 )
563+
564+ del interp1
565+ self .assertEqual (_channels .get_count (rch .id ), 6 )
566+
567+ # obj5 was removed
568+
569+ obj6 = rch .recv ()
570+ self .assertIs (obj6 , channels .UNBOUND )
571+ self .assertEqual (_channels .get_count (rch .id ), 5 )
572+
573+ obj7 = rch .recv ()
574+ self .assertEqual (obj7 , 7 )
575+ self .assertEqual (_channels .get_count (rch .id ), 4 )
576+
577+ del interp2
578+ self .assertEqual (_channels .get_count (rch .id ), 3 )
579+
580+ # obj1
581+ with self .assertRaises (channels .ItemInterpreterDestroyed ):
582+ rch .recv ()
583+ self .assertEqual (_channels .get_count (rch .id ), 2 )
584+
585+ # obj2 was removed
586+
587+ obj8 = rch .recv ()
588+ self .assertIs (obj8 , channels .UNBOUND )
589+ self .assertEqual (_channels .get_count (rch .id ), 1 )
590+
591+ # obj9 was removed
592+
593+ obj10 = rch .recv ()
594+ self .assertIs (obj10 , channels .UNBOUND )
595+ self .assertEqual (_channels .get_count (rch .id ), 0 )
596+
375597
376598if __name__ == '__main__' :
377599 # Test needs to be a package, so we can do relative imports.
0 commit comments