Transactional event-based NOSQL storage
I am presenting here a simple two steps architectural approach based on stored events as a workaround for the lack of full atomic transaction support in so-called “NOSQL” databases.
Being fairly new to NOSQL-based architectures, I have the annoying intuition that I am about to write nothing but a set of obvious statements. On the other hand, I have not yet read a detailed description of this anywhere, so hopefully it will be useful to some other developers as well.
[Edit: 1rst Oct 2012]: look also at the slides of Nathan Marz’ recent presentation on event based “Big Data architecture”: http://www.slideshare.net/nathanmarz/runaway-complexity-in-big-data-and-a-plan-to-stop-it, this one has some similarities with what I present below and is very clear to understand.
NOSQL databases do not offer atomic transactions over several update operations on different data entities(*). A simplistic explanation to this is that the two-phase commit mechanism forces each participant to lock its data for some time, which does not scale with highly partitioned data (cf CAP theorem). However, NOSQL databases of type document stores are usually able to update atomically as many properties of one single document as we want (this is at least true for AWS SDB data rows and MongoDB JSON documents).
* [UPDATE 10 Sept 2011] : I recently discovered the awesome neo4j graph database that brands itself as “NOSQL” and offers support for traditional ACID transactions. However, even though neo4j can be deployed as a cluster in replication mode, it is is not partition tolerant, as explained here. Redis is also offering some interesting atomic transactional support, although, as pointed out here, a failed update operation does not trigger the automatic roll back of the others.
There exists of course tons of situations which require updating several entities as one single operation. In the NOSQL world, there’s even more such examples as we are encouraged to replicate data across entities rather that use joins at read time, for performance reasons(*).
* [UPDATE 10 Sept 2011] : Those operations are costly in most current NOSQL products because of the so-called n+1 problem. When using a RDBMS solution, this is typically resolved by letting the DB itself perform the join internally before returning data to the application. I learnt recently that some NOSQL products also allow the traversal of relationships inside the DB: neo4j is of course one of them, by nature, but Redis also provides such mechanism, as explained at the end of this post, and here.
A naïve solution to this is to ignore the lack of atomicity and simply leave the data inconsistent in case of partial update failure. This might be acceptable when the data is considered less critical than the throughput (e.g collection of statistical trends) or in situations where inconsistency just means we end up with some non deleted “dead” records that will be ignored forever.
Another approach is trying to compensate by undoing the already executed operations in case of failure. However, obtaining 100% guarantee of consistency that way is hard because it requires to build an undo mechanism that never fails, which can be a very hard thing to do.
A brief example
Imagine an application that allow users to create a profile and to send messages to each other. Imagine the requirements tell us that, when displaying the list of messages, we must display the city of origin of both the author and the recipient.
A relational model for this would simply use foreign keys to link the entities.
In the NOSQL world however, this is discouraged because traversing the relations requires supplementary round-trips to the DB (I admit the performance impact would be small in the case of my simplistic example). We therefore choose to de-normalize the data and to replicate the personal details of the author and recipient within the userMessage table. This of course yields the need of several update operations any time a profile is updated (because the messages must now be updated as well). In a lot of applications, read operations happen much more often then write operations, so the overall performance of this approach is typically good.
I just found that Adam Heroku has already blogged about a very similar solution 4 years ago (argl !), here:
“The old example of using a database transaction to wrap the transfer of money from one bank account to another is total bull. The correct solution is to store a list of ledger events (transfers between accounts) and show the current balance as a sum of the ledger. If you’re programming in a functional language (or thinking that way), this is obvious.”
Which seems to confirm I am just stating the obvious here :-)
The solution proposed here is using two distinct data spaces, one for writing the data and one for reading it. Behind the scene, a job is asynchronously pulling data out of the first space and propagating its impact to the second one. Both spaces have very different data models. In this post, I refer to those spaces as the write domain and the read domain.
As explained in the problem statement above, the largest set of data we can update in one atomic operation is one single “row”.
For each desired update transaction, the “write-only DAOs” will therefore output nothing more than one monolithic event describing in details the business action(s) that has just happened. In my example, those events would be instances of “ProfileUpdateEvent”. In the bank account example of Heroku, these would be the “ledgers”.
The events must be self-contained, i.e. bundle enough information so that their impact on the domain entities can later be completely evaluated based on their content only. During this later evaluation, they will be seen as snapshots from the past, possibly hard to correlate with the present state of other resources.
It is also important to fully validate the business action before outputting the resulting event to DB. The propagation job runs asynchronously and will have no simple way of warning the initiator of the update that some input parameters is incorrect or inconsistent.
A nice improvement is to try to trigger the propagation job described in the next section directly after the event insertion in DB. If this second step fails, it’s still ok as we know it is going to be retried later on anyway. We can even put in place a nice degradation of service policy which only triggers this immediate propagation when the system load is below a given threshold. This ensures that we free up computing resources (which are infinite in a cloud, I know, I just do not trust this ^__^ ) in case of high traffic.
The propagation job is a process which runs continuously in the background, its role is fairly straightforward:
- Read one event out from the write domain (or several correlated events, if this is relevant)
- Update the corresponding read domain tables in an idempotent fashion
- If all the above was successful, remove the event from the write domain
This propagation of course involves several update operations wich, here again, are not guaranteed to be executed atomically. However, because the events are supposed to be fully validated at previous step, any failure here should only happen in case of recoverable error (like a network outage, a node crash…), which implies that if we retry the failed propagation regularly, it should eventually succeed. If this is not the case, it means we have a bug, which requires a human intervention anyway.
It is therefore crucially important that the update operation performed on the read domain be idempotent, so that we can repeat the complete set of operations (even the ones that did not fail last time). Idempotency is easily achieved in the user profile example: we can execute the “setAuthorCity” method repeatly with the same argument with no side effect. It is trickier to achieve in the ledger example of Heroku but still feasible, for example by recording the list of “received event ids” as part of the updated row (it must be within the updated row itself, here again because this is the biggest atomic write operation we can execute).
The read operations boil down to, well, reading the data from the read domain any time there is a business need to do so.
A supplementary step can improve the consistency of the obtained information by reading all pending events related to the entities obtained from the read domain and computing in real time the freshest possible state of the domain entities. Similarly to the degradation of service mechanism exposed in the “Writing events” section above, in case of peak of application traffic, the read DAOs could be instructed not to perform this additional step in order to return a slightly less consistent result but also to free resources faster.
The event-based storage solution proposed here lets a system benefit from the high scalability offered by NOSQL databases while still providing a 100% guarantee of “eventual consistency” for any written data, even when several update operations need to be executed as one business transactional unit. The approach does not prevent us to define a very de-normalized data model, which avoid join operations and increase read access speed.
Three important aspects of the solution are
- events must be fully validated before being put in DB
- events must be self contained
- propagation of the impact must be done in an idempotent way
Both read and write operations are much simpler than in traditional approaches as the data model they handle is well aligned with their respective use cases and need little manipulation when moved to/from the persistence layer. The business logic is cleanly encapsulated in the propagation job(s), away from the application external interfaces, so the approach also helps us enforce good architectural practices.
Because the propagation job can optionally be triggered by the write operations immediately, outputting the event only appears as one small supplementary step that ensures eventual consistency of data. The required latency before a written data becomes visible to the read-only DAOs should therefore not be, under normal traffic, higher than the latency we already experience in NOSQL systems.