How to Integrate MongoDB with Diffusion Cloud
July 13, 2016 | DiffusionData
MongoDB forms part of the NoSQL vanguard and is big in more than one sense, which is part of the attraction. It is both very popular, finding employ inside eBay, LinkedIn and Craigslist, and very good at storing & organizing very large volumes of data.
That it stores its content as (what look and feel like) JSON documents makes it interesting from a Diffusion Cloud perspective. “How hard”, I asked myself, “would it be to reflect a MongoDB collection into the topic tree?” Not so hard, as it turns out.
MongoDB divides its storage into databases, which in turn are divided into collections, much like a conventional RDBMS uses tables. Our intent is to subscribe to a single collection and all changes to it, creating a discrete JSON topic for each document, and updating each topic in sync with the original document.
Configuring MongoDB for replication
Our first obstacle: MongoDB doesn’t really do subscription in a pub/sub sense. So instead we make use of the replication feature, which is really intended for wiring together the members of MongoDB replica sets. Plenty of third-party applications make use of it however, including Meteor, who do include a more directly useful Publish/Subscribe feature in their product.
It’s an optional feature so it has to be deliberately configured. Typically MongoDB is run as a daemon, but for testing purposes we configure and run ours in the foreground.
As well as starting your MongoDB server this will also output a lot of diagnostic noise, so start another shell for MongoDB interaction.
The Mongo shell prompt testReplSet:PRIMARY confirms that we’re logged into the primary (and indeed only) member of the replica set. We then create 100 documents in a collection, someDB.someCollection. This gives us our test subject.
Building the Adapter
All sources for the adapter are available on GitHub, but here we shall lavish some explanation on the significant parts.
Placing the connectivity
We chose JCommander because we’ve used it before, and while http://mvnrepository.com/ shows nearly five times as many projects use Apache Commons CLI, the annotation-based approach makes for quicker & cleaner code.
The OpLog is the collection oplog.rs in the database local, which exists only when replication is configured and is used as a sink for all storage mutation events. The OpLog is a “capped” collection, making it a little different to regular collections:
- Documents can only be appended to a capped collection (though an identically sized document can be used to overwrite an existing document).
- Capped collections are akin to circular buffers. They are bounded, and once all allocated storage is used, older storage is then reused.
- Similarly to the way tail(1) works with files, a capped collection can entertain queries from ‘tailable’ cursors, such that when the cursor reaches the end of the collection it waits while more documents are appended to the collection.
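To make the circular-buffer and tailable-cursor behaviour concrete, here is a minimal in-memory sketch in plain Java. It is an analogy only and says nothing about how MongoDB implements capped collections; the class CappedLog and its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical in-memory analogue of a capped collection (illustration only). */
class CappedLog<T> {
    private final int capacity;
    private final Deque<T> entries = new ArrayDeque<>();
    private long appended = 0; // total number of entries ever appended

    CappedLog(int capacity) {
        this.capacity = capacity;
    }

    /** Append only: once the log is full, the oldest entry is discarded. */
    synchronized void append(T entry) {
        if (entries.size() == capacity) {
            entries.removeFirst();
        }
        entries.addLast(entry);
        appended++;
        notifyAll(); // wake any reader waiting at the end of the log
    }

    /**
     * Behaves like a tailable cursor positioned at 'position' (counted from 0
     * over the whole life of the log): waits until that entry has been
     * appended, then returns it, or null if it has already been overwritten.
     */
    synchronized T awaitEntry(long position) {
        try {
            while (position >= appended) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while tailing", e);
        }
        long oldest = appended - entries.size();
        if (position < oldest) {
            return null; // storage for this entry has been reused
        }
        long index = position - oldest;
        for (T entry : entries) {
            if (index-- == 0) {
                return entry;
            }
        }
        throw new IllegalStateException("unreachable");
    }

    synchronized int size() {
        return entries.size();
    }
}
```

With a capacity of two, appending three entries leaves only the last two readable, which is why a reader that falls too far behind the OpLog can miss events.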
Building the Adapter
The build method gathers two key features. Firstly it obtains TopicUpdateControl, which is used to update topic values, and from it an exclusive ValueUpdater for a branch of the topic tree. Finally we obtain TopicControl in order that we may create and remove topics.
Now we build a MongoNamespace, encapsulating the database and collection used to listen for command events (specifically commands to drop the collection), before constructing and returning an Adapter object. Using a builder rather than a more complex constructor conveys many benefits (as covered in chapter 2 of Effective Java by Josh Bloch), but my motivations here are: the constructor’s intent is clear and it is simpler to build an immutable (and therefore thread-safe) Adapter object.
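For readers unfamiliar with the pattern, a stripped-down builder might look like the sketch below. It is not the adapter's actual code: AdapterSketch, its three fields and the method names are assumptions made up for illustration, and the Diffusion and MongoDB objects are replaced with plain strings.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for the adapter: immutable once built. */
final class AdapterSketch {
    private final String database;
    private final String collection;
    private final String topicRoot;

    // Only the builder can construct an instance, and every field is final.
    private AdapterSketch(Builder builder) {
        this.database = builder.database;
        this.collection = builder.collection;
        this.topicRoot = builder.topicRoot;
    }

    /** Full namespace in the "database.collection" form MongoDB uses. */
    String namespace() {
        return database + "." + collection;
    }

    String topicRoot() {
        return topicRoot;
    }

    static Builder builder() {
        return new Builder();
    }

    /** Mutable helper that collects the settings one named call at a time. */
    static final class Builder {
        private String database;
        private String collection;
        private String topicRoot;

        Builder database(String value) {
            this.database = value;
            return this;
        }

        Builder collection(String value) {
            this.collection = value;
            return this;
        }

        Builder topicRoot(String value) {
            this.topicRoot = value;
            return this;
        }

        AdapterSketch build() {
            Objects.requireNonNull(database, "database");
            Objects.requireNonNull(collection, "collection");
            Objects.requireNonNull(topicRoot, "topicRoot");
            return new AdapterSketch(this);
        }
    }
}
```

A caller then writes AdapterSketch.builder().database("someDB").collection("someCollection").topicRoot("mongo").build(), which reads more clearly than a three-string constructor whose arguments are easily transposed.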
Turning the handle
The real work can begin now that both the connectivity and state are in place. In method run() we do two things:
- Query the MongoDB collection for all documents, transcribing each.
- Call relayChanges().
We use Java’s try-with-resources feature (available since Java 7) so that the cursor opened on line 10 is closed as soon as the thread drops out of that scope.
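The effect of try-with-resources can be shown without a MongoDB server. In the sketch below, FakeCursor is a hypothetical stand-in for the driver's cursor (it only records whether it has been closed) and InitialLoadSketch.transcribeAll is an invented name for the initial load step.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical cursor stand-in: iterates documents and records its closure. */
class FakeCursor implements Iterator<String>, AutoCloseable {
    private final Iterator<String> source;
    boolean closed = false;

    FakeCursor(List<String> documents) {
        this.source = documents.iterator();
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public String next() {
        return source.next();
    }

    @Override
    public void close() {
        closed = true;
    }
}

class InitialLoadSketch {
    /** Reads every document, closing the cursor however the loop exits. */
    static List<String> transcribeAll(FakeCursor cursor) {
        List<String> transcribed = new ArrayList<>();
        try (FakeCursor c = cursor) {
            while (c.hasNext()) {
                transcribed.add(c.next());
            }
        } // c.close() has run by this point, even if the loop threw
        return transcribed;
    }
}
```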
transcribeDocument() has the task of building a new JSON topic from the content of the MongoDB document. If the topic has already been created (and therefore exists in topicPaths) we update the topic. Either branch makes use of method toBytes(String) which parses a JSON string down to a form suitable for the ValueUpdater.
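The add-or-update decision can be sketched as follows. This is an approximation under stated assumptions rather than the adapter's code: TopicSink is a hypothetical interface standing in for the Diffusion topic operations, topicPaths is assumed to map a document id to a topic path, and toBytes here simply UTF-8 encodes the JSON text where the real method produces whatever form the ValueUpdater expects.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class TranscribeSketch {
    /** Hypothetical stand-in for creating and updating topics. */
    interface TopicSink {
        void add(String path, byte[] value);

        void update(String path, byte[] value);
    }

    // Assumed shape: MongoDB document id -> topic path already created for it.
    private final Map<String, String> topicPaths = new HashMap<>();
    private final String root;
    private final TopicSink sink;

    TranscribeSketch(String root, TopicSink sink) {
        this.root = root;
        this.sink = sink;
    }

    /** Creates a topic the first time an id is seen, updates it thereafter. */
    void transcribe(String id, String json) {
        byte[] value = toBytes(json);
        String existing = topicPaths.get(id);
        if (existing == null) {
            String path = root + "/" + id;
            topicPaths.put(id, path);
            sink.add(path, value);
        } else {
            sink.update(existing, value);
        }
    }

    /** Simplification: plain UTF-8 bytes of the JSON text. */
    static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```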
Relaying the changes
relayChanges(long) does what it says on the tin: relays changes from the MongoDB instance to the topic tree. Before listening to the OpLog for events it composes a filter to use. In this case we wish only to see events later than ‘now’ that are document-insert, update or delete events, or commands to drop the collection.
BsonTimestamp resolves time only to the second, and nothing smaller. It uses an incrementing counter to disambiguate events within that second.
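As we understand the BSON timestamp layout, the two parts are packed into a single 64-bit value, with the seconds in the high 32 bits and the counter in the low 32 bits. The sketch below reproduces that packing in plain Java to show why a simple numeric comparison orders events correctly; OplogTimestampSketch and its method names are hypothetical and are not part of the MongoDB driver.

```java
/** Hypothetical helper mirroring the seconds-plus-counter layout. */
class OplogTimestampSketch {
    /** Seconds since the epoch in the high 32 bits, counter in the low 32. */
    static long pack(int seconds, int increment) {
        return ((long) seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    static int seconds(long value) {
        return (int) (value >> 32);
    }

    static int increment(long value) {
        return (int) value;
    }

    /** True when 'a' happened strictly before 'b'. */
    static boolean isBefore(long a, long b) {
        return Long.compareUnsigned(a, b) < 0;
    }
}
```

Two events within the same second differ only in the counter, so pack(s, 1) sorts before pack(s, 2), and both sort before pack(s + 1, 0).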
We make the call to oplog.find(filter), build a TailableAwait cursor, and then read incoming events as they happen. We dispatch each event to one of processInsert(Document), processUpdate(Document), processDelete(Document) or processCommand(Document), depending on the value of property op (presumably short for operation).
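The dispatch step can be sketched as a switch over the single-letter codes the OpLog uses for op: "i" for insert, "u" for update, "d" for delete and "c" for command. In this sketch the event is modelled as a plain Map rather than a driver Document, and OplogDispatchSketch.dispatch is a hypothetical method that returns the name of the handler that would be called instead of calling it.

```java
import java.util.Map;

class OplogDispatchSketch {
    /** Returns the name of the handler that would process this event. */
    static String dispatch(Map<String, Object> event) {
        String op = String.valueOf(event.get("op"));
        switch (op) {
            case "i":
                return "processInsert";
            case "u":
                return "processUpdate";
            case "d":
                return "processDelete";
            case "c":
                return "processCommand";
            default:
                // Anything else (for example a no-op entry) is skipped.
                return "ignored";
        }
    }
}
```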
processCommand(Document) checks that the event relates to the adapted collection. A more focussed filter here would both perform better and eliminate the need for this step.
processInsert(Document) mostly reuses transcribeDocument() to update a topic with the new content embedded in the event.
processDelete(Document) uses the MongoDB document-id to look up the affected topic, and then removes it.
processUpdate(Document) is more interesting as the event contains instructions to change the document, but not the changed document itself. A more fully realised adapter implementation would interpret and react to the changes; in this case, however, we fetch the changed document from the collection using its ObjectId, and pass that to transcribeDocument() to update the existing topic. This imposes the cost of an extra round-trip, for the sake of readability.
Above we have shown how little code is actually required to get started. As this is written the GitHub repository holds only 426 lines of Java. In the next blog we shall show how the adapter can be put to use.