Tuesday, 24 March 2009

Consumer flow control and Message-Driven Beans

A recurring question from Open Message Queue (MQ) users is how to configure consumer flow control for messages delivered to MDBs running in an application server such as GlassFish.

The Sun Java System Message Queue 4.3 Administration Guide describes in some detail the way that MQ controls the flow of messages to standard JMS clients. However it does not at present cover the additional issues that need to be considered when MQ is used to deliver messages to MDBs running in an application server, especially when that application server is clustered.

This article addresses that omission. It reviews how consumer flow control operates for standard clients and then discusses the additional issues that need to be considered for application server clients.

Message pre-sending


The JMS specification defines two ways in which an ordinary MQ client program can receive messages from a queue or topic:
  • Synchronously, by calling MessageConsumer.receive(timeout), which returns a message to the caller if and when one is available. After receive() has returned, the client can call receive() again to obtain the next message.

  • Asynchronously, by registering a MessageListener. When a message is available, the JMS provider’s client runtime calls the MessageListener’s onMessage(Message) method, with the new message passed in as the argument. When the client has finished processing the message, onMessage() returns and the provider calls onMessage() again with the next message just as soon as it becomes available.
To give the best possible performance, MQ “pre-sends” messages to the consuming client in advance. When the client has consumed one message, the next is then available to the client application almost immediately, after a much shorter delay than if it had to be fetched on demand from the broker.

How many messages does the broker “pre-send” to the consuming client?

It can’t send the entire contents of the queue or topic, for two reasons:
  • Each message that is pre-sent to the client takes up heap space in the client JVM. Sending too many messages will cause the client to run out of memory.

  • In the case of a queue, the broker needs to share out the messages among all the consumers on that queue. If it sent all the messages to one consumer, it would starve the other consumers of messages.
To avoid these problems, the broker limits the number of messages that will be pre-sent to a consuming client and buffered in the client runtime, waiting to be consumed. It does this by applying two types of flow control: consumer flow control and connection flow control.

Consumer flow control


Consumer flow control limits the number of messages that can be held pending on a consumer, waiting to be consumed, to a value defined by the property imqConsumerFlowLimit. The default value of this property is 1000 messages.

When a client creates a consumer on a queue or topic, the broker will send this number of messages to the consumer immediately, if enough are available.

The consumer can then consume these messages, either by calling receive() repeatedly or by the client runtime repeatedly calling onMessage() on the specified message listener.

A message is considered to be “consumed” when it is acknowledged (either automatically or explicitly) or committed.

When a message is consumed, the broker doesn’t immediately top up the consumer’s buffer of pre-sent messages. Instead, the buffer is only topped up when the number of unconsumed messages in the consuming client falls to a specified fraction of its maximum. This is defined by the imqConsumerFlowThreshold property, which by default is 50%.

So, in the default case, the broker will initially send 1000 messages to the consumer (assuming that this number of messages is available).

The consumer will then consume messages until the number of unconsumed messages falls to 500 (which is 50% of 1000).

The broker will then top up the consumer with 500 messages (again, if enough are available), bringing the number of unconsumed messages back up to 1000. This process repeats each time the number of unconsumed messages falls to 500.
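
The arithmetic of this cycle can be sketched in plain Java. This is a simplified model rather than MQ's actual implementation: the class FlowControlSketch and its method names are hypothetical, and it assumes that every message is already waiting on the destination, so each top-up can be filled completely until the backlog runs out.

```java
/** Simplified model of the consumer flow control cycle; not MQ code. */
final class FlowControlSketch {

    /** Buffered-message count at which the broker tops the consumer up again. */
    static int resumeLevel(int flowLimit, int thresholdPercent) {
        return (int) ((long) flowLimit * thresholdPercent / 100);
    }

    /** Size of a full top-up: enough to bring the buffer back up to the limit. */
    static int topUpSize(int flowLimit, int thresholdPercent) {
        return flowLimit - resumeLevel(flowLimit, thresholdPercent);
    }

    /**
     * Counts the batches (the initial batch plus all top-ups) needed to
     * deliver {@code total} messages that are already on the destination.
     */
    static int batchesToConsume(int total, int flowLimit, int thresholdPercent) {
        if (flowLimit <= 0 || thresholdPercent < 0 || thresholdPercent >= 100) {
            throw new IllegalArgumentException("need flowLimit > 0 and 0 <= threshold < 100");
        }
        if (total <= 0) {
            return 0;
        }
        // The initial batch fills the buffer up to the limit, if enough messages exist.
        int remaining = total - Math.min(flowLimit, total);
        int batches = 1;
        int topUp = topUpSize(flowLimit, thresholdPercent);
        while (remaining > 0) {
            // The consumer works the buffer down to the resume level,
            // then the broker refills it with as many messages as it has.
            remaining -= Math.min(topUp, remaining);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println("resume level: " + resumeLevel(1000, 50));
        System.out.println("top-up size:  " + topUpSize(1000, 50));
        System.out.println("batches for 3000 messages: " + batchesToConsume(3000, 1000, 50));
    }
}
```

With the default values this model gives a resume level of 500 and top-ups of 500, matching the description above. Under the same assumptions, lowering the limit to 100 raises the number of batches needed for 3000 messages from 5 to 59, which is the batching overhead that a low limit introduces.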

How are imqConsumerFlowLimit and imqConsumerFlowThreshold configured?

  • The imqConsumerFlowLimit property can be configured on the consuming connection factory, or a lower value can be configured in the broker, either for all queues or topics or for specific named queues or topics.

    imqConsumerFlowLimit is the name of the connection factory property. When configured in the broker, the name of the property is slightly different: the property used for auto-created queues is imq.autocreate.queue.consumerFlowLimit, and the property used for auto-created topics is imq.autocreate.topic.consumerFlowLimit. The destination property used for pre-configured queues and topics is consumerFlowLimit.

  • The imqConsumerFlowThreshold property can only be configured on the consuming connection factory.
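
For a standalone client that creates its own connection factory, the two properties might be set as in the following configuration sketch. It assumes the MQ client library (which provides com.sun.messaging.ConnectionFactory) and the JMS API are on the classpath; the class name and the values 200 and 50 are purely illustrative. In an application server the connection factory is normally configured through the server's own administration tools instead.

```java
import javax.jms.JMSException;

import com.sun.messaging.ConnectionFactory;

/** Configuration sketch only; the class name and values are hypothetical. */
public class FlowLimitConfigSketch {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ConnectionFactory();

        // Property names are those described above; values are examples.
        factory.setProperty("imqConsumerFlowLimit", "200");
        factory.setProperty("imqConsumerFlowThreshold", "50");

        // Connections created from this factory, and the consumers
        // created on them, are then subject to these settings.
    }
}
```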

How do you choose what to set imqConsumerFlowLimit and imqConsumerFlowThreshold to?

  • If imqConsumerFlowLimit is set to a high value then the consuming client will use more memory. A high value can also prevent the broker from sharing the messages in a queue between multiple consumers: one consumer might hog all the messages, leaving the others with nothing to do. This is explained in more detail below.

  • If imqConsumerFlowLimit is set to be low then messages will be sent in smaller batches, potentially reducing message throughput.

  • If imqConsumerFlowThreshold is set too high (close to 100%), the broker will tend to send smaller batches, which can lower message throughput.

  • If imqConsumerFlowThreshold is set too low (close to 0%), the client may be able to finish processing the remaining buffered messages before the broker delivers the next set, again degrading message throughput. Generally speaking, it’s fine to leave it at its default value of 50%.

Connection flow control


Connection flow control (as opposed to consumer flow control) limits the number of messages that can be held in a connection, waiting to be consumed, to a value determined by the property imqConnectionFlowLimit (default=1000). This limit applies to the connection, not the individual consumers, and applies irrespective of the number of consumers on that connection.

This behaviour only occurs if the property imqConnectionFlowLimitEnabled is set to true. By default, it is set to false, which means that connection flow control is by default switched off.

If connection flow control is enabled, then when the number of messages buffered on the connection, waiting to be consumed, reaches the defined limit, the broker won’t deliver any more messages until the number of unconsumed messages falls below that limit. This limit, when enabled, operates in addition to consumer flow control.

How are imqConnectionFlowLimit and imqConnectionFlowLimitEnabled configured?

  • imqConnectionFlowLimit and imqConnectionFlowLimitEnabled can only be configured on the connection factory.

When might you want to enable connection flow control, and what would you set the limit to?


The main reason for configuring flow control at connection level rather than at consumer level is if you want to limit the memory used in the client by unconsumed messages but don’t know how many consumers you will be using.
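
The effect on client memory can be illustrated with a rough upper bound. The following sketch is a simplified model, not MQ code: the class and method names are hypothetical, and it treats both limits as hard caps on the number of buffered messages.

```java
/** Simplified upper bound on messages buffered on one connection; not MQ code. */
final class ConnectionFlowSketch {

    /**
     * Worst-case number of unconsumed messages buffered on a connection,
     * given the number of consumers on it and the two flow limits.
     */
    static long maxBuffered(int consumers, int consumerFlowLimit,
                            boolean connectionLimitEnabled, int connectionFlowLimit) {
        // Without a connection limit, every consumer can fill its own buffer.
        long perConsumerTotal = (long) consumers * consumerFlowLimit;
        return connectionLimitEnabled
                ? Math.min(perConsumerTotal, connectionFlowLimit)
                : perConsumerTotal;
    }

    public static void main(String[] args) {
        System.out.println(maxBuffered(5, 1000, false, 1000));
        System.out.println(maxBuffered(5, 1000, true, 1000));
    }
}
```

In this model, five consumers with the default consumer limit could hold up to 5000 messages between them, and the bound grows with every consumer added. Enabling the connection limit at its default of 1000 caps the total at 1000 however many consumers there are.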

When there are multiple consumers on the same queue


Now let’s consider how consumer flow control is applied when there are multiple consumers on the same queue.



Consumer flow control means that messages will be sent to each consumer in batches, subject to the number of messages available on the queue. The size of each batch depends on the properties imqConsumerFlowLimit and imqConsumerFlowThreshold as described above.

What is important to understand is that batches are sent to consumers on a “first come, first served” basis. When a consumer asks for a batch of messages, the broker will send that number of messages (if there are enough messages on the queue), even if this means that few or no messages are available to the next consumer that requests a batch. This can lead to an uneven distribution of messages between the queue consumers.

Whether this happens in practice depends on the rate at which new messages are added to the queue, and the rate at which the consumers process messages from it, though it is more likely to be noticed if the message throughput is low. The key factor is whether the number of messages on the queue falls to a low value – one lower than imqConsumerFlowLimit – so that when a consumer asks for a batch of messages none are available even though there are messages still waiting to be processed on another consumer.
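
A small simulation shows how the imbalance arises. It is a deliberately simplified snapshot, not MQ code: the class and method names are hypothetical, and it only models the initial batches being handed out to consumers in the order in which they ask, with no new messages arriving.

```java
import java.util.Arrays;

/** Snapshot model of first come, first served batch delivery; not MQ code. */
final class QueueSharingSketch {

    /**
     * Hands out one initial batch per consumer, in request order, and
     * returns the number of messages buffered on each consumer.
     */
    static int[] initialBatches(int queueDepth, int consumerFlowLimit, int consumers) {
        int[] buffered = new int[consumers];
        int remaining = queueDepth;
        for (int i = 0; i < consumers; i++) {
            // Each consumer gets a full batch if the queue still holds enough.
            int batch = Math.min(consumerFlowLimit, remaining);
            buffered[i] = batch;
            remaining -= batch;
        }
        return buffered;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(initialBatches(1200, 1000, 2)));
        System.out.println(Arrays.toString(initialBatches(1200, 100, 2)));
    }
}
```

With 1200 messages on the queue and the default limit of 1000, the first consumer receives 1000 messages and the second only 200. With a limit of 100, each consumer receives 100 and the remaining 1000 stay on the queue, available to whichever consumer asks next.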

Note that if new messages are continuing to be added to the queue this situation is likely to be transient since the next batch of messages is just as likely to be delivered to the “empty” consumer as the “active” consumer.

Setting imqConsumerFlowLimit to a low value might seem to be the obvious way to avoid an uneven distribution of messages between queue consumers. However smaller batch sizes require more overhead to deliver messages to consumers, so a low value should normally be considered only if the rate of message consumption is low.

If the consumers are MDBs running in an application server then reducing imqConsumerFlowLimit will also limit the maximum number of MDBs that can process messages concurrently, which will itself reduce performance. This is described in more detail below.

When the consumer is an MDB in an application server


This article has described how the MQ broker controls the flow of messages to a consuming client on a per-consumer and per-connection basis. Messages are sent to a consumer in batches rather than individually to improve performance. However the size of these batches is limited using configurable parameters to prevent the consumer running out of memory and to minimise the possibility of temporary imbalance between multiple consumers on the same queue.

When the message consumer is a message-driven bean (MDB) running in an application server some additional considerations apply.

In a typical MDB application, a pool of MDB instances listens for messages on a given queue or topic. Whenever a message is available, a bean is selected from the pool and its onMessage() method is called. This allows messages to be processed concurrently by multiple MDB instances if the rate at which messages arrive exceeds that at which a single MDB instance can process them. The number of MDB instances that can process messages is limited only by the maximum pool size configured in the application server.

In terms of flow control the important thing to know is that no matter how many MDB instances have been configured, all instances of a particular deployed MDB use the same JMS connection, and the same JMS consumer, to receive messages from the queue or topic. Flow control limits such as imqConsumerFlowLimit and imqConnectionFlowLimit will therefore apply across all instances of the MDB, not to individual instances.

This means that if these flow control limits are set to a low value then this will reduce the number of MDB instances that can process messages concurrently.

For example, in the Sun GlassFish application server, the maximum size of an MDB pool is by default set to 32 instances. However if imqConsumerFlowLimit is set lower than this, say to 16, then the maximum number of MDB instances that can process messages concurrently will be reduced, in this case to 16.

You should therefore avoid setting imqConsumerFlowLimit or imqConnectionFlowLimit to less than the maximum pool size of the MDB that will be consuming the messages. If you do, you will effectively be reducing the maximum pool size and thereby the number of MDB instances that can process messages concurrently.
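
This rule of thumb amounts to taking the smallest of the relevant limits. The following sketch is a simplified model with hypothetical class and method names; it assumes that an MDB instance can only be busy if a message is buffered in the client for it.

```java
/** Simplified model of how flow limits cap MDB concurrency; not MQ code. */
final class MdbConcurrencySketch {

    /** Maximum number of MDB instances that can process messages at once. */
    static int maxConcurrentInstances(int maxPoolSize, int consumerFlowLimit,
                                      boolean connectionLimitEnabled,
                                      int connectionFlowLimit) {
        // All instances share one consumer, so its limit caps the pool.
        int cap = Math.min(maxPoolSize, consumerFlowLimit);
        // They also share one connection, so its limit applies if enabled.
        return connectionLimitEnabled ? Math.min(cap, connectionFlowLimit) : cap;
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentInstances(32, 16, false, 1000));
        System.out.println(maxConcurrentInstances(32, 1000, false, 1000));
    }
}
```

For the example above (a pool of 32 and a consumer flow limit of 16) the model gives 16 concurrent instances, whereas with the default limit of 1000 the full pool of 32 can be used.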



When using an application server cluster


If your MDB application is deployed into a cluster of application server instances then you are faced with a dilemma when deciding what value to set imqConsumerFlowLimit to.

Consider the situation where an MDB is deployed into a cluster of two application server instances. For this MDB, each application server instance has one connection, one consumer and a pool of MDB instances.

From the point of view of the broker there are two connections, each with a single consumer.

As was described earlier in “When there are multiple consumers on the same queue”, messages are sent to the two consumers in batches. If the number of messages on the queue falls below the configured imqConsumerFlowLimit, there may be times when messages are buffered on one consumer, waiting to be processed, whilst the other consumer has nothing to do.

This in turn means that there may be times when one application server instance is busy whilst the other instance has little or nothing to do.

The chance of this imbalance happening can be reduced by configuring a lower value of imqConsumerFlowLimit.

However if imqConsumerFlowLimit is set to a very low value, lower than the maximum MDB pool size, this will limit the number of MDB instances that can process messages concurrently, and therefore reduce message throughput.

The value chosen for imqConsumerFlowLimit when using an application server cluster therefore needs to balance these two conflicting requirements. Choose a low value to avoid seeing a transient imbalance between server instances, but don’t choose a value so low that it reduces the number of MDB instances that can process messages concurrently. In addition, choosing a low value for imqConsumerFlowLimit means that the overhead of processing each batch of messages will have a greater impact on performance.
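
The trade-off can be made concrete with a snapshot model of a small backlog being shared between server instances. This is an illustrative sketch only, with hypothetical class and method names: it assumes that each server instance has a single consumer, that batches are handed out in request order, and that no new messages arrive.

```java
import java.util.Arrays;

/** Snapshot model of MDB utilisation across a cluster; not MQ code. */
final class ClusterTradeOffSketch {

    /**
     * Returns the number of busy MDB instances on each server instance
     * when a backlog is handed out first come, first served.
     */
    static int[] busyInstances(int backlog, int consumerFlowLimit,
                               int maxPoolSize, int serverInstances) {
        int[] busy = new int[serverInstances];
        int remaining = backlog;
        for (int i = 0; i < serverInstances; i++) {
            // Messages buffered on this server instance's single consumer.
            int buffered = Math.min(consumerFlowLimit, remaining);
            remaining -= buffered;
            // Only as many MDB instances as the pool allows can be busy.
            busy[i] = Math.min(buffered, maxPoolSize);
        }
        return busy;
    }

    public static void main(String[] args) {
        for (int limit : new int[] {1000, 32, 16}) {
            System.out.println("limit " + limit + ": "
                    + Arrays.toString(busyInstances(40, limit, 32, 2)));
        }
    }
}
```

For a backlog of 40 messages, two server instances and a pool size of 32, a limit of 1000 leaves the second instance idle (32 and 0 busy MDB instances). A limit of 32 spreads the work (32 and 8), while a limit of 16 balances the instances (16 and 16) but leaves 8 messages waiting on the queue and half of each pool unused.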



Further reading


Sun Java System Message Queue 4.3 Administration Guide