Skip to content

Commit 54dcb80

Browse files
committed
JMS Batch sample
1 parent 1effa91 commit 54dcb80

File tree

10 files changed

+376
-0
lines changed

10 files changed

+376
-0
lines changed

jms/jms-batch/pom.xml

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>org.javaee7.jms</groupId>
9+
<artifactId>jms-samples</artifactId>
10+
<version>1.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<artifactId>jms-batch</artifactId>
14+
15+
<name>Batch JMS processing</name>
16+
<description>ItemReader reading from durable subscription</description>
17+
18+
<dependencies>
19+
<dependency>
20+
<groupId>org.javaee7</groupId>
21+
<artifactId>util-samples</artifactId>
22+
</dependency>
23+
</dependencies>
24+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.javaee7.jms.batch;
2+
3+
import javax.annotation.Resource;
4+
import javax.batch.api.chunk.AbstractItemReader;
5+
import javax.inject.Named;
6+
import javax.jms.ConnectionFactory;
7+
import javax.jms.JMSConsumer;
8+
import javax.jms.JMSContext;
9+
import javax.jms.Topic;
10+
import java.io.Serializable;
11+
12+
/**
13+
* @author Patrik Dudits
14+
*/
15+
@Named
16+
public class JmsItemReader extends AbstractItemReader {
17+
18+
@Resource(lookup = Resources.CONNECTION_FACTORY)
19+
ConnectionFactory factory;
20+
21+
private JMSContext jms;
22+
23+
@Resource(lookup = Resources.TOPIC)
24+
Topic topic;
25+
26+
private JMSConsumer subscription;
27+
28+
@Override
29+
public void open(Serializable checkpoint) throws Exception {
30+
jms = factory.createContext(); // <1> Since we're not using default connection factory, we use app managed +JMSContext+
31+
subscription = jms.createDurableConsumer(topic, Resources.SUBSCRIPTION);
32+
}
33+
34+
@Override
35+
public Object readItem() throws Exception {
36+
Integer item = subscription.receiveBodyNoWait(Integer.class); // <2> When there is no message ready to be received, +null+ is returned, fulfilling +readItem+ contract
37+
return item;
38+
}
39+
40+
@Override
41+
public void close() throws Exception {
42+
subscription.close(); // <3> Free resources at end of run
43+
jms.close();
44+
}
45+
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.javaee7.jms.batch;
2+
3+
import javax.jms.JMSConnectionFactoryDefinition;
4+
import javax.jms.JMSDestinationDefinition;
5+
6+
/**
7+
* @author Patrik Dudits
8+
*/
9+
@JMSDestinationDefinition(
10+
name = Resources.TOPIC,
11+
resourceAdapter = "jmsra",
12+
interfaceName = "javax.jms.Topic",
13+
destinationName="batch.topic",
14+
description="Batch processing topic")
15+
@JMSConnectionFactoryDefinition( // <1> WildFly appears to require user and password to be set for connection factories
16+
name = Resources.CONNECTION_FACTORY,
17+
resourceAdapter = "jmsra",
18+
clientId = "batchJob", // <2> It is not allowed to call +setClientId+ on +Connection+ or +JMSContext+ in Java EE container.
19+
description = "Connection factory with clientId of the durable subscription"
20+
)
21+
public class Resources {
22+
public static final String SUBSCRIPTION = "BatchJob"; // <3> Durable consumer is uniquely identified with its +clientId+ and +subscriptionName+.
23+
public static final String TOPIC = "java:app/batch/topic";
24+
public static final String CONNECTION_FACTORY = "java:app/batch/factory";
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.javaee7.jms.batch;
2+
3+
import javax.ejb.Singleton;
4+
5+
/**
6+
* @author Patrik Dudits
7+
*/
8+
@Singleton
9+
public class ResultCollector {
10+
11+
private int numberOfJobs;
12+
private int lastItemCount;
13+
private int lastSum;
14+
15+
public void postResult(int sum, int numItems) {
16+
numberOfJobs++;
17+
lastItemCount = numItems;
18+
lastSum = sum;
19+
}
20+
21+
public int getNumberOfJobs() {
22+
return numberOfJobs;
23+
}
24+
25+
public int getLastItemCount() {
26+
return lastItemCount;
27+
}
28+
29+
public int getLastSum() {
30+
return lastSum;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.javaee7.jms.batch;
2+
3+
import javax.annotation.PostConstruct;
4+
import javax.annotation.Resource;
5+
import javax.ejb.Singleton;
6+
import javax.ejb.Startup;
7+
import javax.jms.ConnectionFactory;
8+
import javax.jms.JMSConsumer;
9+
import javax.jms.JMSContext;
10+
import javax.jms.Topic;
11+
12+
/**
13+
* Create durable subscription upon deployment.
14+
*
15+
* Durable subscription needs unique subscription name and client id. Since setting
16+
* client id is not possible in Java EE environment, we define app-specific connection
17+
* factory with a client id.
18+
*
19+
* @author Patrik Dudits
20+
*/
21+
@Singleton
22+
@Startup
23+
public class SubscriptionCreator {
24+
25+
@Resource(lookup = Resources.TOPIC)
26+
Topic topic;
27+
28+
@Resource(lookup = Resources.CONNECTION_FACTORY)
29+
ConnectionFactory factory;
30+
31+
/**
32+
* We create the subscription at soonest possible time after deployment so we
33+
* wouldn't miss any message
34+
*/
35+
@PostConstruct
36+
void createSubscription() {
37+
try (JMSContext jms = factory.createContext()) { // <1> This is factory with clientId specified
38+
JMSConsumer consumer = jms.createDurableConsumer(topic, Resources.SUBSCRIPTION); // <2> creates durable subscription on the topic
39+
consumer.close();
40+
}
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package org.javaee7.jms.batch;
2+
3+
import javax.batch.api.chunk.AbstractItemWriter;
4+
import javax.inject.Inject;
5+
import javax.inject.Named;
6+
import java.io.Serializable;
7+
import java.util.List;
8+
9+
/**
10+
* @author Patrik Dudits
11+
*/
12+
@Named
13+
public class SummingItemWriter extends AbstractItemWriter {
14+
@Inject
15+
ResultCollector collector;
16+
17+
private int numItems;
18+
private int sum;
19+
20+
@Override
21+
public void open(Serializable checkpoint) throws Exception {
22+
numItems = 0; // <1> Reset the computation
23+
sum = 0;
24+
}
25+
26+
@Override
27+
public void writeItems(List<Object> objects) throws Exception {
28+
numItems += objects.size(); // <2> Perform the computation. Note that this may be called multiple times within single job run
29+
sum += computeSum(objects);
30+
}
31+
32+
@Override
33+
public void close() throws Exception {
34+
collector.postResult(sum, numItems); // <3> Post results
35+
}
36+
37+
private int computeSum(List<Object> objects) {
38+
int subTotal = 0;
39+
for (Object o : objects) {
40+
subTotal += (Integer)o;
41+
}
42+
return subTotal;
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<job id="jms-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
2+
<step id="step1" >
3+
<chunk>
4+
<reader ref="jmsItemReader"/>
5+
<writer ref="summingItemWriter"/>
6+
</chunk>
7+
</step>
8+
</job>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package org.javaee7.jms.batch;
2+
3+
import org.javaee7.util.BatchTestHelper;
4+
import org.jboss.arquillian.container.test.api.Deployment;
5+
import org.jboss.arquillian.junit.Arquillian;
6+
import org.jboss.arquillian.junit.InSequence;
7+
import org.jboss.shrinkwrap.api.ArchivePaths;
8+
import org.jboss.shrinkwrap.api.ShrinkWrap;
9+
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
10+
import org.jboss.shrinkwrap.api.spec.WebArchive;
11+
import org.junit.Test;
12+
import org.junit.runner.RunWith;
13+
14+
import javax.annotation.Resource;
15+
import javax.batch.operations.JobOperator;
16+
import javax.batch.runtime.BatchRuntime;
17+
import javax.batch.runtime.JobExecution;
18+
import javax.ejb.EJB;
19+
import javax.jms.*;
20+
import java.util.Properties;
21+
import java.util.Random;
22+
23+
import static org.junit.Assert.*;
24+
25+
/**
26+
* This test demonstrates programmatical creation of durable consumer, and reading
27+
* its subscribed messages in a batch job in form of an +ItemReader+.
28+
*
29+
* include::JmsItemReader[]
30+
*
31+
* The items are then fed into the writer, that performs the aggregation and stores
32+
* the result into a +@Singleton+ EJB.
33+
*
34+
* include::SummingItemWriter[]
35+
*
36+
* @author Patrik Dudits
37+
*/
38+
@RunWith(Arquillian.class)
39+
public class JmsItemReaderTest {
40+
41+
/**
42+
* Upon deployment a topic and connection factory for durable subscription are created:
43+
*
44+
* include::Resources[]
45+
*
46+
* Then the subscription itself is created by means of +@Singleton+ +@Startup+ EJB
47+
* +SubscriptionCreator+.
48+
*
49+
* include::SubscriptionCreator#createSubscription[]
50+
*
51+
* The job itself computes sum and count of random numbers that are send on the topic.
52+
* Note that at time of sending there is no active consumer listening on the topic.
53+
*/
54+
@Deployment
55+
public static WebArchive deployment() {
56+
return ShrinkWrap.create(WebArchive.class)
57+
.addAsWebInfResource(EmptyAsset.INSTANCE, ArchivePaths.create("beans.xml"))
58+
.addClass(BatchTestHelper.class)
59+
.addPackage(JmsItemReader.class.getPackage())
60+
.addAsResource("META-INF/batch-jobs/jms-job.xml");
61+
}
62+
63+
@Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
64+
ConnectionFactory factory;
65+
66+
@Resource(lookup = Resources.TOPIC)
67+
Topic topic;
68+
69+
@EJB
70+
ResultCollector collector;
71+
72+
/**
73+
* In this test case we verify that the subscription is really created upon deployment
74+
* and thus messages are waiting for the job even before the first run of it.
75+
*
76+
* The subscription is not deleted even after the application is undeployed, because
77+
* the physical topic and its subscription in the message broker still exist,
78+
* even after the application scoped managed objects are deleted.
79+
*
80+
* Following method is used to generate the payload:
81+
*
82+
* include::#sendMessages[]
83+
*
84+
* So we send 10 random numbers, and verify that summing integers works exactly the
85+
* same way on both ends. Or that the job really picked up all the numbers submitted
86+
* for the computation.
87+
*/
88+
@InSequence(1)
89+
@Test
90+
public void worksAfterDeployment() throws InterruptedException {
91+
int sum = sendMessages(10);
92+
runJob();
93+
assertEquals(10, collector.getLastItemCount());
94+
assertEquals(sum, collector.getLastSum());
95+
assertEquals(1, collector.getNumberOfJobs());
96+
}
97+
98+
/**
99+
* To verify that the durable subscription really collects messages we do few
100+
* more runs.
101+
*/
102+
@InSequence(2)
103+
@Test
104+
public void worksInMultipleRuns() throws InterruptedException {
105+
int sum = sendMessages(14);
106+
runJob();
107+
assertEquals(14, collector.getLastItemCount());
108+
assertEquals(sum, collector.getLastSum());
109+
assertEquals(2, collector.getNumberOfJobs());
110+
sum = sendMessages(8); // <1> Sending messages from separate connections makes no difference
111+
sum += sendMessages(4);
112+
runJob();
113+
assertEquals(12, collector.getLastItemCount());
114+
assertEquals(sum, collector.getLastSum());
115+
assertEquals(3, collector.getNumberOfJobs());
116+
}
117+
118+
private void runJob() throws InterruptedException {
119+
JobOperator jobOperator = BatchRuntime.getJobOperator();
120+
Long executionId = jobOperator.start("jms-job", new Properties());
121+
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
122+
123+
BatchTestHelper.keepTestAlive(jobExecution);
124+
}
125+
126+
private int sendMessages(int count) {
127+
int sum = 0;
128+
Random r = new Random();
129+
try (JMSContext jms = factory.createContext(Session.AUTO_ACKNOWLEDGE)) {
130+
JMSProducer producer = jms.createProducer();
131+
for (int i=0; i< count; i++) {
132+
int payload = r.nextInt();
133+
producer.send(topic, payload);
134+
sum += payload;
135+
}
136+
}
137+
return sum;
138+
}
139+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://jboss.org/schema/arquillian" xsi:schemaLocation="http://jboss.org/schema/arquillian
3+
http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
4+
5+
<defaultProtocol type="Servlet 3.0"/>
6+
7+
<container qualifier="test" default="true">
8+
<configuration>
9+
<property name="jbossHome">${serverRoot:target/wildfly-8.0.0.CR1}</property>
10+
<property name="serverConfig">${serverProfile:standalone-full.xml}</property>
11+
</configuration>
12+
</container>
13+
14+
</arquillian>

jms/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<module>jms-xa</module>
2020
<module>send-receive</module>
2121
<module>temp-destination</module>
22+
<module>jms-batch</module>
2223
</modules>
2324

2425
</project>

0 commit comments

Comments
 (0)