Monday, October 4, 2010

Enhanced JMS Scheduler in ActiveMQ

Previously we added the ability to schedule delivery of Messages on the ActiveMQ broker.  To schedule a message, all you need to do is create the Message and set some properties in the Message headers.  Simple, right?  It also means pretty much any client can use this functionality, whether it talks OpenWire or Stomp.  To manage those scheduled messages, however, you were limited to the JMX console or the Web console, and while it's nice that you can manage them at all, that setup leaves Stomp users unable to play.

This weekend I coded up a patch that lets you manage your scheduled messages much the same way you create them in the first place: by sending some Messages.  I've added support for requesting that the broker send all the scheduled messages to a destination of your choosing, and for then requesting that certain messages, or all of them for that matter, be deleted from the schedule.  Let's take a look at how it works...

The first thing you probably want to do is see what messages are scheduled.  To do that, create a Producer that publishes on the Destination named "ActiveMQ.Scheduler.Management".  Then create a new Message, set some properties on it, and add a Reply To destination so the scheduler knows where to send your Messages.  After that, all you need to do is process the messages with a Consumer that is subscribed to that Reply To destination.



        Connection connection = createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the Browse Destination and the Reply To location
        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        Destination browseDest = session.createTemporaryQueue();

        // Create the "Browser"
        MessageConsumer browser = session.createConsumer(browseDest);

        connection.start();

        // Send the browse request
        MessageProducer producer = session.createProducer(requestBrowse);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
        request.setJMSReplyTo(browseDest);
        producer.send(request);

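        // Drain the reply destination; receive() returns null if nothing arrives within 5 seconds
        Message scheduled = browser.receive(5000);
        while (scheduled != null) {
            // Do something clever...
            scheduled = browser.receive(5000);
        }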
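
Since the browse request is nothing more than a Message with a String property and a Reply To, a Stomp client should be able to send the same thing as a plain SEND frame.  The sketch below only builds the frame text, with no broker connection.  It assumes the ScheduledMessage constants resolve to the literal strings shown and that the broker maps the Stomp "reply-to" header onto the JMS Reply To, so check both against ScheduledMessage.java and your broker before relying on it.  The reply destination name is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StompBrowseFrame {

    // Assumed literal values of the ScheduledMessage constants; verify against ScheduledMessage.java.
    static final String MANAGEMENT_TOPIC = "/topic/ActiveMQ.Scheduler.Management";
    static final String ACTION_HEADER = "AMQ_SCHEDULER_ACTION";
    static final String ACTION_BROWSE = "BROWSE";

    /** Builds the text of a Stomp SEND frame that asks the scheduler for a browse. */
    static String browseFrame(String replyTo) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", MANAGEMENT_TOPIC);
        headers.put("reply-to", replyTo);
        headers.put(ACTION_HEADER, ACTION_BROWSE);

        StringBuilder frame = new StringBuilder("SEND\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        // A blank line ends the headers; the body is empty and a NUL character ends the frame.
        frame.append('\n').append('\0');
        return frame.toString();
    }

    public static void main(String[] args) {
        // "/temp-queue/scheduler-browse" is a hypothetical reply destination.
        String frame = browseFrame("/temp-queue/scheduler-browse");
        // Show the trailing NUL as ^@ so the frame is readable on a terminal.
        System.out.print(frame.replace("\0", "^@\n"));
    }
}
```

The client would then subscribe to the same reply destination and read the scheduled messages from it, just as the JMS consumer does above.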


With the above code your consumer will receive every Message that is scheduled.  If you happen to have a huge number of Messages scheduled, you probably don't want them all sent to your client.  To narrow down the results, add two additional properties to the request Message that define the time window you are concerned with.  Here's the browse request code again, with the properties added, to see what is scheduled for the next hour.

        // Send the browse request
        long start = System.currentTimeMillis();
        long end = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);

        MessageProducer producer = session.createProducer(requestBrowse);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        request.setJMSReplyTo(browseDest);
        producer.send(request);
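
The window doesn't have to start right now.  Here is a minimal sketch of a helper that works out the two property values for a window beginning some offset in the future.  BrowseWindow is a hypothetical class, not part of ActiveMQ; it only does the arithmetic and renders the millisecond timestamps as Strings, the same way the request code above does.

```java
import java.time.Duration;

class BrowseWindow {
    final String startTime;
    final String endTime;

    BrowseWindow(long startMillis, long endMillis) {
        if (endMillis < startMillis) {
            throw new IllegalArgumentException("window ends before it starts");
        }
        this.startTime = Long.toString(startMillis);
        this.endTime = Long.toString(endMillis);
    }

    /** A window that begins 'offset' after 'nowMillis' and lasts for 'length'. */
    static BrowseWindow starting(long nowMillis, Duration offset, Duration length) {
        long start = nowMillis + offset.toMillis();
        return new BrowseWindow(start, start + length.toMillis());
    }

    public static void main(String[] args) {
        // Everything scheduled between one and two hours from now.
        BrowseWindow window = starting(System.currentTimeMillis(),
                                       Duration.ofHours(1), Duration.ofHours(1));
        System.out.println("start=" + window.startTime + " end=" + window.endTime);
    }
}
```

The two Strings would be passed to setStringProperty for the start time and end time properties on the request.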


Now that you have seen how to browse the messages that are scheduled for delivery, let's take a look at how to manage them.  Each scheduled Message that is sent to your consumer carries a Job Id that can be used to remove that Message from the Scheduler, using the same management destination that you used to request the browse.  Here's an example of that.


        Message remove = session.createMessage();
        remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
        remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
                scheduled.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
        producer.send(remove);



Here we create a new Message, assign it the remove action property, and then set the Id of the scheduled Message to remove, using the Id from a Message that was sent to us on the browse destination we created earlier.

If you want to remove some scheduled Messages but don't want to browse them just to find the ones you are interested in, you can send a remove-all request and, instead of specifying an Id, give it a time window in which to operate.  Here's an example showing a remove operation requested for all Messages scheduled in the next hour.

        long start = System.currentTimeMillis();
        long end = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);

        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        producer.send(request);



You can also remove all jobs from the scheduler with a single message: send the same remove-all request without a time window, as shown in the next example.


        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        producer.send(request);
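
To recap, the requests differ only in which properties they carry.  The sketch below lays them out side by side as plain property maps, which may help if your client has no access to the ScheduledMessage constants.  SchedulerRequests is a hypothetical helper, and the literal property names and values are my reading of ScheduledMessage.java, so verify them against the source before using them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SchedulerRequests {

    // Assumed literal values of the ScheduledMessage constants; verify against ScheduledMessage.java.
    static final String ACTION = "AMQ_SCHEDULER_ACTION";
    static final String START_TIME = "ACTION_START_TIME";
    static final String END_TIME = "ACTION_END_TIME";
    static final String JOB_ID = "scheduledJobId";

    /** Ask for scheduled messages to be sent to the request's Reply To destination. */
    static Map<String, String> browse() {
        return action("BROWSE");
    }

    /** Remove the single job whose Id was read from a browsed message. */
    static Map<String, String> remove(String jobId) {
        Map<String, String> properties = action("REMOVE");
        properties.put(JOB_ID, jobId);
        return properties;
    }

    /** Remove every job in the scheduler. */
    static Map<String, String> removeAll() {
        return action("REMOVEALL");
    }

    /** Restrict a browse or remove-all request to the given time window (epoch millis). */
    static Map<String, String> within(Map<String, String> request, long startMillis, long endMillis) {
        request.put(START_TIME, Long.toString(startMillis));
        request.put(END_TIME, Long.toString(endMillis));
        return request;
    }

    private static Map<String, String> action(String name) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put(ACTION, name);
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(browse());
        System.out.println(remove("example-job-id")); // hypothetical Job Id
        System.out.println(within(removeAll(), 0L, 3_600_000L));
    }
}
```

Each map entry corresponds to one setStringProperty call on the request Message; a browse request additionally needs its Reply To destination set.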



That's it for now.

25 comments:

Unknown said...

Hi Tim,

Is the patch available for download on the ActiveMQ website?

Tim said...

The patch has been applied in trunk, as soon as a new snapshot build completes then you will be able to grab one of those builds, or you can build yourself from trunk.

The Gellings said...

Can you provide an example with NMS?

Tim said...

The code would be practically identical, you just need to substitute the actual string value of each of the Message properties that are specified in ScheduledMessage.java in ActiveMQ trunk.

Unknown said...

Hi Tim,
It does not work with 5.4.1 server. it need Version 5.5?

Tim said...

You will need to use a SNAPSHOT build until either 5.4.2 or 5.5 is released.

TheFarmer said...

I've been playing with these scheduled messages and I don't quite understand how it works. I created one message and set it to repeat. When I browse the messages as you describe, it browses through tons of messages. I wanted to delete the original schedule but why do I have to delete tons of messages? This is where I'm confused. Why can't I just see the one message I created and delete it?

Maybe it's a different issue but still to my confusion. It seemed like when one of the scheduled messages made it into the DLQ, it started generating messages in the DLQ instead of the original queue I sent it to. But every once in a while a messages would make it into the original queue.

I guess if there were some explanation of where this message lives, how messages are generated, what I'm looking at on the Scheduled tab of the admin web site, etc. I might understand what I'm seeing.

Tim said...

Without some sample code or a better explanation of what you are trying to do, it's pretty hard to offer any suggestions. I'm not clear on the problem, so if you can slow down and explain a bit, and maybe give me some code or logger output, that would help.

TheFarmer said...

One at a time:
I created one message and set it to repeat. When I browse the messages as you describe, it browses through tons of messages. I just want to delete the original message to stop the repeating.

On the message I set these properties:
AMQ_SCHEDULED_REPEAT 5000
AMQ_SCHEDULED_PERIOD 60000

Tim said...

You can of course limit the time window that is browsed as described in the article. You could also use a selector on the consumer that's browsing to limit the result returned.

Denis said...

Hi Tim,

I got following issue with your examples. I use scheduled messages to fire of some jobs. Therefore I need to prevent duplicate messages. I used your examples to check if a certain message already exists. All works fine. When I start my application, I schedule a message (CRON). When I try to schedule the same message again I get the desired result and my program refuses to schedule again. The job to be fired off is defined by the message text (e.g. FetchMails). I browse through all existing scheduled messages, compare message text to see if it already exists. As I said, this works fine.
Now the problem: when I restart my program, browsing scheduled messages it won't find any message at all, though they're shown in activemq web frontend.
Any idea whats wrong? Only thing that changes is that a new connection is started. Broker is not restarted.
Thx in advance

Denis

Tim said...

Not really sure, been quite some time since I worked on that stuff. Best thing to do is create a JUnit test case that reproduces the problem and open a new Jira issue. The existing test cases should give you plenty of examples to work from.

cececata said...

Say I have a network of brokers, for example, two brokers configured with duplex network connectors between them. I publish a scheduled message against one of the brokers. Then, I would like to check whether the message is scheduled, by sending a AMQ_SCHEDULER_ACTION_BROWSE message, but I can't guarantee that I will publish the browse messages against the same broker that I had initially published the scheduled message against. Would I need to separately browse all brokers in the network, or would one single browse message suffice, to find the scheduled message?

Unknown said...

Tim I'm trying to delete a scheduled message. What I'm experiencing is my message consumer is processing the message that is being sent to remove the message instead of the message being deleted. What am I doing wrong here?

Tim said...

Hard to say without seeing the code. I'd recommend having a look at the unit tests for this feature in the ActiveMQ source. Or post a sample that shows what you are doing.

Ernie said...

Hi Tim,
Thanks for this, it's very useful. I was wondering if the ability to limit the messages returned via a message selector of some sort would work. Say I've provide the ability to have users schedule jobs and a message property is set to which user created the schedule. Can I then query "Give me all scheduled tasks for user 'x'."?

Thanks

Tim said...

The nice thing about the way this works is that you are using a standard JMS consumer to read the messages so you should be able to use a selector just like with any other Destination. Just add the properties to the scheduled messages and then subscribe with the selector you need to filter your messages.

Farhad said...

Hi Tim,
I've just tried what you've described above. What I'm trying to do additionally is to clone the message and set the AMQ_SCHEDULED_DELAY to zero and resend the message. The problem is that the JMSDestination is set to the temporary queue and no information is available in the message about the original destionation !? Do you have any clue?

Tim said...

I'm not quite clear on the issue you are having, perhaps some code to demonstrate what you are trying to do and show what the problem is you are encountering would be in order. Thanks.

Farhad said...

Well, what I'm basically trying to do is to reschedule messages, which are already scheduled by setting the AMQ_SCHEDULED_DELAY property. So I need a way to either reset that property on the original message or to get hold on to the JobSchedulerFacade (through jmx) and add a new job to the scheduler. I guess in either cases I need to be able to access the original message.
It seems that the message you get in the "browser" consumer above is not the original message, but a representation of a scheduled job.

Tim said...

You need to remove the original scheduled message and then re-add the new one with the appropriate new scheduled time.

Farhad said...

Removing is made easy now (many thanks to you), but how do you re-add the message, when you don't have the original message.
I can't figure any way to retrieve the original message. I would very much appreciate a code example of re-adding(rescheduling) a message.

Unknown said...

Hi Tim,
have used your code to delete the scheduled message. It works perfect...but there are many removal request coming in.. say 60 removal request processed simultaneously, then the scheduled message are not firing up. I see the Use of TempQueue here and when i see the no of meesage in my TempQueue its Huge......
No of message Enqueued = 439899
No of message dequeued = 18024

Is this really a concern. How i can clean this TempQueue after every removal request processed successfully.
Thanks
Shoab

Sandip said...

If we use this code then it is creating lots of topics, and even though connection and session are closed these topics are not getting remove.

Any solution for this?