Diffusion Intelligent Data Platform
Real-Time Hyper-Personalization Made Simple
Quickly capitalize on existing or new data sources. Purpose-built to simplify event-driven, real-time application development, Diffusion enables you to swiftly add new capabilities with minimal development costs.
No-Restriction Data Consumption
Accommodates any size, format or velocity of data.
Low-code Data Transformation
Low-code features enable dynamic data enrichment without application modification.
Minimal Resource Requirements
Delta Streaming delivers internet scalability with 90% fewer server resources.
More Than Just a Pipeline to the Edge
Data Gateway
Makes it easy to integrate with any data source within your data center or outside, with pre-built Adapters for Kafka, MQTT, REST, JMS or backend applications using our WebSocket-based SDKs in all popular languages. Accommodates both streaming and static data sources, supports any size, format, or velocity of data.
Data Transformation
Provides a flexible, hierarchical data model to organize in-coming event-data in a multi-level topic tree structure. Easily scalable to millions of topics. Facilitates transformation of event data using low-code features of the platform. Enables subscription to event-data at a fine-grained level for hyper-personalization and efficient distribution at Internet scale.
Data Distribution
Includes a real-time Event Broker that can scale to millions of concurrent connections. Supports Pub/Sub model with history tracking as well as asynchronous Request/Response. With intelligent queuing, conflation, compression, and patented delta-streaming protocol you can save over 90% in bandwidth costs.
David Bull, Director, Baker Technology
The Complete Real-Time Application Platform
Diffusion Intelligent Data Platform combines 10 powerful feature sets to accelerate even-driven, real-time application development.
Real-Time Event Broker
PUB-SUB
One of four delivery mechanisms available inside Diffusion enabling you to publish real-time data to hundreds of thousands of consumers efficiently.
REQUEST-RESPONSE
Provides reliable two-way messaging between multiple endpoints, allowing you to build sophisticated asynchronous interactions.
TIME SERIES (EVENT HISTORY)
Store real-time events and dynamically query historial data, replay past events at any point in time.
FETCH
Retrieve snapshots from real-time streams and efficiently access static data through a simple query API.
Delta Streaming
DELTAS
Diffusion automatically transmits only the changes from the previous message, resulting in bandwidth reductions of over 90% – providing consistently faster delivery, more efficient scaling, and lower costs.
FLOW CONTROL
Diffusion’s networking layer adapts to congestion or slow responses, allowing clients to maintain connectivity across unpredictable networks.
THROTTLING/CONFLATION
Control the rate of data delivery whilst ensuring slow consumers always receive the most recent updates, with intelligent, per-topic conflation policies.
Hierarchial Topic Trees
TOPIC MANAGEMENT
DATA AGNOSTIC
Diffusion is completely data agnostic; stream text, JSON or binary data in any combination or format.
PERSISTED
Reliable disk-based persistence with cluster aware restoration, ensuring service resiliency.
Low-Code Features
DECLARATIVE LANGUAGE – DSL
Diffusion provides low-code features via Domain Specific Languages (DSLs) enabling easy configuration. DSL simplifies data wrangling via the management console without having to make any code changes to customer applications.
LOW CODE CAPABILITIES
Build a simple backend application to publish a firehose of event data to Diffusion using our SDKs. Then use DSLs to re-organize and transform data via the management console with no changes to your applications.
TRANSFORMATION
Restructure JSON values in-flight, enabling you to disaggregate and filter data, enabling you to deliver and secure data at a fine-grained level.
Open Protocols
REST API
Diffusion’s simple REST API enables quick integration of stateless applications, letting you scale real-time systems with minimal effort.
MQTT
Support for MQTT offers a lightweight alternative to Diffusion SDK. A wide variety of existing IoT devices and web apps can now connect to Diffusion to leverage the benefits of its topic management and security frameworks.
PRE-BUILT ADAPTERS
Use pre-built adapters to start scaling and distributing real-time data from external services such as Kafka, JMS or REST interfaces in minutes.
RICH CLIENTS
Standardized client libraries in popular programming languages make it easy to develop advanced real-time functionality within web, mobile, IoT & desktop applications.
Fine-Grained Security
AUTH INTEGRATION
Fully control your application’s authentication flow with adapters for standard and bespoke integration points.
DYNAMIC ACL
Dynamically apply and modify topic level security enabling you to add and remove access in real time.
TTL
Establish automated rules over the lifecycle of data, making it easy to control when data is available.
Seamless Scalability
CLUSTERING
Distribute data, security and connections in real time across a local cluster, making it easier to reliably scale services horizontally.
REMOTE SERVERS
Create dynamic connections between Diffusion Clusters over a WAN, enabling global replication and hybrid deployments.
EDGE REPLICATION
Store data at the network edge for faster access, whilst also removing load from back-end systems.
Analytics Dashboard
METRICS
Gain meaningful insights by exposing data, session and infrastructure metrics to industry standard real-time monitoring tools such as Prometheus.
DASHBOARD
Configure, manage and report on connections, data, processing and security through a real-time UI (Diffusion management console).
CONTROL API
Expose every event inside Diffusion to an API for handling by back-end services providing end-to-end control.
Mesh Architecture
REMOTE SERVERS
Create dynamic connections between Diffusion clusters over a WAN, enabling global replication and hybrid deployments: an Intelligent Event Data Mesh.
REMOTE TOPIC VIEWS
To manage data across a geographically dispersed user base, remote topic view allows global replication of real-time data, assures compliance, and minimizes delivery latency.
CLUSTER-AWARE MESSAGING
Diffusion’s request-response messaging is cluster-aware, enabling automatic message routing within a cluster including delivery acknowledgment. Messages can be selectively targeted to individual clients, groups of clients, or all clients, via property-based addressing.
Integrations
PRE-BUILT ADAPTERS
Use pre-built adapters to start scaling and distributing real-time data from external services such as Kafka, JMS or REST in minutes.
BESPOKE INTEGRATIONS
Distributed coordination, state management, and partial updates make the integration of bespoke data sources simple.
REAL-TIME FIRE HOSES
High-performance networking and flow-control ensures that you can reliably consume real-time data streams, regardless of throughput.
Powering Success for Tradition Financial Services
Founded in 1959, Tradition is one of the world leaders in the IDB (Interdealer broker) sector of financial and non-financial products. Diffusion Intelligent Event-Data Platform was selected to deliver real-time pricing data, research reports, trade alerts and risk reports for Tradition’s new cloud-based data and & analytics SaaS offerings. Diffusion scales to meet the trade volumes Tradition needs, while lowering compute and network capacity demands.

Yann L’Huillier
Group CIO, Tradition

The Value of the Diffusion Intelligent Data Platform
Diffusion’s cache subscription aggregates all requisite functionality so you only need one solution to consume, transform and deliver.
final Session session = Diffusion.sessions() .principal("control") .password("password") .open("ws://localhost:8080");
const session = await diffusion.connect({ host : "localhost", port : 8080, principal : "control", password : "password" });
var session = Diffusion.Sessions .Principal("control") .Password("password") .Open("ws://localhost:8080");
DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = session_create("ws://localhost:8080", "control", "password", NULL, NULL, &error);
async with diffusion.Session( url = "ws://localhost:8080", principal = "control", credentials = diffusion.Credentials("password") ) as session;
let url = URL(string: "ws://localhost:8080")! let credentials = PTDiffusionCredentials(password: "password") let config = PTDiffusionSessionConfiguration( principal: "control", credentials: credentials ) PTDiffusionSession.open(with: url, configuration: config) { (session, error) in self.session = session }
session.feature(TopicControl.class).add("my-topic", new TopicSpecification(TopicType.STRING)); session.feature(TopicUpdate.class).set("my-topic", String.class, "Hello, world");
session.topics.add('my-topic', diffusion.topics.TopicType.STRING); session.topicUpdate.set('my-topic', diffusion.datatypes.string(), 'Hello, world');
session.TopicControl.AddTopicAsync("my-topic", TopicType.STRING); session.TopicUpdate.SetAsync("my-topic", "Hello, World");
TOPIC_SPECIFICATION_T *spec = topic_specification_init(TOPIC_TYPE_STRING); add_topic_from_specification(session, "my-topic", spec, NULL); BUF_T *buf = buf_create(); write_diffusion_string_value("Hello, world", buf); TOPIC_UPDATE_SET_PARAMS_T topic_update_params = { .topicPath = "my-topic", .datatype = DATATYPE_STRING, .update = buf }; buf_free(buf);
await session.topics.add_topic("my-topic", diffusion.datatypes.STRING) await session.topics.set_topic("my_topic", "Hello, world", specification=diffusion.datatypes.STRING)
self.session!.topicControl.add( withTopicPath: "my-topic", type: PTDiffusionTopicType.String, value: nil, completionHandler: self.errorHandler ) self.session!.topicUpdateControl.updater.update( withTopicPath: "my-topic", value: "Hello, World", completionHandler: self.errorHandler)
curl --request POST \ --header "authorization: Bearer {token}" \ --header "content-type: application/json" \ --data '{"path" : "my-topic", "type" : "string"}' \ http://localhost:8080/topics/add curl --request POST \ --header "authorization: Bearer {token}" \ --header "content-type: application/json" \ --data '{"path": "my-topic", "type": "string", "value": "Hello, world"}' \ http://localhost:8080/topics/set
session.feature(Topics.class).addStream("my-topic", String.class, new Topics.ValueStream.Default() { @Override public void onValue(String topicPath, TopicSpecification topicSpec, String oldValue, String newValue) { System.out.println("New value for " + topicPath + ": " + newValue); } }); session.feature(Topics.class).subscribe("my-topic");
session.addStream('my-topic', diffuison.datatypes.string()) .on('value', (path, spec, newValue, oldValue) => { console.log(`New value for ${path}: ${newValue}`); }); session.select('my-topic');
class MyStream : IValueStream { public void OnValue(string topicPath, ITopicSpecification spec, string oldValue, string newValue) { Console.WriteLine($"New value for {topicPath}: {newValue}"); } } var myStream = new MyStream(); session.Topics.AddStream("my-topic", myStream); session.Topics.SubscribeAsync("my-topic");
static int on_value( const char* topic_path, const TOPIC_SPECIFICATION_T *const specification, const DIFFUSION_DATATYPE datatype, const DIFFUSION_VALUE_T *const old_value, const DIFFUSION_VALUE_T *const new_value, void *context) { char *str; read_value_string_value(new_value, &str, NULL); printf("New value for %s: %s\n", topic_path, str); free(str); return HANDLER_SUCCESS; } ... VALUE_STREAM_T value_stream = { .datatype = DATATYPE_STRING, .on_value = on_value }; add_stream(session, "my_topic", &value_stream); SUBSCRIPTION_PARAMS_T params = { .topic_selector = "my-topic" }; subscribe(session, params);
def on_update(*, old_value, topic_path, new_value): print("New value for " + topic_path + " = " + new_value) value_stream = diffusion.topics.ValueStreamHandler( data_type = diffusion.datatypes.STRING update = on_update ) session.topics.add_value_stream( topic_selector = "my-topic", stream = value_stream) } await session.topics.subscribe("my-topic")
class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate { func diffusionStream(_ stream: PTDiffusionValueStream, didUpdateTopicPath topicPath: String, specification: PTDiffusionTopicSpecification, oldString: String?, newString: String) { print("New value for \(topicPath): \(newString)") } } // ... let delegate = StreamDelegate() let selector = PTDiffusionTopicSelector(expression: "my-topic") let stream = PTDiffusionJSON.valueStream(with: self.delegate) self.session!.topics.add(stream, with: "my-topic", error: ()) self.session!.topics.subscribe(withTopicSelectorExpression: "my-topic") { error in self.errorHandler(nil, error) }
curl --request GET \ --header "authorization: Bearer {token}" \ --data '{"path": "my-topic", "type": "string", "value": "Hello, world"}' \ http://localhost:8080/topics/fetch?path=my-topic