3131import java .util .Set ;
3232import java .util .concurrent .*;
3333import java .util .concurrent .atomic .AtomicBoolean ;
34+ import java .util .concurrent .locks .Lock ;
35+ import java .util .concurrent .locks .ReentrantLock ;
3436
3537/**
3638 * Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
@@ -40,8 +42,8 @@ public class ChannelManager {
4042 private static final Logger LOGGER = LoggerFactory .getLogger (ChannelManager .class );
4143
4244 private final AtomicBoolean closed = new AtomicBoolean (false );
43- /** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
44- private final Object monitor = new Object ();
45+ /** Lock for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
46+ private final Lock lock = new ReentrantLock ();
4547 /** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
4648 private final Map <Integer , ChannelN > _channelMap = new HashMap <>();
4749 private final IntAllocator channelNumberAllocator ;
@@ -97,10 +99,13 @@ public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFac
9799 * @throws UnknownChannelException if there is no channel with number <code><b>channelNumber</b></code> on this connection
98100 */
99101 public ChannelN getChannel (int channelNumber ) {
100- synchronized (this .monitor ) {
102+ lock .lock ();
103+ try {
101104 ChannelN ch = _channelMap .get (channelNumber );
102105 if (ch == null ) throw new UnknownChannelException (channelNumber );
103106 return ch ;
107+ } finally {
108+ lock .unlock ();
104109 }
105110 }
106111
@@ -111,8 +116,11 @@ public ChannelN getChannel(int channelNumber) {
111116 public void handleSignal (final ShutdownSignalException signal ) {
112117 if (this .closed .compareAndSet (false , true )) {
113118 Set <ChannelN > channels ;
114- synchronized (this .monitor ) {
119+ lock .lock ();
120+ try {
115121 channels = new HashSet <>(_channelMap .values ());
122+ } finally {
123+ lock .unlock ();
116124 }
117125 Set <CountDownLatch > shutdownSet = new HashSet <>();
118126 for (final ChannelN channel : channels ) {
@@ -171,26 +179,32 @@ private void scheduleShutdownProcessing(Set<CountDownLatch> shutdownSet) {
171179
172180 public ChannelN createChannel (AMQConnection connection ) throws IOException {
173181 ChannelN ch ;
174- synchronized (this .monitor ) {
182+ lock .lock ();
183+ try {
175184 int channelNumber = channelNumberAllocator .allocate ();
176185 if (channelNumber == -1 ) {
177186 return null ;
178187 } else {
179188 ch = addNewChannel (connection , channelNumber );
180189 }
190+ } finally {
191+ lock .unlock ();
181192 }
182193 ch .open (); // now that it's been safely added
183194 return ch ;
184195 }
185196
186197 public ChannelN createChannel (AMQConnection connection , int channelNumber ) throws IOException {
187198 ChannelN ch ;
188- synchronized (this .monitor ) {
199+ lock .lock ();
200+ try {
189201 if (channelNumberAllocator .reserve (channelNumber )) {
190202 ch = addNewChannel (connection , channelNumber );
191203 } else {
192204 return null ;
193205 }
206+ } finally {
207+ lock .unlock ();
194208 }
195209 ch .open (); // now that it's been safely added
196210 return ch ;
@@ -233,7 +247,8 @@ public void releaseChannelNumber(ChannelN channel) {
233247 // a way as to cause disconnectChannel on the old channel to try to
234248 // remove the new one. Ideally we would fix this race at the source,
235249 // but it's much easier to just catch it here.
236- synchronized (this .monitor ) {
250+ lock .lock ();
251+ try {
237252 int channelNumber = channel .getChannelNumber ();
238253 ChannelN existing = _channelMap .remove (channelNumber );
239254 // Nothing to do here. Move along.
@@ -246,13 +261,11 @@ else if (existing != channel) {
246261 return ;
247262 }
248263 channelNumberAllocator .free (channelNumber );
264+ } finally {
265+ lock .unlock ();
249266 }
250267 }
251268
252- public ExecutorService getShutdownExecutor () {
253- return shutdownExecutor ;
254- }
255-
256269 public void setShutdownExecutor (ExecutorService shutdownExecutor ) {
257270 this .shutdownExecutor = shutdownExecutor ;
258271 }
0 commit comments