Diffusion for Developers

Create real-time applications that deliver hyper-personalized data to millions of concurrent consumers.

Developers hero image

Remove the limits on team productivity – rapidly develop and connect applications with your choice of language, protocols and APIs.

PUBLISH
// Connect to a Diffusion server as the "admin" principal. The session is only
// available inside the `then` callback, so topic creation and updates belong there.
// NOTE(review): host, port and credentials are empty placeholders in this snippet.
diffusion.connect({
    host : "",
    port : "",
    principal : "admin",
    credentials : ""
}).then(function(session) {
    // Use the new session here
});
SUBSCRIBE
// Subscribe to topics.
final Session session = Diffusion.sessions()
  .principal("admin")
  .password("")
  .open("ws://:");

session.feature(Topics.class).addStream("my-topic", JSON.class, new Topics.ValueStream.Default() {
  @Override	
PUBLISH
// Add a topic and set its value.
// NOTE(review): the password and server URL are empty placeholders in this snippet.
var session = Diffusion.Sessions
    .Principal("admin")
    .Password("")
    .Open("ws://::");

// Create a JSON topic at "my-topic" and wait for the call to complete.
// (Plain ASCII quotes are required here; typographic quotes do not compile.)
await session.TopicControl.AddTopicAsync("my-topic", TopicType.JSON);

// Build the JSON value {"foo": "bar"} and send it as the topic's value.
var jsonDataType = Diffusion.DataTypes.JSON;
var value = jsonDataType.FromJSONString("{\"foo\": \"bar\"}");
// `result` is the pending update; it is not awaited here, so await it if you
// need to observe completion or failure.
var result = session.TopicUpdate.SetAsync(
    "my-topic",
    value);

											
SUBSCRIBE
// Subscribe to topics.
// NOTE(review): the password and server URL are empty placeholders in this snippet.
var session = Diffusion.Sessions
    .Principal("admin")
    .Password("")
    .Open("ws://::");
    
// Register the stream for "my-topic" before subscribing; ExampleValueStream
// (defined below) prints each callback it receives.
session.Topics.AddStream("my-topic", new ExampleValueStream());

// Request the subscription and wait for the call to complete.
await session.Topics.SubscribeAsync("my-topic");


/// <summary>
/// Value stream for JSON topics that logs subscription events and new values
/// to the console.
/// </summary>
/// <remarks>
/// The stream is declared as <c>IValueStream&lt;IJSON&gt;</c> so that the value type
/// matches the <c>IJSON</c> parameters of <see cref="OnValue"/>.
/// </remarks>
class ExampleValueStream : IValueStream<IJSON>
{
    /// <summary>Logs the path of a topic that has been subscribed to.</summary>
    public void OnSubscription(string topicPath, ITopicSpecification specification)
    {
        Console.WriteLine($"Subscribed to: {topicPath}");
    }

    /// <summary>Logs the path of a topic that has been unsubscribed from.</summary>
    public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
    {
        Console.WriteLine($"Unsubscribed from: {topicPath}");
    }

    /// <summary>Logs the topic path together with the new value rendered as a JSON string.</summary>
    public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
    {
        Console.WriteLine($"{topicPath}: {newValue.ToJSONString()}");
    }

    /// <summary>Intentionally empty: this example does not react to the stream closing.</summary>
    public void OnClose()
    {
        // Not used
    }

    /// <summary>Intentionally empty: this example does not react to stream errors.</summary>
    public void OnError(ErrorReason errorReason)
    {
        // Not used
    }
}
											
PUBLISH
// Add a topic and set its value.
// NOTE(review): the password and server URL are empty placeholders in this snippet.
final Session session = Diffusion.sessions()
  .principal("admin")
  .password("")
  .open("ws://:");

// Request creation of a JSON topic at "my-topic".
// Each future gets its own name: two locals called `result` in one scope do not compile.
final CompletableFuture<TopicControl.AddTopicResult> addResult =
    session.feature(TopicControl.class).addTopic("my-topic", TopicType.JSON);

// Parse the JSON value {"foo" : "bar"} and set it as the topic's value.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\" }");
final CompletableFuture<?> updateResult =
    session.feature(TopicUpdate.class).set("my-topic", JSON.class, value);
											
SUBSCRIBE
// Subscribe to topics.
// NOTE(review): the password and server URL are empty placeholders in this snippet.
final Session session = Diffusion.sessions()
  .principal("admin")
  .password("")
  .open("ws://:");

// Register a JSON value stream for "my-topic" that prints every new value.
session.feature(Topics.class).addStream("my-topic", JSON.class, new Topics.ValueStream.Default<JSON>() {
  @Override
  public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue, JSON newValue) {
    // The trailing space keeps the topic path from running into "for".
    System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
  }
});

// Ask the server to subscribe this session to "my-topic".
session.feature(Topics.class).subscribe("my-topic");
											
PUBLISH
// Add a topic and set its value.
var session: PTDiffusionSession?

// NOTE(review): the server URL and password are empty placeholders in this snippet.
let url = URL(string: "ws://:")!
let credentials = PTDiffusionCredentials(password: "")
let config = PTDiffusionSessionConfiguration(principal: "admin", credentials: credentials)

// Shared completion handler: prints the error, if there is one, and ignores the response.
let errorHandler: (Any?, Error?) -> Void = { _, error in
    if let error = error {
        print(error)
    }
}

PTDiffusionSession.open(with: url, configuration: config) { (_session, error) -> Void in
    // The session is delivered to this completion handler, so all work that
    // needs it happens here instead of force-unwrapping `self.session` after
    // the `open` call, when it may still be nil.
    guard let openedSession = _session else {
        self.errorHandler(nil, error)
        return
    }
    self.session = openedSession

    openedSession.topicControl.addTopic(withPath: "my-topic",
                                        type: PTDiffusionTopicType.JSON,
                                        completionHandler: self.errorHandler)

    do {
        // Report a failure to build the JSON value instead of hiding it behind `try?` and `!`.
        let value = try PTDiffusionJSON(object: ["foo": "bar"])
        openedSession.topicUpdate.setWithPath("my-topic",
                                              toJSONValue: value) { error in self.errorHandler(nil, error) }
    } catch {
        self.errorHandler(nil, error)
    }
}
											
SUBSCRIBE
// Subscribe to topics.
/// Delegate for a JSON value stream that prints every stream event to the console.
class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate {

    /// Prints the path of a topic the stream has been unsubscribed from.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didUnsubscribeFromTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         reason: PTDiffusionTopicUnsubscriptionReason) {
        print("Unsubscribed from: \(topicPath)")
    }

    /// Prints the error the stream failed with.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Failed with error: \(error)")
    }

    /// Prints a notice that the stream has closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Closed")
    }

    /// Prints the path of a topic the stream has been subscribed to.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didSubscribeToTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        print("Subscribed to: \(topicPath)")
    }

    /// Prints the topic path and the new value, which is expected to be a JSON object.
    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldJSON: PTDiffusionJSON?,
                         newJSON: PTDiffusionJSON) {
        do {
            // A conditional cast keeps a non-dictionary value (array, string, number)
            // on the "unable to read" path; a forced cast would trap instead of
            // reaching the `catch` below.
            guard let value = try newJSON.object() as? [String: Any] else {
                print("Unable to read message")
                return
            }
            print("\(topicPath): \(value.description)")
        } catch {
            print("Unable to read message")
        }
    }
}

var session: PTDiffusionSession?

// NOTE(review): the server URL and password are empty placeholders in this snippet.
let url = URL(string: "ws://:")!
let credentials = PTDiffusionCredentials(password: "")
let config = PTDiffusionSessionConfiguration(principal: "admin", credentials: credentials)

// Shared completion handler: prints the error, if there is one, and ignores the response.
let errorHandler: (Any?, Error?) -> Void = { _, error in
    if let error = error {
        print(error)
    }
}

// Receives the stream callbacks; created up front so it is ready when the session opens.
let delegate = StreamDelegate()

PTDiffusionSession.open(with: url, configuration: config) { (_session, error) -> Void in
    // The session is delivered to this completion handler, so the stream is
    // registered and the subscription requested here rather than after the
    // `open` call, when `self.session` may still be nil.
    guard let openedSession = _session else {
        self.errorHandler(nil, error)
        return
    }
    self.session = openedSession

    let selector = PTDiffusionTopicSelector(expression: "my-topic")
    let stream = PTDiffusionJSON.valueStream(with: self.delegate)

    do {
        // `add(_:with:)` is a throwing call; there is no `error:` argument to pass in Swift.
        try openedSession.topics.add(stream, with: selector)
    } catch {
        // Without a registered stream no values would be delivered, so stop here.
        self.errorHandler(nil, error)
        return
    }

    openedSession.topics.subscribe(withTopicSelectorExpression: "my-topic") { error in self.errorHandler(nil, error) }
}

											
PUBLISH
// Add a topic and set its value.
// NOTE(review): the password and server URL are empty placeholders in this snippet.
final Session session = Diffusion.sessions()
  .principal("admin")
  .password("")
  .open("ws://:");

// Request creation of a JSON topic at "my-topic".
// Each future gets its own name: two locals called `result` in one scope do not compile.
final CompletableFuture<TopicControl.AddTopicResult> addResult =
    session.feature(TopicControl.class).addTopic("my-topic", TopicType.JSON);

// Parse the JSON value {"foo" : "bar"} and set it as the topic's value.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\" }");
final CompletableFuture<?> updateResult =
    session.feature(TopicUpdate.class).set("my-topic", JSON.class, value);
											
SUBSCRIBE
// Subscribe to topics.
// NOTE(review): the password and server URL are empty placeholders in this snippet.
final Session session = Diffusion.sessions()
  .principal("admin")
  .password("")
  .open("ws://:");

// Register a JSON value stream for "my-topic" that prints every new value.
session.feature(Topics.class).addStream("my-topic", JSON.class, new Topics.ValueStream.Default<JSON>() {
  @Override
  public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue, JSON newValue) {
    // The trailing space keeps the topic path from running into "for".
    System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
  }
});

// Ask the server to subscribe this session to "my-topic".
session.feature(Topics.class).subscribe("my-topic");
											
PUBLISH
/*
 *  Add a topic and set its value.
 *
 *  NOTE(review): the password and server URL are empty placeholders in this snippet.
 */
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "admin");
diffusion_session_factory_password(session_factory, "");

SESSION_T *session = session_create_with_session_factory(session_factory, "ws://::");

/*
 * Callback registered for a successful topic add; logs and reports success.
 * Declared `static int` like the other callbacks so the `return` is well-typed.
 */
static int
on_topic_added(SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context)
{
        printf("on_topic_added\n");
        return HANDLER_SUCCESS;
}

/* Callback registered for a failed topic add; logs the failure result code. */
static int
on_topic_add_failed(SESSION_T *session, TOPIC_ADD_FAIL_RESULT_CODE result_code, const DIFFUSION_ERROR_T *error, void *context)
{
        printf("on_topic_add_failed: %d\n", result_code);
        return HANDLER_SUCCESS;
}

/* Callback registered as `on_topic_update` in the update parameters; logs success. */
static int
on_topic_update(void *context)
{
        printf("topic update success\n");
        return HANDLER_SUCCESS;
}

/* Wire the add-topic callbacks together. */
ADD_TOPIC_CALLBACK_T callback = {
        .on_topic_added_with_specification = on_topic_added,
        .on_topic_add_failed_with_specification = on_topic_add_failed
};

/* Request a JSON topic at "my-topic". */
TOPIC_SPECIFICATION_T *spec = topic_specification_init(TOPIC_TYPE_JSON);
add_topic_from_specification(session, "my-topic", spec, callback);

/* Write the JSON string "hello world" into a buffer to use as the update. */
BUF_T *buf = buf_create();
write_diffusion_json_value("\"hello world\"", buf);

DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T topic_update_params = {
        .topic_path = "my-topic",
        .datatype = DATATYPE_JSON,
        .update = buf,
        .on_topic_update = on_topic_update
};

diffusion_topic_update_set(session, topic_update_params);
											
SUBSCRIBE
/*
 *  Subscribe to topics.
 */
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "admin");
diffusion_session_factory_password(session_factory, "");

SESSION_T *session = session_create_with_session_factory(session_factory, "ws://::");


static int on_subscription(const char* topic_path,
                    const TOPIC_SPECIFICATION_T *specification,
                    void *context)
{
        printf("Subscribed to topic: %s\n", topic_path);
        return HANDLER_SUCCESS;
}

static int on_unsubscription(const char* topic_path,
                      const TOPIC_SPECIFICATION_T *specification,
                      NOTIFY_UNSUBSCRIPTION_REASON_T reason,
                      void *context)
{
        printf("Unsubscribed from topic: %s\n", topic_path);
        return HANDLER_SUCCESS;
}

static int on_value(const char* topic_path,
             const TOPIC_SPECIFICATION_T *const specification,
             const DIFFUSION_DATATYPE datatype,
             const DIFFUSION_VALUE_T *const old_value,
             const DIFFUSION_VALUE_T *const new_value,
             void *context)
{
        char *result;
        bool success = to_diffusion_json_string(new_value, &result, &api_error);

        if(success) {
                printf("Received value: %s\n", result);
                free(result);
                return HANDLER_SUCCESS;
        }

        DIFFUSION_API_ERROR api_error;
        printf("Error during diffusion value read: %s\n", get_diffusion_api_error_description(api_error));
        diffusion_api_error_free(api_error);
        return HANDLER_FAILURE;
}

static void on_close()
{
        printf("Value stream closed\n");
}

VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscription,
        .on_unsubscription = on_unsubscription,
        .on_value = on_value,
        .on_close = on_close
};

add_stream(session, "my-topic", &value_stream);
SUBSCRIPTION_PARAMS_T params = {
        .topic_selector = "my-topic"
};

subscribe(session, params);
											
PUBLISH
# Add a topic and set its value.
path = "foo/bar"
topic_type = diffusion.datatypes.STRING
value = "Value1"

async def main():
    """Open a session, add the topic at ``path`` and set it to ``value``.

    NOTE(review): the URL and credentials are empty placeholders in this snippet.
    """
    async with diffusion.Session(
        url="wss://:", principal="admin", credentials=diffusion.Credentials("")
    ) as session:
        # Everything that uses the session is indented under the ``async with``
        # so that it runs inside the context manager's block.
        add_response = await session.topics.add_topic(path, topic_type)

        if add_response == session.topics.CREATED:
            print(f"Topic {path} successfully created.")
        if add_response == session.topics.EXISTS:
            print(f"Topic {path} already exists.")

        await session.topics.set_topic(path, value, specification=topic_type)
        print(f"Topic {path} successfully set to {value}")
											
SUBSCRIBE
# Subscribe to topics.
def on_update(*, old_value, topic_path, topic_value, **kwargs):
    """Print a topic update.

    Prints the topic path, then either the initial value (when ``old_value``
    is ``None``) or both the old and the new value. Extra keyword arguments
    are accepted and ignored.
    """
    print("Topic:", topic_path)
    if old_value is not None:
        print("  Value updated")
        print("    Old value:", old_value)
        print("    New value:", topic_value)
        return
    print("  Initial value:", topic_value)


def on_subscribe(*, topic_path, **kwargs):
    """Print a notice naming the topic path that was subscribed to.

    Extra keyword arguments are accepted and ignored.
    """
    notice = f"Subscribed to {topic_path}"
    print(notice)


def on_unsubscribe(*, reason, topic_path, **kwargs):
    """Print a notice naming the unsubscribed topic path and the reason.

    The reason is rendered with ``str()``. Extra keyword arguments are
    accepted and ignored.
    """
    reason_text = str(reason)
    print(f"Unsubscribed from {topic_path} because {reason_text}")


# Selector used when adding the value stream and when subscribing/unsubscribing.
topic_selector = "foo/bar"
# Data type handed to the value stream handler.
topic_type = diffusion.datatypes.STRING

# Passed to asyncio.sleep() in main(): how long to stay subscribed, in seconds.
session_duration = 15

# Value stream handler wired to the three callbacks defined above.
value_stream = diffusion.topics.ValueStreamHandler(
    data_type=topic_type,
    update=on_update,
    subscribe=on_subscribe,
    unsubscribe=on_unsubscribe,
)

async def main():
    """Open a session, stream ``topic_selector`` for a while, then unsubscribe.

    Adds ``value_stream`` for the selector, subscribes, sleeps for
    ``session_duration`` seconds and then unsubscribes.

    NOTE(review): the URL and credentials are empty placeholders in this snippet.
    """

    async with diffusion.Session(
        url="wss://:", principal="admin", credentials=diffusion.Credentials("")
    ) as session:
        # Everything that uses the session is indented under the ``async with``
        # so that it runs inside the context manager's block.
        print("Adding value stream")
        session.topics.add_value_stream(
            topic_selector=topic_selector, stream=value_stream
        )

        print(f"Subscribing to {topic_selector}")
        await session.topics.subscribe(topic_selector)
        await asyncio.sleep(session_duration)

        print(f"Unsubscribing from {topic_selector}")
        await session.topics.unsubscribe(topic_selector)
											

Consume, Transform and Deliver Data with Intelligence and Ease

Consumption icon

Consumption iconCONSUME

Quickly integrate any data source using adapters that simplify connecting data streams from Kafka, REST, MQTT and more.

Adapters

  • Cloud REST Adapters poll external REST services and import data to a topic path of your choice.
  • Kafka Adapter lets you easily
    • Integrate real-time data from remote Kafka clusters into Diffusion.
    • Filter and aggregate data with flexible integration.
    • Connect securely using SSL or SASL.
    • Map a Kafka data stream to and from a Diffusion time series, monitoring activity with Prometheus.

Protocols

  • MQTT Support enables direct connection of IoT devices using MQTT 5.0.
  • REST API is ideal for one-off updates or data snapshots, suitable for low-power devices and can be called with any language capable of HTTP requests.

SDKs

  • SDKs stream the latest real-time data from your client app using a persistent session.
Wrangling icon

Wrangling iconTRANSFORM

Diffusion transforms data in-flight, processes and segments data in real time, stores event streams for querying and editing, and streams real-time data from your clients’ apps.

Hyper-Personalize Data

  • Easily transform and map incoming data with low-code topic views—or implement advanced application logic with control clients.
  • Aggregate multiple incoming values into one topic to turn a clickstream into a customer.
  • Expand a single incoming data point and generate subtopics to make a list of prices into markets.
  • Tailor data for delivery to each service or region and even provide a custom feed for each end user.

TIME SERIES

  • Store a time-stamped event within a single topic.
  • Stream events as they happen or query to retrieve part of a series.
  • Enable non-destructive updates to update a posted event while maintaining a full audit trail.

Delayed Feed

  • Topic views can be used to create a delayed feed and are ideal for creating lower-value versions of time-sensitive data. You can replay every data change with custom delays of as little as one second or multiple days.
Distribution icon

Distribution iconDELIVER

Diffusion features patented delta streaming technology that minimizes costs, controls data access down to individual end users to maximize security, and uses remote topic views to efficiently distribute data across a geographically dispersed user base.

Delta Streaming

  • Intelligently distinguishes between old, updated and new data, only sending recent, relevant information to clients instead of the entire topic content. A 90% reduction in server and bandwidth requirements is achieved by avoiding the need to send data that isn’t changing from one markup to the next.

Fine-Grained Security

  • Fine-grained dynamic security permissions can be provided for each topic.
  • Unique security permissions can be assigned to each user, scalable to hundreds of thousands of users.
  • Access changes are applied immediately giving you real-time control of what data each user can see.

Remote Topic Views

  • Copy all or part of the topic tree on one server to another.
  • Ideal for distributing data across a geographically dispersed user base.
  • Minimize latency and bandwidth by serving data from a local server.
  • Easy set up and modification from the Diffusion monitoring console or via SDKs.

Diffusion SDKs

Available for immediate download. Need help or direction? Book a call with one of our architects.

Learn About the Diffusion Intelligent Data Platform

Whether you’re using Diffusion now or just getting your feet wet, our learning resources are available to help anytime.

Support

Need a hand or a few pointers? Contact our support team.

Start publishing real-time data with Diffusion for free!
Rethink Real Time with Diffusion, the only Intelligent Data Platform.