Oops, I did it. A couple of weeks back I decided to get out of my comfort zone: the comfort zone of writing posts in my native language, which is Polish. As a human, you cannot make progress or continuously improve your life if you don’t seek ways to get out of your comfort zone.
As always in such situations I am apprehensive, even terrified. Terrified that I will lose my “flow”, that I will be criticized, and that in many cases you will misunderstand my intentions and thoughts.
If you find “syntactic bugs” in my posts, please feel free to write a comment.
This time I am going to shed some light on an interesting problem I fought with some time ago.
Among the many projects in our company, we have a product which uses Mule ESB to build integrations with third-party providers. It works as a broker which sends the same message to various providers over a variety of protocols and in many different message formats. Later on, the responses from the providers are aggregated and sent back to the client. Easy job, isn’t it?
Recently we got stuck on an interesting performance-related problem. The same message can be sent to hundreds of providers, and it may take a significant amount of time for all providers to process it and send a response back. The problem is neither new nor unusual. Looking at the very heart of it, you can easily say that it can be solved with the “fork and join” pattern.
Just for reference, “fork and join” is one of the concurrency patterns, and one of the simplest and most efficient ways of obtaining parallel processing. In one sentence, you split your problem into a set of smaller tasks, which are then executed by separate threads (you can use a thread pool as well, which is even more efficient); this is where the “fork” happens. The results of the tasks are then “join”ed in the main execution thread. You can read more about it, and about its implementation in Java 7, in the brilliant paper by Doug Lea, A Java Fork/Join Framework. There are some drawbacks of course; as with every pattern, you have to be aware of its applicability and consequences. I will write about them later, in the summary.
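To make the idea concrete, here is a minimal sketch in plain Java, outside of Mule. It uses a plain `ExecutorService` thread pool rather than the `ForkJoinPool` described in Doug Lea's paper, because the tasks here are independent calls and not a recursive decomposition. The class name, `callProvider` and the pool size are my assumptions for illustration only; a real provider call would do network I/O.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ForkJoinSketch {

    // Hypothetical stand-in for a remote provider call.
    static String callProvider(String provider, String message) {
        return provider + " handled " + message;
    }

    // Fork: one task per provider. Join: collect the results in submission order.
    static List<String> broadcast(List<String> providers, String message) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String provider : providers) {
                tasks.add(() -> callProvider(provider, message));
            }
            List<String> responses = new ArrayList<>();
            // invokeAll blocks the calling thread until every task has completed.
            for (Future<String> future : pool.invokeAll(tasks)) {
                responses.add(future.get());
            }
            return responses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for providers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a provider call failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(broadcast(Arrays.asList("A", "B", "C"), "ping"));
    }
}
```

The calling thread plays the role of the main execution thread: it hands the tasks to the pool and blocks until all of them are done.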
Once we decided that “fork and join” was our saviour, the next question came up: how to do it in Mule ESB? After days of investigation, experiments and tests I found a solution. This work is a joint effort of a couple of people, so I am not going to take all of the glory for myself 🙂
In this example I am going to use publicly available web services, as I cannot reveal details about the vendors we work with :). In this short snippet I will try to aggregate responses from different web services which provide information about musicians (like biography, history, events and so on).
We will implement a simple, REST-like search service which uses existing public services.
In the first step we are going to expose an HTTP endpoint in Mule.
<http:inbound-endpoint host="127.0.0.1" port="8080" path="artist" exchange-pattern="request-response">
</http:inbound-endpoint>
<expression-transformer expression="#[header:INBOUND:q]" />
This gives us a web service which can later be invoked from a browser with a URL such as http://localhost:8080/artist?q=Killing+Joke.
<expression-transformer> is used to extract the ‘q’ parameter from the HTTP query. The value of this parameter becomes the payload of the message.
So now it is time to search for artists with a similar name. We are going to use the MusicBrainz services. And it goes like this:
<http:outbound-endpoint host="musicbrainz.org" port="80" path="ws/2/artist?query=#[payload:]" method="GET" />
<splitter evaluator="xpath" expression="/mb:metadata/mb:artist-list/mb:artist" />
<mulexml:xpath-extractor-transformer expression="/mb:artist/@id" resultType="STRING"/>
<collection-aggregator />
For people unfamiliar with Mule concepts, it may look like a sentence from the “Book Of Black Magic”, but it is really simple. It splits the XML response from MusicBrainz into a collection of artists’ ids, called MBIDs, using XPath expressions. What the <splitter> message processor does in Mule is split a message using an expression, in this case "/mb:metadata/mb:artist-list/mb:artist", and send every XML fragment to the next message processor. It does this in a single thread, and this is not what we are looking for. We would like to process these XML fragments in parallel. And here is where the magic starts. We aggregate all the XML fragments, in this case the value of the “id” attribute of each “artist” element, into a Java List; this is what <collection-aggregator> does. And here comes the time for “fork”.
<request-reply storePrefix="workStore" doc:name="Put collection of MBIDs on queue for processing">
    <vm:outbound-endpoint path="dispatchIn">
        <message-properties-transformer scope="outbound">
            <delete-message-property key="MULE_REPLYTO" />
        </message-properties-transformer>
    </vm:outbound-endpoint>
    <vm:inbound-endpoint path="dispatchOut" />
</request-reply>
The <request-reply> processor sends our message, in this case a Java List of MBIDs, asynchronously to a VM queue called “dispatchIn” and then waits for the response by listening on the VM queue named “dispatchOut”. How does Mule decide that a received message is the response to the message it sent? In Mule, the <splitter> processor adds additional properties to a message:
MULE_CORRELATION_ID, which identifies the group of messages to which a message belongs
MULE_CORRELATION_GROUP_SIZE, the number of messages in the group
MULE_CORRELATION_SEQUENCE, the index of a message in the group, so responses can later be processed in the same order
Time for fork? You bet.
<flow name="workDispatcher">
    <vm:inbound-endpoint path="dispatchIn" />
    <collection-splitter enableCorrelation="ALWAYS" />
    <flow-ref name="workWorker" />
</flow>
In this snippet we receive our message, the collection of MBIDs, and split it again with a little bit of help from <collection-splitter>. Every message is then sent to the next flow, called “workWorker”. This is where the “fork” happens. But wait a moment. Why? I personally think this is the best kept secret in Mule’s world. The answer is a single acronym: SEDA, Staged Event-Driven Architecture. This is a concurrency pattern which uses queues and thread pools for parallel processing of messages. If you have never heard about it, I recommend reading the original paper by Matt Welsh and Eric Brewer, SEDA: An Architecture for Well-Conditioned, Scalable Internet Services.
The SEDA pattern sits underneath everything that happens in Mule. Every flow has its “in” queue and a thread pool which processes the messages coming into that queue. How do you get access to this hidden structure of queues and thread pools? The answer is “private flows”, a construct introduced some time ago in Mule. Private flows don’t have a message source as the first element of the flow, so you cannot invoke them from outside of your Mule instance. That’s why they are called private. So you may ask yourself what the source of messages for a “private flow” is. It is the SEDA queue. By using <flow-ref name="workWorker" /> you send a message to the SEDA queue of the flow named “workWorker”. Magic demystified. Messages are picked up from the queue by threads from the thread pool, and here you go: parallel processing in Mule.
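If the queue-plus-thread-pool idea feels abstract, the following plain Java sketch shows a single SEDA-like stage. It is only my illustration of the concept, not Mule's actual implementation; the `Stage` class, the `"bio:"` handler and the thread count are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

final class SedaStageSketch {

    // One stage: an inbound queue plus a pool of worker threads draining it.
    static final class Stage<I, O> {
        private final BlockingQueue<I> inbox = new LinkedBlockingQueue<>();
        private final ExecutorService workers;

        Stage(int threads, Function<I, O> handler, BlockingQueue<O> outbox) {
            this.workers = Executors.newFixedThreadPool(threads);
            for (int i = 0; i < threads; i++) {
                workers.execute(() -> {
                    try {
                        while (true) {
                            // Block until a message arrives, handle it, pass the result on.
                            outbox.put(handler.apply(inbox.take()));
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // stop() ends the worker
                    }
                });
            }
        }

        void enqueue(I message) {
            inbox.add(message);
        }

        void stop() {
            workers.shutdownNow();
        }
    }

    // Pushes every id through a three-thread stage and collects the results.
    static List<String> process(List<String> ids) {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        Stage<String, String> worker = new Stage<String, String>(3, id -> "bio:" + id, results);
        try {
            for (String id : ids) {
                worker.enqueue(id);
            }
            List<String> collected = new ArrayList<>();
            for (int i = 0; i < ids.size(); i++) {
                collected.add(results.take());
            }
            Collections.sort(collected); // arrival order depends on thread scheduling
            return collected;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting results", e);
        } finally {
            worker.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(java.util.Arrays.asList("b", "a", "c")));
    }
}
```

The sender only puts a message on the queue and moves on. Which thread handles it, and when, is decided by the stage, which is the behaviour the private flow gives us here.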
Once “fork” is demystified, it is time to uncover the secrets of “join”. This is the piece which injured a couple of random developers and caused one or two brain surgeries :). It should be rather simple in Mule, but it’s not. In the most common examples you use the correlation id and <collection-aggregator>, and it works in most cases. Where it doesn’t work is when you want to use a message router, like the multicasting message router, aka <all>. In such cases the correlation id, group size and sequence number are overwritten by default. Keeping this in mind, how can we preserve this important information? In Mule 3 we can use scoped message properties.
<flow name="workWorker">
    <message-properties-transformer scope="invocation" doc:name="Remember correlation">
        <add-message-property value="#[header:OUTBOUND:MULE_CORRELATION_ID]" key="cid" />
        <add-message-property value="#[header:OUTBOUND:MULE_CORRELATION_GROUP_SIZE]" key="cgs" />
    </message-properties-transformer>
    <all>
        <!-- here we can call any number of services -->
    </all>
    <message-properties-transformer scope="outbound">
        <add-message-property value="#[header:INVOCATION:cid]" key="MULE_CORRELATION_ID" />
        <add-message-property value="#[header:INVOCATION:cgs]" key="MULE_CORRELATION_GROUP_SIZE" />
    </message-properties-transformer>
    <vm:outbound-endpoint path="aggregatorIn" />
</flow>
Before invoking the multicasting router we remember MULE_CORRELATION_ID and MULE_CORRELATION_GROUP_SIZE in invocation-scoped properties, and later on we restore these properties, which are then used in the flow listening on the VM queue “aggregatorIn”. This flow aggregates all messages with the same correlation id, and when it has received a number of messages equal to the value of the MULE_CORRELATION_GROUP_SIZE property, it passes the collection of Mule messages to the next message processor.
<flow name="workAggregator">
    <vm:inbound-endpoint path="aggregatorIn" />
    <collection-aggregator />
    <vm:outbound-endpoint path="dispatchOut" />
</flow>
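The “join” rule described above (group by correlation id, release the group once the expected number of messages has arrived) can be sketched in a few lines of plain Java. This is my own illustration of the idea and not the code behind <collection-aggregator>; the `Message` class and its fields are hypothetical stand-ins for the three correlation properties, and sorting by sequence is an assumption about how one might restore the original order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class CorrelationAggregatorSketch {

    // Hypothetical message shape mirroring the three correlation properties.
    static final class Message {
        final String correlationId;
        final int groupSize;
        final int sequence;
        final String payload;

        Message(String correlationId, int groupSize, int sequence, String payload) {
            this.correlationId = correlationId;
            this.groupSize = groupSize;
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    // Messages received so far, keyed by correlation id.
    private final Map<String, List<Message>> groups = new HashMap<>();

    // Returns the payloads ordered by sequence once the group is complete,
    // or an empty Optional while messages are still missing.
    synchronized Optional<List<String>> accept(Message message) {
        List<Message> group =
                groups.computeIfAbsent(message.correlationId, id -> new ArrayList<>());
        group.add(message);
        if (group.size() < message.groupSize) {
            return Optional.empty();
        }
        groups.remove(message.correlationId);
        group.sort(Comparator.comparingInt((Message m) -> m.sequence));
        List<String> payloads = new ArrayList<>();
        for (Message m : group) {
            payloads.add(m.payload);
        }
        return Optional.of(payloads);
    }

    public static void main(String[] args) {
        CorrelationAggregatorSketch aggregator = new CorrelationAggregatorSketch();
        System.out.println(aggregator.accept(new Message("c1", 2, 1, "second")));
        System.out.println(aggregator.accept(new Message("c1", 2, 0, "first")));
    }
}
```

The sketch also shows why losing the correlation id or the group size is fatal for the join: without them the aggregator cannot tell which group a message belongs to or when the group is complete.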
As you can see, once you understand SEDA and private flows in Mule, the implementation of the “fork and join” pattern is easy. And I agree with you: there is a lot of noise in this configuration. But this is XML, so don’t expect something readable :).
At the beginning of this short article I promised to write about some drawbacks of “fork and join”. They are really obvious, but I think they are still worth mentioning. The first problem is that not all cases can be implemented using “fork and join”. There are problems which cannot be split into a set of smaller, independent tasks. The second problem is that the overall execution time is at least the execution time of the longest task, because the main execution thread has to wait until all tasks finish. In a situation where you have a set of tasks that execute quickly and one long-running task, the benefit of “fork and join” is not worth the complexity of its implementation in Mule. So be careful: using patterns always has its drawbacks, which you have to understand and accept. The full example is available on GitHub.
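The second drawback is easy to see in a small Java experiment. The sketch below is only an illustration with made-up numbers: each task simply sleeps for a given number of milliseconds, and the pool is assumed to have one thread per task, so the measured wall-clock time should end up close to the longest sleep and not to the sum of all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SlowestTaskSketch {

    // Runs one sleeping task per argument in parallel and returns the
    // wall-clock time in milliseconds until all of them have finished.
    // Expects at least one duration.
    static long elapsedMillis(long... sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(sleepMillis.length);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (long millis : sleepMillis) {
                tasks.add(() -> {
                    Thread.sleep(millis); // simulated provider latency
                    return null;
                });
            }
            long start = System.nanoTime();
            pool.invokeAll(tasks); // the join: blocks until the slowest task is done
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two quick tasks and one slow one: the slow one dominates the total.
        System.out.println(elapsedMillis(20, 20, 200) + " ms");
    }
}
```

With two 20 ms tasks and one 200 ms task, the fork saves you only the 40 ms of the quick tasks, which is the trade-off to weigh against the extra configuration.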
I hope I helped you understand Mule and “fork and join” implementation. Enjoy the ride with Mule ESB.
Awesome. Just one query.
We can have multiple requests in flight at the same time, meaning:
* Multiple messages will be fed to the flow (workDispatcher) via the inbound endpoint (dispatchIn)
* How will ‘dispatchOut’ send the aggregated response to the correct waiting dispatchOut VM queue?
Is this because of request-reply?
However, this is really weird, as we have the request-reply block versus the request-reply exchange pattern. With the request-reply block, I expect to receive the response on the VM inbound endpoint (dispatchOut) automatically. I shouldn’t have to manually push a message to the ‘dispatchOut’ queue.
I understand that ‘dispatchIn’ is one-way so that the splitter can throw messages simultaneously, but the mystery is how a message pushed to the ‘dispatchOut’ queue reaches the correct waiting ‘dispatchOut’ among the many open threads.
If you could write an extension to this article uncovering this piece, it would be great.
Thanks for the kind words. I have not worked with Mule ESB for a long time, so let me refresh my memory and get back to you with an answer.