Implementing a Transactional Receiver
See the first post for a basic introduction to Hazelcast and Spring Boot integration.
Hazelcast Transactions
Unlike some other NoSQL in-memory stores, Hazelcast can make access to distributed collections ACID (at least when you are not in a split-brain condition, which is fair enough).
Hazelcast supports transactions in two ways:
- Transactional distributed collections: Map, Multimap, Queue, Set and List.
- Transactional distributed tasks: the execution is automatically wrapped in a transaction.
Transaction and locking behaviour is reasonably similar to the RDBMSs we are all used to. While in a transaction, writing a distributed collection entry takes a lock on it; reading does not take a lock unless you ask for one explicitly with a “get for update” (e.g. TransactionalMap.getForUpdate(...)), remarkably similar to SQL SELECT … FOR UPDATE.
Beyond locks, all updates are recorded in a “transaction log” and applied only on commit, giving REPEATABLE_READ isolation by default.
You may also find some similarities with the JPA EntityManager/Hibernate Session, as accessed entries are locally cached during the transaction. But beware: the similarities end here. For example, modifying properties of a distributed collection entry does NOT modify the actual distributed object, as you always have a copy (not a proxy) of the actual entry.
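The copy-not-proxy behaviour can be illustrated with a toy store written in plain Java, with no Hazelcast dependency. CopyOnReadStore and the fields of Employee are hypothetical names made up for this sketch. The only assumption is that values are kept in serialised form, so every read hands back a fresh copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a distributed map (NOT Hazelcast code): values are stored
// as serialised bytes, so get() always returns a new copy of the entry.
class CopyOnReadStore<K, V extends Serializable> {
    private final Map<K, byte[]> data = new HashMap<>();

    void put(K key, V value) {
        data.put(key, serialize(value));
    }

    @SuppressWarnings("unchecked")
    V get(K key) {
        byte[] bytes = data.get(key);
        return bytes == null ? null : (V) deserialize(bytes);
    }

    private static byte[] serialize(Object value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hypothetical entry type with a single mutable field.
class Employee implements Serializable {
    private static final long serialVersionUID = 1L;
    String name;

    Employee(String name) {
        this.name = name;
    }
}

class CopyDemo {
    public static void main(String[] args) {
        CopyOnReadStore<String, Employee> store = new CopyOnReadStore<>();
        store.put("1", new Employee("Ann"));

        Employee copy = store.get("1");
        copy.name = "Bob";                        // changes the local copy only
        System.out.println(store.get("1").name);  // the stored entry is untouched

        store.put("1", copy);                     // an explicit put writes the change back
        System.out.println(store.get("1").name);
    }
}
```

The practical consequence is the same as in the text: after changing an entry you have read, you need an explicit put for the change to reach the collection.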
The Not-so-transparent Transactional Context and Transactional Collections
If you are accustomed to Spring’s @Transactional and TransactionManager, Hazelcast transactions will appear remarkably non-transparent.
Here is an example from the official guide:
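HazelcastInstance hz = Hazelcast.newHazelcastInstance();
TransactionContext txCxt = hz.newTransactionContext();
// A transaction must be active before transactional collections are used
txCxt.beginTransaction();
TransactionalMap<String, Employee> employees = txCxt.getMap("employees");
employees.put("1", new Employee());
txCxt.commitTransaction();
// After commit, the entry is visible through the regular, non-transactional map
IMap<String, Employee> employeesIMap = hz.getMap("employees");
System.out.println(employeesIMap.get("1"));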
As you can see, transactional collections are retrieved through a transaction context. Note that the interfaces of the transactional collections differ from the non-transactional versions (e.g. TransactionalMap vs IMap). Apparently, this is an API design decision: the developer has to know when they are working with a transactional collection, because of the performance implications. This makes sense, but makes it practically impossible to inject transactions transparently, à la Spring.
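To make the explicit handling concrete, here is a minimal sketch of the begin/commit/rollback boilerplate that application code ends up writing around a transaction context. TxContext, TxTemplate and RecordingContext are hypothetical names for this sketch: TxContext only mirrors the three lifecycle methods of Hazelcast's TransactionContext, so that the example runs without Hazelcast on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in with the same three lifecycle methods as
// Hazelcast's TransactionContext.
interface TxContext {
    void beginTransaction();
    void commitTransaction();
    void rollbackTransaction();
}

final class TxTemplate {
    private TxTemplate() {
    }

    // Runs the work inside a transaction: commit on success, rollback on failure.
    static <T> T inTransaction(TxContext ctx, Supplier<T> work) {
        ctx.beginTransaction();
        try {
            T result = work.get();
            ctx.commitTransaction();
            return result;
        } catch (RuntimeException | Error e) {
            ctx.rollbackTransaction();
            throw e;
        }
    }
}

// Test double that only records which lifecycle methods were called.
class RecordingContext implements TxContext {
    final List<String> calls = new ArrayList<>();

    public void beginTransaction() { calls.add("begin"); }
    public void commitTransaction() { calls.add("commit"); }
    public void rollbackTransaction() { calls.add("rollback"); }
}

class TxTemplateDemo {
    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        String result = TxTemplate.inTransaction(ctx, () -> "done");
        System.out.println(result + " " + ctx.calls);
    }
}
```

This is exactly the kind of plumbing that @Transactional normally hides, which is why the lack of transparent integration is felt so strongly.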
A Transactional Consumer
So, let's get back to the sample chat application I introduced in the first post of this series.
My specific (but not so uncommon) problem is making message polling more robust. If something goes wrong during polling and transmission, I want the messages to remain in the queue.

I’m focusing on transactional polling: the green dotted line, sending the messages to the recipient.
First of all, I have to use a TransactionalQueue. Note that queues, unlike other collections, are transactional on read too, as polling actually modifies the content of the queue. When transactional, polls are recorded in a “transaction log” and entries are actually removed only on commit.
But a transactional queue will not suffice. I have to wrap the whole poll request (including converting, serialising and sending messages to the client) in a try-catch, handling the transaction externally, the way Spring does when the controller method is annotated @Transactional.
So I have to make the Hazelcast transaction Spring-managed.
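The behaviour I am after can be sketched with a toy, single-threaded queue in plain Java. ToyTransactionalQueue, PollDemo and the "poison" message are hypothetical and exist only for this illustration. The sketch models the commit/rollback semantics of polling, not how Hazelcast actually implements TransactionalQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a queue whose polls only become final on commit.
class ToyTransactionalQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final List<E> polledInTx = new ArrayList<>(); // the "transaction log"

    void offer(E item) {
        items.addLast(item);
    }

    // Hands out the head element and remembers it until commit or rollback.
    E poll() {
        E head = items.pollFirst();
        if (head != null) {
            polledInTx.add(head);
        }
        return head;
    }

    // Commit: the polled elements are gone for good.
    void commit() {
        polledInTx.clear();
    }

    // Rollback: the polled elements go back to the head, in their original order.
    void rollback() {
        for (int i = polledInTx.size() - 1; i >= 0; i--) {
            items.addFirst(polledInTx.get(i));
        }
        polledInTx.clear();
    }

    int size() {
        return items.size();
    }
}

class PollDemo {
    // Drains the queue, failing on the hypothetical "poison" message.
    static List<String> drain(ToyTransactionalQueue<String> queue) {
        List<String> messages = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            if (message.equals("poison")) {
                throw new RuntimeException("Simulated Exception");
            }
            messages.add(message);
        }
        return messages;
    }

    public static void main(String[] args) {
        ToyTransactionalQueue<String> queue = new ToyTransactionalQueue<>();
        queue.offer("hello");
        queue.offer("poison");
        try {
            drain(queue);
            queue.commit();
        } catch (RuntimeException e) {
            queue.rollback(); // everything polled so far returns to the queue
        }
        System.out.println("Messages still queued: " + queue.size());
    }
}
```

Here the try-catch plays the role that the Spring-managed transaction will take over: any failure between the first poll and the end of the request puts the messages back.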
No out-of-the-box Spring-managed Transaction
Although Hazelcast transactions can integrate with (heavyweight) XA, there is no support for Spring-managed transactions. A feature request has long been pending.
I was short of time to implement it from scratch, so I googled for an open source project and found the remarkable HazelcastMQ, by Mike Pilone (https://github.com/mpilone/hazelcastmq).
His goal is much broader than mine: HazelcastMQ provides a high-level messaging layer on top of Hazelcast. But what interested me was the integration with Spring-managed transactions.
So I extracted just a couple of classes from HazelcastMQ, adapting them to my limited scope.
- HazelcastTransactionManager: implements Spring’s PlatformTransactionManager, binding the Hazelcast transaction context to the current thread. At runtime, it depends on the HazelcastInstance and has to be initialised as a Spring bean.
- HazelcastUtils: utility class used by the application to retrieve the transactional objects (currently, only Queues are available).
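The idea of binding a context to the current thread can be sketched in plain Java with a ThreadLocal. ThreadBoundContext is a hypothetical class for illustration only. How the actual HazelcastTransactionManager performs the binding is not shown in this post, and this sketch makes no claim about it.

```java
// Toy holder that binds one context object to the current thread.
final class ThreadBoundContext<C> {
    private final ThreadLocal<C> current = new ThreadLocal<>();

    // Called when a transaction starts on this thread.
    void bind(C context) {
        if (current.get() != null) {
            throw new IllegalStateException("A context is already bound to this thread");
        }
        current.set(context);
    }

    // Called by code that needs the context of the running transaction.
    C require() {
        C context = current.get();
        if (context == null) {
            throw new IllegalStateException("No context bound to this thread");
        }
        return context;
    }

    // Called after commit or rollback, so the thread can be reused safely.
    void unbind() {
        current.remove();
    }
}

class ThreadBoundDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadBoundContext<String> holder = new ThreadBoundContext<>();
        holder.bind("tx-1");
        System.out.println("Same thread sees: " + holder.require());

        // A different thread has no context of its own.
        Thread other = new Thread(() -> {
            try {
                holder.require();
            } catch (IllegalStateException e) {
                System.out.println("Other thread: " + e.getMessage());
            }
        });
        other.start();
        other.join();

        holder.unbind();
    }
}
```

With this kind of binding, a utility class can look up the context of the running transaction without the calling code passing it around explicitly.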
Back to Code
The sample chat application, upgraded to support transactional polling, is in the transactions branch of the repository: https://github.com/nicusX/springboot-hazelcast-example/tree/transactions
For comparison, both transactional and non-transactional versions are available. Which implementation is used depends on whether the “transactional-polling” Spring bean profile is active.
Registering the HazelcastTransactionManager makes Spring manage the Hazelcast transaction. Annotating the controller method with @Transactional makes it roll back (returning the polled elements to the queue) on any exception (by default, Spring rolls back on unchecked exceptions), including those caused by response mapping and serialisation.
@Transactional
@RequestMapping(value = "/recipients/{recipient}/poll", method = RequestMethod.GET)
public List receiveAll(@PathVariable("recipient") String recipient) {
    // Runs inside the Spring-managed Hazelcast transaction
    final List<ChatMessage> messages = chatService.receive(recipient);
    return messages.stream().map( ChatController::map ).collect(Collectors.toList());
}
Purely to demonstrate the transactional implementation, the controller throws an exception on receiving a “poison pill”: a message with the EPOCH timestamp:
if ( message.getTimestamp().equals(Instant.EPOCH)) {
    throw new RuntimeException("Simulated Exception");
}
Service
The implementation of the service method is the same for both transactional and non-transactional polling. Note that I had to use BaseQueue, as it is the only common interface of both IQueue and TransactionalQueue.
@Override
public List<ChatMessage> receive(String recipient) {
    final BaseQueue<ChatMessage> recipientQueue = recipientQueueForPolling(recipient);
    final List<ChatMessage> messages = new ArrayList<>();
    // Drain the queue: poll() returns null when the queue is empty
    while ( true ) {
        final ChatMessage message = recipientQueue.poll();
        if ( message == null ) break;
        messages.add(message);
    }
    return Collections.unmodifiableList(messages);
}
The non-transactional version simply retrieves the IQueue (as BaseQueue) from the HazelcastInstance:
// Non transactional Queue
protected BaseQueue<ChatMessage> recipientQueueForPolling(String recipient) {
    return hazelcastInstance.getQueue("recipient-" + recipient);
}
The transactional version uses the custom HazelcastUtils, which returns a TransactionalQueue:
// Transactional Queue
protected BaseQueue<ChatMessage> recipientQueueForPolling(String recipient) {
    return HazelcastUtils.getTransactionalQueue("recipient-" + recipient, hazelcastInstance, true);
}
Conclusion: A half-successful Integration
This transaction integration works fine for my limited goal of implementing transactional polling. However, it is very limited and leaves a lot of room for improvement. It currently supports only a transactional queue, but extending HazelcastUtils to support other collections should not be hard.
I also haven’t tested the synchronisation with other Spring-managed transactions, for example JPA or JMS. I may work on this subject in a future post.
HazelcastMQ also provides a second integration pattern. It uses a factory, the snappily named TransactionAwareHazelcastInstanceProxyFactory, to provide a wrapped HazelcastInstance returning transactional collections bound to the current Spring-managed transaction. To be honest, I wasn’t able to make it work, and I wasn’t super-happy with the solutions required.
More specifically, you have to handle two HazelcastInstance beans (for the same actual instance): one for transactional and another for non-transactional collections. There is also an “elegant hack”: proxying transactional collections to make them implement the non-transactional interfaces.
Stay tuned to the OpenCredo blog for the next instalment of my exploration into Spring and Hazelcast!
This blog is written exclusively by the OpenCredo team. We do not accept external contributions.