Skip to content

Commit a3d7dc0

Browse files
jhoellerunknown
authored andcommitted
First pass completed, with support for the standard JMS 2.0 API in our CachingConnectionFactory and support for a delivery delay setting in JmsTemplate. Note that none of this has been tested against an actual JMS 2.0 provider yet, due to no such provider being available yet.
Known limitations: * Spring's SingleConnectionFactory and CachingConnectionFactory do not support createContext calls for JMSContext creation at this point. Note that the JMSContext model bypasses the point of a Connection/Session pool anyway; this will only really work with a native JMS 2.0 ConnectionFactory and provider-specific pooling such as in an EE environment. * JmsTemplate has no out-of-the-box support for send calls with an async completion listener. Note that a CompletionListener can be specified in a custom ProducerCallback implementation if really necessary. There is no special support for the simplified JMSContext API, and likely never will be: JMSContext can be used from a native ConnectionFactory directly. @Inject JMSContext isn't supported due to rather involved rules for defining and scoping the injected context which are quite at odds with the Spring way of doing these things. We strongly recommend JmsTemplate instead, or @resource ConnectionFactory with a createContext call within a Java 7 try-with-resources clause (as shown in the specification). After all, JMSContext has primarily been designed with EE's one-session-per-connection model and JTA transactions in mind, not with Spring-style use of a native JMS provider and native JMS transactions. Issue: SPR-8197
1 parent 41737e8 commit a3d7dc0

File tree

4 files changed

+191
-21
lines changed

4 files changed

+191
-21
lines changed

spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2012 the original author or authors.
2+
* Copyright 2002-2013 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,10 @@
1616

1717
package org.springframework.jms.connection;
1818

19+
import java.lang.reflect.InvocationHandler;
20+
import java.lang.reflect.InvocationTargetException;
21+
import java.lang.reflect.Method;
22+
import java.lang.reflect.Proxy;
1923
import javax.jms.Destination;
2024
import javax.jms.JMSException;
2125
import javax.jms.Message;
@@ -25,6 +29,9 @@
2529
import javax.jms.Topic;
2630
import javax.jms.TopicPublisher;
2731

32+
import org.springframework.util.ClassUtils;
33+
import org.springframework.util.ReflectionUtils;
34+
2835
/**
2936
* JMS MessageProducer decorator that adapts calls to a shared MessageProducer
3037
* instance underneath, managing QoS settings locally within the decorator.
@@ -34,12 +41,42 @@
3441
*/
3542
class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublisher {
3643

44+
private static final Method setDeliveryDelayMethod =
45+
ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class);
46+
47+
private static final Method getDeliveryDelayMethod =
48+
ClassUtils.getMethodIfAvailable(MessageProducer.class, "getDeliveryDelay");
49+
50+
private static Class completionListenerClass;
51+
52+
private static Method sendWithCompletionListenerMethod;
53+
54+
private static Method sendWithDestinationAndCompletionListenerMethod;
55+
56+
static {
57+
try {
58+
completionListenerClass = ClassUtils.forName(
59+
"javax.jms.CompletionListener", CachedMessageProducer.class.getClassLoader());
60+
sendWithCompletionListenerMethod = MessageProducer.class.getMethod(
61+
"send", Message.class, int.class, int.class, long.class, completionListenerClass);
62+
sendWithDestinationAndCompletionListenerMethod = MessageProducer.class.getMethod(
63+
"send", Destination.class, Message.class, int.class, int.class, long.class, completionListenerClass);
64+
}
65+
catch (Exception ex) {
66+
// No JMS 2.0 API available
67+
completionListenerClass = null;
68+
}
69+
}
70+
71+
3772
private final MessageProducer target;
3873

3974
private Boolean originalDisableMessageID;
4075

4176
private Boolean originalDisableMessageTimestamp;
4277

78+
private Long originalDeliveryDelay;
79+
4380
private int deliveryMode;
4481

4582
private int priority;
@@ -57,7 +94,7 @@ public CachedMessageProducer(MessageProducer target) throws JMSException {
5794

5895
public void setDisableMessageID(boolean disableMessageID) throws JMSException {
5996
if (this.originalDisableMessageID == null) {
60-
this.originalDisableMessageID = Boolean.valueOf(this.target.getDisableMessageID());
97+
this.originalDisableMessageID = this.target.getDisableMessageID();
6198
}
6299
this.target.setDisableMessageID(disableMessageID);
63100
}
@@ -68,7 +105,7 @@ public boolean getDisableMessageID() throws JMSException {
68105

69106
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
70107
if (this.originalDisableMessageTimestamp == null) {
71-
this.originalDisableMessageTimestamp = Boolean.valueOf(this.target.getDisableMessageTimestamp());
108+
this.originalDisableMessageTimestamp = this.target.getDisableMessageTimestamp();
72109
}
73110
this.target.setDisableMessageTimestamp(disableMessageTimestamp);
74111
}
@@ -77,6 +114,17 @@ public boolean getDisableMessageTimestamp() throws JMSException {
77114
return this.target.getDisableMessageTimestamp();
78115
}
79116

117+
public void setDeliveryDelay(long deliveryDelay) {
118+
if (this.originalDeliveryDelay == null) {
119+
this.originalDeliveryDelay = (Long) ReflectionUtils.invokeMethod(getDeliveryDelayMethod, this.target);
120+
}
121+
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, this.target, deliveryDelay);
122+
}
123+
124+
public long getDeliveryDelay() {
125+
return (Long) ReflectionUtils.invokeMethod(getDeliveryDelayMethod, this.target);
126+
}
127+
80128
public void setDeliveryMode(int deliveryMode) {
81129
this.deliveryMode = deliveryMode;
82130
}
@@ -156,18 +204,66 @@ public void publish(Topic topic, Message message, int deliveryMode, int priority
156204
public void close() throws JMSException {
157205
// It's a cached MessageProducer... reset properties only.
158206
if (this.originalDisableMessageID != null) {
159-
this.target.setDisableMessageID(this.originalDisableMessageID.booleanValue());
207+
this.target.setDisableMessageID(this.originalDisableMessageID);
160208
this.originalDisableMessageID = null;
161209
}
162210
if (this.originalDisableMessageTimestamp != null) {
163-
this.target.setDisableMessageTimestamp(this.originalDisableMessageTimestamp.booleanValue());
211+
this.target.setDisableMessageTimestamp(this.originalDisableMessageTimestamp);
164212
this.originalDisableMessageTimestamp = null;
165213
}
214+
if (this.originalDeliveryDelay != null) {
215+
ReflectionUtils.invokeMethod(setDeliveryDelayMethod, this.target, this.originalDeliveryDelay);
216+
this.originalDeliveryDelay = null;
217+
}
166218
}
167219

168-
169220
public String toString() {
170221
return "Cached JMS MessageProducer: " + this.target;
171222
}
172223

224+
225+
/**
226+
* Build a dynamic proxy that reflectively adapts to JMS 2.0 API methods, if necessary.
227+
* Otherwise simply return this CachedMessageProducer instance itself.
228+
*/
229+
public MessageProducer getProxyIfNecessary() {
230+
if (completionListenerClass != null) {
231+
return (MessageProducer) Proxy.newProxyInstance(CachedMessageProducer.class.getClassLoader(),
232+
new Class[] {MessageProducer.class, QueueSender.class, TopicPublisher.class},
233+
new Jms2MessageProducerInvocationHandler());
234+
}
235+
else {
236+
return this;
237+
}
238+
}
239+
240+
241+
/**
242+
* Reflective InvocationHandler which adapts to JMS 2.0 API methods that we
243+
* cannot statically compile against while preserving JMS 1.1 compatibility
244+
* (due to the new {@code javax.jms.CompletionListener} type in the signatures).
245+
*/
246+
private class Jms2MessageProducerInvocationHandler implements InvocationHandler {
247+
248+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
249+
try {
250+
if (method.getName().equals("send") && args != null &&
251+
completionListenerClass.equals(method.getParameterTypes()[args.length - 1])) {
252+
if (args.length == 2) {
253+
return sendWithCompletionListenerMethod.invoke(
254+
target, args[0], deliveryMode, priority, timeToLive, args[1]);
255+
}
256+
else if (args.length == 3) {
257+
return sendWithDestinationAndCompletionListenerMethod.invoke(
258+
target, args[0], args[1], deliveryMode, priority, timeToLive, args[2]);
259+
}
260+
}
261+
return method.invoke(target, args);
262+
}
263+
catch (InvocationTargetException ex) {
264+
throw ex.getTargetException();
265+
}
266+
}
267+
}
268+
173269
}

spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2012 the original author or authors.
2+
* Copyright 2002-2013 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,7 +40,9 @@
4040
import javax.jms.TopicSession;
4141

4242
import org.springframework.util.Assert;
43+
import org.springframework.util.ClassUtils;
4344
import org.springframework.util.ObjectUtils;
45+
import org.springframework.util.ReflectionUtils;
4446

4547
/**
4648
* {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session}
@@ -80,6 +82,10 @@
8082
*/
8183
public class CachingConnectionFactory extends SingleConnectionFactory {
8284

85+
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
86+
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
87+
88+
8389
private int sessionCacheSize = 1;
8490

8591
private boolean cacheProducers = true;
@@ -333,7 +339,7 @@ else if (isCacheConsumers()) {
333339
null);
334340
}
335341
}
336-
else if (methodName.equals("createDurableSubscriber")) {
342+
else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) {
337343
Destination dest = (Destination) args[0];
338344
if (dest != null) {
339345
return getCachedConsumer(dest,
@@ -342,6 +348,15 @@ else if (methodName.equals("createDurableSubscriber")) {
342348
(String) args[1]);
343349
}
344350
}
351+
else if (methodName.equals("createSharedDurableConsumer")) {
352+
Destination dest = (Destination) args[0];
353+
if (dest != null) {
354+
return getCachedConsumer(dest,
355+
(args.length > 2 ? (String) args[2] : null),
356+
null,
357+
(String) args[1]);
358+
}
359+
}
345360
}
346361
}
347362
try {
@@ -367,11 +382,11 @@ private MessageProducer getCachedProducer(Destination dest) throws JMSException
367382
}
368383
this.cachedProducers.put(cacheKey, producer);
369384
}
370-
return new CachedMessageProducer(producer);
385+
return new CachedMessageProducer(producer).getProxyIfNecessary();
371386
}
372387

373388
private MessageConsumer getCachedConsumer(
374-
Destination dest, String selector, boolean noLocal, String subscription) throws JMSException {
389+
Destination dest, String selector, Boolean noLocal, String subscription) throws JMSException {
375390

376391
ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription);
377392
MessageConsumer consumer = this.cachedConsumers.get(cacheKey);
@@ -382,9 +397,27 @@ private MessageConsumer getCachedConsumer(
382397
}
383398
else {
384399
if (dest instanceof Topic) {
385-
consumer = (subscription != null ?
386-
this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
387-
this.target.createConsumer(dest, selector, noLocal));
400+
if (noLocal == null) {
401+
// createSharedDurableConsumer((Topic) dest, subscription, selector);
402+
try {
403+
consumer = (MessageConsumer) createSharedDurableConsumerMethod.invoke
404+
(this.target, dest, subscription, selector);
405+
}
406+
catch (InvocationTargetException ex) {
407+
if (ex.getTargetException() instanceof JMSException) {
408+
throw (JMSException) ex.getTargetException();
409+
}
410+
ReflectionUtils.handleInvocationTargetException(ex);
411+
}
412+
catch (IllegalAccessException ex) {
413+
throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage());
414+
}
415+
}
416+
else {
417+
consumer = (subscription != null ?
418+
this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
419+
this.target.createConsumer(dest, selector, noLocal));
420+
}
388421
}
389422
else {
390423
consumer = this.target.createConsumer(dest, selector);
@@ -499,11 +532,11 @@ private static class ConsumerCacheKey extends DestinationCacheKey {
499532

500533
private final String selector;
501534

502-
private final boolean noLocal;
535+
private final Boolean noLocal;
503536

504537
private final String subscription;
505538

506-
public ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription) {
539+
public ConsumerCacheKey(Destination destination, String selector, Boolean noLocal, String subscription) {
507540
super(destination);
508541
this.selector = selector;
509542
this.noLocal = noLocal;
@@ -517,7 +550,7 @@ public boolean equals(Object other) {
517550
ConsumerCacheKey otherKey = (ConsumerCacheKey) other;
518551
return (destinationEquals(otherKey) &&
519552
ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) &&
520-
this.noLocal == otherKey.noLocal &&
553+
ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) &&
521554
ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription));
522555
}
523556
}

spring-jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2012 the original author or authors.
2+
* Copyright 2002-2013 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -528,9 +528,20 @@ else if (method.getName().equals("close")) {
528528
}
529529
else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
530530
method.getName().equals("createTopicSession")) {
531-
boolean transacted = (Boolean) args[0];
532-
Integer ackMode = (Integer) args[1];
533-
Integer mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
531+
// Default: JMS 2.0 createSession() method
532+
Integer mode = Session.AUTO_ACKNOWLEDGE;
533+
if (args != null) {
534+
if (args.length == 1) {
535+
// JMS 2.0 createSession(int) method
536+
mode = (Integer) args[0];
537+
}
538+
else if (args.length == 2) {
539+
// JMS 1.1 createSession(boolean, int) method
540+
boolean transacted = (Boolean) args[0];
541+
Integer ackMode = (Integer) args[1];
542+
mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
543+
}
544+
}
534545
Session session = getSession(this.target, mode);
535546
if (session != null) {
536547
if (!method.getReturnType().isInstance(session)) {

0 commit comments

Comments
 (0)