Conditional topic updates

Introduction

In Diffusion 6.10, we have released an extension to the UpdateConstraint feature that allows you to update JSON topics only if part of the content matches a particular condition. Previously, the update would only take effect if the value matched exactly; now we can use comparison operations such as less-than and greater-than.

This provides a mechanism to prevent out-of-order updates to topics, and to ensure that you do not overwrite new data with old data. This might be a problem when:

  1. The input stream contains unordered data, and you want to make sure that you always update a topic with a later value. Any messages that are older than the current value are discarded.

  2. You have more than one publisher sending updates to a topic and you can’t guarantee the order in which they update the topic. You would like to avoid race conditions where old values overwrite newer ones.

Example use cases

Dropping out-of-order messages

We have a publisher sending data from a third-party system to Diffusion, but we are unable to guarantee the order of updates.

We might see something like this:

The server, and therefore the client, will receive the updates in exactly this order. Update 4 arrives before update 3, so for a short time the client will not have the true value from the source system.

Diffusion can’t reorder the messages, but with conditional updates we could ensure that we do not update a topic with an older copy of the source data. This way, the client always has the most recent value:

Multiple overlapping producers

In another situation, you might have multiple independent producers that are updating the same topic. Again, you don’t want to overwrite a topic with an older value than it currently contains. Consider two producers, sending the same data at slightly different times:

Helpfully, as long as you are not using the DONT_RETAIN_VALUE flag, Diffusion doesn’t send an update when there is no change to a topic, so the client doesn’t get all 8 possible updates. But, for some period of time, update 4 is overwritten by update 3 and old data is given to the client. Clearly, we don’t want this to happen!
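The effect we are after can be sketched without Diffusion at all. The following toy model is an illustration only: GuardedTopicSketch and its methods are hypothetical names, and the "topic" is just an in-memory value guarded by a lock. Two producers send the same sequence concurrently, and an update is applied only if it is newer than the stored value.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for a topic; this is not the Diffusion API. */
final class GuardedTopicSketch {
    private long currentSeqNum = -1;                 // -1 means "no value yet"
    private final List<Long> accepted = new ArrayList<>();

    /** Applies the update only if it is newer than the stored value. */
    synchronized boolean offer(long seqNum) {
        if (seqNum <= currentSeqNum) {
            return false;                            // stale or duplicate: rejected
        }
        currentSeqNum = seqNum;
        accepted.add(seqNum);
        return true;
    }

    synchronized List<Long> accepted() {
        return new ArrayList<>(accepted);
    }

    /** Runs two producers that each send 1..lastSeqNum, in order, at the same time. */
    static List<Long> runTwoProducers(int lastSeqNum) {
        final GuardedTopicSketch topic = new GuardedTopicSketch();
        final Runnable producer = () -> {
            for (long seq = 1; seq <= lastSeqNum; seq++) {
                topic.offer(seq);
            }
        };
        final Thread a = new Thread(producer, "producer-a");
        final Thread b = new Thread(producer, "producer-b");
        a.start();
        b.start();
        join(a);
        join(b);
        return topic.accepted();
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Accepted sequence numbers: " + runTwoProducers(5));
    }
}
```

However the two threads interleave, the accepted values only ever increase, so a subscriber to this toy topic would never see an older value replace a newer one.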

Example usage

Let’s take a sample series of messages with which we want to update a topic:

{ "seqnum": 1, "price" : 1.50 }
{ "seqnum": 2, "price" : 1.52 }
{ "seqnum": 4, "price" : 1.54 }
{ "seqnum": 3, "price" : 1.53 }
{ "seqnum": 5, "price" : 1.55 }

Like our first use case, we have data here that is not in order. We will use UpdateConstraints to make sure that the message with seqnum = 3 does not overwrite the topic after seqnum = 4. The rules that govern updating the topic are:

  1. If the topic does not exist, we can add it.

  2. If the topic exists, but has no data, we can update it.

  3. If the topic exists and has data, the seqnum field of its current value must be less than the seqnum in the message we are using for the update.

To do this, we create an UpdateConstraint with the following rules:

final UpdateConstraint constraint =
    Diffusion.updateConstraints()
             .noTopic()
             .or(Diffusion.updateConstraints().noValue())
             .or(Diffusion.updateConstraints().jsonValue()
                 .with("/seqnum",
                       UpdateConstraint.Operator.LT,
                       thisSeqNum));
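To see how the three rules behave on the sample messages, here is a small plain-Java model of them. It is a sketch under stated assumptions, not Diffusion code: UpdateRulesSketch, tryUpdate and createEmptyTopic are hypothetical names, and the server-side evaluation of the constraint is reduced to three boolean checks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Toy model of the three update rules; it does not call Diffusion. */
final class UpdateRulesSketch {
    private boolean topicExists = false;
    private OptionalLong storedSeqNum = OptionalLong.empty();

    /** Models a topic that has been created but has not been given a value. */
    void createEmptyTopic() {
        topicExists = true;
    }

    /** Stores the incoming seqnum and returns true if any of the three rules allows it. */
    boolean tryUpdate(long incomingSeqNum) {
        final boolean noTopic = !topicExists;                              // rule 1
        final boolean noValue = topicExists && !storedSeqNum.isPresent();  // rule 2
        final boolean storedIsOlder = storedSeqNum.isPresent()
                && storedSeqNum.getAsLong() < incomingSeqNum;              // rule 3
        if (noTopic || noValue || storedIsOlder) {
            topicExists = true;
            storedSeqNum = OptionalLong.of(incomingSeqNum);
            return true;
        }
        return false;
    }

    /** Feeds the sequence numbers through the rules and returns the accepted ones. */
    static List<Long> accepted(long... seqNums) {
        final UpdateRulesSketch topic = new UpdateRulesSketch();
        final List<Long> result = new ArrayList<>();
        for (long seqNum : seqNums) {
            if (topic.tryUpdate(seqNum)) {
                result.add(seqNum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same order as the sample messages: 1, 2, 4, 3, 5.
        System.out.println(accepted(1, 2, 4, 3, 5)); // prints [1, 2, 4, 5]
    }
}
```

The message with seqnum = 3 is the only one rejected, because by the time it arrives the stored seqnum is already 4.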

Available condition operators

LT is not the only comparison operator available. Here is a full list:

Operator    Description
IS          Is exactly the same
EQ          Logically equal to
NE          Logically not equal to
GT          Greater than
GE          Greater than or equal to
LT          Less than
LE          Less than or equal to

For every operator except IS, a value that can be interpreted as a number (such as a Long or a Double) is compared as a number, even when it is formatted as a string. For example, LT 5 is interpreted the same as LT "5.0".
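One way to picture this coercion is the sketch below. It is an illustration, not how Diffusion implements the comparison: NumericCoercionSketch, toNumber and lessThan are hypothetical helpers, and BigDecimal is simply a convenient common type for Long, Double and numeric strings. What Diffusion does with operands that are not numeric is not covered here; the sketch just rejects them.

```java
import java.math.BigDecimal;

/** Hypothetical helper illustrating numeric coercion; not Diffusion code. */
final class NumericCoercionSketch {
    /** Converts a Number, or a String holding a number, to a BigDecimal. */
    static BigDecimal toNumber(Object value) {
        if (value instanceof Number || value instanceof String) {
            // Throws NumberFormatException if the text is not a number.
            return new BigDecimal(value.toString().trim());
        }
        throw new IllegalArgumentException("Not numeric: " + value);
    }

    /** Models the LT operator: is the topic's value less than the constraint value? */
    static boolean lessThan(Object topicValue, Object constraintValue) {
        return toNumber(topicValue).compareTo(toNumber(constraintValue)) < 0;
    }

    public static void main(String[] args) {
        System.out.println(lessThan(4, 5));      // prints true
        System.out.println(lessThan(4, "5.0"));  // prints true
        System.out.println(lessThan(5, "5.0"));  // prints false
    }
}
```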

The difference between IS and EQ is that IS performs an exact comparison at the byte level, whereas EQ will check to see if the current and new values are equivalent:

Value       1.2        1.20       "1.20"
1.2         IS + EQ    EQ         EQ
1.20        EQ         IS + EQ    EQ
"1.20"      EQ         EQ         IS + EQ
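The table above can be reproduced with a rough model. Note the assumptions: the sketch compares JSON text tokens rather than Diffusion's actual binary encoding, so "exact" here means character-for-character equality of the token, and IsVersusEqSketch with its is, eq and relation methods are hypothetical names.

```java
import java.math.BigDecimal;

/** Rough model of IS versus EQ on JSON text tokens; not Diffusion code. */
final class IsVersusEqSketch {
    /** IS: the two tokens must be identical, character for character. */
    static boolean is(String a, String b) {
        return a.equals(b);
    }

    /** EQ: the two tokens must represent the same number, quoted or not. */
    static boolean eq(String a, String b) {
        return numeric(a).compareTo(numeric(b)) == 0;
    }

    /** Strips surrounding double quotes, if any, and parses the rest as a number. */
    private static BigDecimal numeric(String token) {
        String text = token.trim();
        if (text.length() >= 2 && text.startsWith("\"") && text.endsWith("\"")) {
            text = text.substring(1, text.length() - 1);
        }
        return new BigDecimal(text);
    }

    /** Returns the table cell for a pair of tokens. */
    static String relation(String a, String b) {
        if (is(a, b)) {
            return "IS + EQ";
        }
        return eq(a, b) ? "EQ" : "-";
    }

    public static void main(String[] args) {
        final String[] tokens = {"1.2", "1.20", "\"1.20\""};
        for (String row : tokens) {
            for (String column : tokens) {
                System.out.println(row + " vs " + column + ": " + relation(row, column));
            }
        }
    }
}
```

The sketch uses compareTo rather than equals on BigDecimal because equals also compares scale, which would treat 1.2 and 1.20 as different.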

Full example

The following class shows how to send JSON messages as per the above example.

package com.diffusiondata.examples;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class Main {
    public static void main(String[] args) throws Exception {
        final Session session = Diffusion.sessions()
                                         .principal("admin")
                                         .password("password")
                                         .open("ws://localhost:8090");

        final TopicSpecification topicSpec = Diffusion.newTopicSpecification(TopicType.JSON);

        // A sequence of JSON-formatted messages to send to Diffusion.
        // Note the out-of-order seqnum 3.
        final String[] messages = {
            "{\"seqnum\": 1, \"price\": 1.50}",
            "{\"seqnum\": 2, \"price\": 1.52}",
            "{\"seqnum\": 4, \"price\": 1.54}",
            "{\"seqnum\": 3, \"price\": 1.53}",
            "{\"seqnum\": 5, \"price\": 1.55}",
        };

        final ObjectMapper mapper = new ObjectMapper();

        for (String thisMessage : messages) {
            final JsonNode node = mapper.readTree(thisMessage);
            final int thisSeqNum = node.get("seqnum").asInt();

            final UpdateConstraint constraint =
                Diffusion.updateConstraints()
                         .noTopic()
                         .or(Diffusion.updateConstraints().noValue())
                         .or(Diffusion.updateConstraints().jsonValue().with("/seqnum",
                                                                            UpdateConstraint.Operator.LT,
                                                                            thisSeqNum));

            final JSON myJsonMessage = Diffusion.dataTypes().json().fromJsonString(thisMessage);

            session.feature(TopicUpdate.class)
                   .addAndSet("my-topic", topicSpec, JSON.class, myJsonMessage, constraint)
                   .whenComplete((result, err) -> {
                       if (err == null) {
                           System.out.println("Updated topic with seqnum="
                                              + thisSeqNum);
                       } else {
                           System.err.println("Failed to update topic with seqnum="
                                              + thisSeqNum + ": " + err.getMessage());
                       }
                   });
        }

        // Give the asynchronous updates some time to complete.
        Thread.sleep(1000);

        // Done.
        session.close();
    }
}