If you work on an app that processes time-series data coming from devices and provides rich real-time insights about it, you may have come across scalability requirements like the following:
There’s a common perception that only NoSQL databases can deliver that kind of scale. Citus on Azure breaks this barrier by enabling you to build large-scale IoT apps within the PostgreSQL ecosystem.
This blog presents an end-to-end reference architecture for building a scalable IoT app, with Hyperscale (Citus), a.k.a. Citus on Azure, within the managed Postgres service on Azure as the database of choice. The blog also covers best practices and considerations to keep in mind while building your IoT app.
Okay, let’s get started. First, let’s walk through the various tools and services that come together in this reference architecture to enable a scalable IoT app.
The main components of this reference architecture include:
Azure IoT Hub is a managed Azure service that acts as a central message hub for bi-directional communication between your IoT application and your devices. You can use IoT Hub to build IoT solutions with reliable, secure communication between your devices and a cloud-hosted solution backend, and you can connect virtually any device to it.
We’ve created a GitHub repo that lets you generate thousands of devices registered with IoT Hub and send messages at scale. This repo enables you to simulate device data at scale and test IoT Hub. You can also plug the scripts in this repo into your CI/CD pipelines while building your IoT app.
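If you want a feel for what simulated device traffic looks like, here is a minimal, hypothetical sketch of generating telemetry payloads. The field names, value ranges, and device IDs are illustrative only and are not taken from the repo; the JSON strings produced are the kind of message body a registered device would send to IoT Hub:

```scala
import scala.util.Random

// Hypothetical telemetry payload; field names are illustrative only.
case class Telemetry(deviceId: String, temperature: Double, humidity: Double, timestamp: Long)

// Render a reading as the JSON body of a device-to-cloud message.
def toJson(t: Telemetry): String =
  s"""{"deviceId":"${t.deviceId}","temperature":${t.temperature},""" +
    s""""humidity":${t.humidity},"timestamp":${t.timestamp}}"""

// Generate one simulated reading for a given device.
def simulateReading(deviceId: String): Telemetry =
  Telemetry(
    deviceId,
    temperature = 20.0 + Random.nextDouble() * 15.0, // 20-35 C
    humidity = 40.0 + Random.nextDouble() * 30.0,    // 40-70 %
    timestamp = System.currentTimeMillis()
  )

// Example: payloads for three simulated devices.
val payloads = (1 to 3).map(i => toJson(simulateReading(s"device-$i")))
payloads.foreach(println)
```

In a real simulator, each payload would be handed to the IoT device SDK and sent over MQTT or AMQP on behalf of a registered device identity.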
Azure Databricks can be used to process and ingest the device data coming from Azure IoT Hub. Azure Databricks provides a fault-tolerant stream processing engine, Spark Structured Streaming, with Kafka compatibility for continuous processing. You can use Spark Structured Streaming for both real-time ingestion and micro-batch processing of data.
The following code snippets show Databricks fetching device data from IoT Hub, processing it, and ingesting it into Hyperscale (Citus) on Azure.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Connection string for IoT Hub's built-in Event Hubs-compatible endpoint
val EventHubs_ConnectionString = "Endpoint=sb://***.servicebus.windows.net/;SharedAccessKeyName=twitter;SharedAccessKey=<shared_access_key>"

// SASL config: the username is the literal string "$ConnectionString",
// the password is the Event Hubs connection string itself
val EH_SASL = s"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$EventHubs_ConnectionString";"""

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "<your topic>")
  .option("kafka.bootstrap.servers", "<iot_hub_name>.servicebus.windows.net:9093")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", EH_SASL)
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "30000")
  .option("kafka.group.id", "$Default")
  .option("failOnDataLoss", "true")
  .load() // the streaming DataFrame is defined up to here
//Create your PostgreSQL sink configuration
def postgresqlSinkOptions: Map[String, String] = Map(
  "dbtable"  -> "public.devicedata", // target table
  "user"     -> "citus",             // database username
  "password" -> "<database password>",
  "driver"   -> "org.postgresql.Driver",
  "url"      -> "jdbc:postgresql://<citus_server_group_name>.postgres.database.azure.com:5432/postgres",
  "sslmode"  -> "require"
)
//Write device data to Citus/Postgres in micro-batches
df.writeStream
  .foreachBatch { (batchdf: DataFrame, _: Long) =>
    batchdf.write
      .format("jdbc")
      .options(postgresqlSinkOptions)
      .mode("Append")
      .save()
  }
  .start()
  .awaitTermination()
The code above issues Postgres INSERT commands to ingest data in batches. With batched INSERTs, you can expect throughput of a few hundred thousand rows ingested per second. If you need higher throughput, you can use the Postgres COPY command.
The COPY command lets you micro-batch rows periodically (at intervals as low as every 30 seconds) and ingest data in near real time. With COPY, some of our customers have seen throughput of up to a few million rows ingested per second. Actual throughput depends on your data model (row width) and hardware configuration.
NOTE: Databricks’ JDBC-based Postgres driver doesn’t natively support the COPY command, so we wrote a simple Scala wrapper that brings COPY functionality to Databricks.
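To give a feel for the idea behind such a wrapper, here is a rough, hypothetical sketch. The CSV-encoding helper is self-contained; the `CopyManager` call that streams the payload into Postgres is shown in a comment because it needs a live connection and the PostgreSQL JDBC driver on the classpath. The table name is illustrative:

```scala
// Encode one value for PostgreSQL COPY ... WITH (FORMAT csv):
// quote fields containing commas, quotes, or newlines; double embedded quotes.
def csvField(v: String): String =
  if (v.exists(c => c == ',' || c == '"' || c == '\n'))
    "\"" + v.replace("\"", "\"\"") + "\""
  else v

// Render a micro-batch of rows as a COPY-ready CSV payload.
def toCopyPayload(rows: Seq[Seq[String]]): String =
  rows.map(_.map(csvField).mkString(",")).mkString("\n")

// With the PostgreSQL JDBC driver available, the payload can then be
// streamed into the table roughly like this (table name is hypothetical):
//
//   import org.postgresql.copy.CopyManager
//   import org.postgresql.core.BaseConnection
//   val copier = new CopyManager(conn.unwrap(classOf[BaseConnection]))
//   copier.copyIn(
//     "COPY public.devicedata FROM STDIN WITH (FORMAT csv)",
//     new java.io.StringReader(toCopyPayload(batchRows)))

val payload = toCopyPayload(Seq(Seq("device-1", "25.0"), Seq("device-2", "26.5")))
println(payload)
```

Calling this from `foreachBatch` instead of the JDBC `write` path is what turns batched INSERTs into bulk COPY ingestion.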
Citus is PostgreSQL extended with the superpower of distributed tables. This superpower enables you to build highly scalable relational apps. You can start building apps on a single node server group, the same way you would with PostgreSQL. As your app's scalability and performance requirements grow, you can seamlessly scale to multiple nodes by transparently distributing your tables.
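As a concrete illustration, distributing a table in Citus is a single SQL call. The sketch below assumes a hypothetical `devicedata` table keyed by `device_id` (schema and column names are illustrative) and shows the DDL you would run through any Postgres connection, here via plain JDBC:

```scala
import java.sql.Connection

// Hypothetical schema for the ingested device data.
val createTableSql =
  """CREATE TABLE IF NOT EXISTS public.devicedata (
    |  device_id text NOT NULL,
    |  event_time timestamptz NOT NULL DEFAULT now(),
    |  temperature double precision,
    |  payload jsonb
    |)""".stripMargin

// Citus UDF that shards the table across worker nodes by device_id.
val distributeSql =
  "SELECT create_distributed_table('public.devicedata', 'device_id')"

// Run both statements against a Hyperscale (Citus) coordinator connection.
def setUpSchema(conn: Connection): Unit = {
  val stmt = conn.createStatement()
  try {
    stmt.execute(createTableSql)
    stmt.execute(distributeSql)
  } finally stmt.close()
}
```

The distribution column (here `device_id`) determines how rows are sharded; choosing the column your queries most often filter and join on keeps queries routed to a single node and fast.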
Below are a few best practices you can follow while building your app with Hyperscale (Citus):
Power BI with real-time streaming lets you stream data and update dashboards in real time. Any visual or dashboard created in Power BI can display and update real-time data. As shown in the reference architecture above, you can use serverless Azure Functions to periodically query device data in Hyperscale (Citus) and publish it to Power BI for real-time visualizations.
Grafana can be used to build time-series dashboards or display event-log (operational log) metrics over time. Grafana ships with PostgreSQL integration by default. For managed Grafana in the cloud, you can use the Azure Managed Grafana service, which recently entered preview.
To get hands-on experience with Citus in under 5 minutes, you can walk through this Azure quickstart. And because Citus is fully open source, you can easily download and test Citus on your local machine. Hope you found this blog useful; below are a few informative links related to building time-series apps with Citus.