Wednesday, October 27, 2010

FuseSource now officially launched.

Its now official, FuseSource is now operationally independent from Progress Software, Rob has explained the background along with Larry's interview and Dana's podcast much better than I could have.


This is exciting news and the future of FuseSource looks to be bright.  The team is growing and still includes the talent that brought about Apache ActiveMQ, Apache Camel and Apache ServiceMix.  The Roadmap for 2011 looks to be a good one with continued work on the Apache projects we all know and love as well as some great new FuseSource projects that are currently in the works.  Should be fun times ahead...

Tuesday, October 26, 2010

ActiveMQ C API under development

Recently I started working on a C API for ActiveMQ.  The idea is to wrap the ActiveMQ-CPP library in a C accessible API in order to leverage all the hard work that has gone into making the C++ client.  I've made some good progress so far, I've wrapped all the CMS API's for ConnectionFactory, Connection, Session, Producer and Consumer.  I'm part way through mapping the CMS Message APIs into C APIs.

We still don't have any unit tests written for it but there's some sample code that compiles, and runs (at least it did). 

It would be great if some C developers out there wanted to chip in and help out on this as my C skills are pretty rusty.  There's still a lot to be done in the wrappers like better managing error conditions and reporting the errors in a meaningful way. 

You can download the source from the Apache SVN here.

Monday, October 4, 2010

Enhanced JMS Scheduler in ActiveMQ

Previously we added the ability to schedule delivery of Messages on the ActiveMQ broker.  To schedule a message all you need to do is create the Message and then set some properties in the Message headers, simple right, and it allows for pretty much any client to access this functionality whether it talks Openwire or Stomp.  To manage those scheduled message however you were limited to using the JMX console or the Web console, and while it nice that you can manage them the current setup does prevent Stomp users from playing. 

This weekend I coded up a patch that allows you to manage your scheduled messages much the same way to create them in the first place, by sending some Messages.  I've added support for requesting that the broker send all the scheduled messages to a destination of you choosing as well as allowing you to then request that certain messages be deleted from the schedule, or all of them for that matter.  Lets take a look at how it works...

First thing you probably want to do is to see what messages are scheduled, so to accomplish that you need to create a Producer that publishes on the Destination named: "ActiveMQ.Scheduler.Management".  Once that's done you create a new Message and set some properties and add a Reply To destination so the scheduler knows where to send your Messages.  Then all you need to do is process the messages with a Consumer that is subscribed to that Reply To destination.



        Connection connection = createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the Browse Destination and the Reply To location
        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        Destination browseDest = session.createTemporaryQueue();

        // Create the "Browser"
        MessageConsumer browser = session.createConsumer(browseDest);

        connection.start();

        // Send the browse request
        MessageProducer producer = session.createProducer(requestBrowse);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
        request.setJMSReplyTo(browseDest);
        producer.send(request);

        Message scheduled = browser.receive(5000);
        while (scheduled != null) {
            // Do something clever...
        }


With the above code your consumer will be able to check all the Messages that are scheduled.  Now if you happen to have a huge number of Messages scheduled then you probably don't want them all sent to your client, so to narrow down the results you can add two additional properties to your request Message to define the time window that you are concerned with, here's the browse request code again with the properties added to see what is scheduled for the next hour.

        // Send the browse request
        long start = System.currentTimeMillis();
        long end = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);

        MessageProducer producer = session.createProducer(requestBrowse);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        request.setJMSReplyTo(browseDest);
        producer.send(request);


Now that you have seen how to browse the messages that are scheduled for delivery lets take a look at how to manage the scheduled Messages that you've browsed.  Each scheduled Message that is sent to your consumer contains in it a Job Id that can be used to remove that scheduled Message from the Scheduler using the same management destination that you used to request the browse from, here's an example of that.  


        Message remove = session.createMessage();
        remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
        remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
                scheduled.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
        producer.send(remove);



Here we create a new Message and assign it the remove action property and then set the Id of the scheduled using the Id from a Message that was sent to us on the browse destination we created earlier.

If you want to remove some scheduled Messages but don't want to browse them just to find the one's you are interested in you can do so using the remove option show above but instead of specifying an Id you can give it a time window in which to operate, here's an example show a remove operation requested for all scheduled Messages in the next hour.

        long start = System.currentTimeMillis();
        long end = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
 


        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
 

        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                                  ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        producer.send(request);



You can also remove all jobs from the scheduler with a single message, this is shown in the next example.  


        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
 

         MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        producer.send(request);



That's it for now.