Ingest and route compressed data from EventHub to ADX
Azure Data Explorer (ADX) is a somewhat under-the-radar data analytics service designed to help users unlock valuable insights from vast amounts of raw data. Built by Microsoft, ADX is a powerful, fast, and highly scalable tool that excels in real-time and complex event processing, enabling users to analyze large volumes of structured, semi-structured, and unstructured data with exceptional speed and efficiency. Its integration with a wide array of Azure services makes it a versatile and invaluable asset for any data-centric enterprise.
When to use ADX
- Real-time Data Ingestion and Analysis:
  - You need to perform real-time data ingestion from various sources such as logs, telemetry data, and time-series data.
  - Quick and responsive querying of fresh data is critical.
- Complex, Ad-hoc Queries:
  - You require complex querying capabilities, including full-text search, pattern matching, time series analysis, and advanced analytics.
  - You need fast responses for ad-hoc and iterative queries.
- Interactive Data Exploration:
  - Users need to explore large datasets interactively, with the ability to quickly drill down into data.
  - The requirement includes data visualization, dashboards, and interactive analytics.
- Optimized for Log and Time-Series Data:
  - Your data is predominantly log, event, or time-series data.
  - Data compression, indexing, and retention policies specific to these types of data are essential.
Environment Setup
Event Hub Creation
Azure Event Hub is a highly scalable data streaming platform and event ingestion service designed to handle millions of events per second, making it an ideal solution for big data and real-time analytics. By acting as a buffer between event producers and event consumers, Event Hub effectively decouples the two sides. This decoupling allows producers to send data without worrying about the readiness or availability of consumers, and vice versa. Producers can continuously push data into Event Hub, where it is temporarily stored and made available for consumers to process at their own pace. This architecture not only enhances system reliability and scalability but also simplifies the development of complex data processing pipelines, enabling seamless integration across various applications and services.
Creation is straightforward from the Azure Portal.
Create an Event Hub Namespace
Search for Event Hubs and in the upper left corner select + Create to create a new Namespace.
Select the Standard pricing tier or above, since a dedicated consumer group is needed.
Create an Event Hub
When deployment is complete, go to the resource and in the upper left corner select + Event Hub to create a new Event Hub.
Create a new Consumer group
Select the newly created Event Hub (from your Namespace go to Entities -> Event Hubs and select your Event Hub in the new window). Then select + Consumer group to create a new Consumer group.
From the docs:
Consumer group: This logical group of consumer instances reads data from an event hub or Kafka topic. It enables multiple consumers to read the same streaming data in an event hub independently at their own pace and with their own offsets.
ADX Creation
Create an ADX Cluster
Search for Data Explorer and in the upper left corner select + Create to create a new Cluster.
Create a Database
When deployment is complete, go to the resource and in the upper left corner select + Add Database. When the Database deployment is complete, you will see your newly created Database by navigating to Data -> Databases.
You can execute your queries by selecting your Database and moving to the query section. You can also visit the Azure Data Explorer portal and work with your cluster from there.
Before creating the sample Tables there is another important concept: ingestion mappings. In a nutshell, ingestion mappings provide a flexible way to handle diverse data formats, such as JSON, CSV, or Avro, ensuring that data is accurately parsed and aligned with the schema of the target table. By leveraging ingestion mappings, users can streamline the data ingestion process, reduce errors, and enhance the efficiency of their data analytics workflows, ultimately enabling more insightful and actionable business intelligence. For the purposes of this demo, JSON ingestion mappings will be used.
- Create two new Tables on your Database
Right click your Database and select Create table.
For convenience you can use the following commands that create the Tables and their corresponding mappings (run one after another):
//1. Create User Table
.create table User (Name: string, Surname: string, Age: int, IsHuman: bool)
//2. Create Car Table
.create table Car (Model: string, Year: int, Price: decimal)
//3. Create User JSON mapping
.create table User ingestion json mapping "User_evh_json_mapping"
'[{"column":"Name","path":"$[\'Name\']","datatype":""},{"column":"Surname","path":"$[\'Surname\']","datatype":""},{"column":"Age","path":"$[\'Age\']","datatype":""},{"column":"IsHuman","path":"$[\'IsHuman\']","datatype":""}]'
//4. Create Car JSON mapping
.create table Car ingestion json mapping "Car_evh_json_mapping"
'[{"column":"Model","path":"$[\'Model\']","datatype":""},{"column":"Year","path":"$[\'Year\']","datatype":""},{"column":"Price","path":"$[\'Price\']","datatype":""}]'
Streaming GZIP data from C#
- Create a producer client
The Event Hub Producer Client in C# allows developers to create and send batches of events, ensuring optimal performance and resource utilization. To get started with the Event Hub Producer Client in C#, you should have the Azure.Messaging.EventHubs NuGet package installed in your C# project. Familiarity with asynchronous programming in C# will also be beneficial, as the client leverages async methods to handle event publishing efficiently. With these prerequisites in place, you can begin integrating the Event Hub Producer Client into your applications to enable robust event streaming and data ingestion capabilities.
using System;
using System.Net;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Configuration;

public class EvenHubFeeder : IFeed
{
    private EventHubProducerClient _producerClient;

    public EvenHubFeeder(IConfiguration configuration)
    {
        EventHubProducerClientOptions producerOptions = new EventHubProducerClientOptions
        {
            //retry transient failures with exponential back-off, bounded by the configured values
            RetryOptions = new EventHubsRetryOptions
            {
                Mode = EventHubsRetryMode.Exponential,
                MaximumDelay = TimeSpan.FromMilliseconds(double.Parse(configuration["MaximumDelayInMs"])),
                MaximumRetries = int.Parse(configuration["MaximumRetries"])
            },
            //optional web proxy for locked-down environments
            ConnectionOptions = new EventHubConnectionOptions
            {
                Proxy = string.IsNullOrWhiteSpace(configuration["ProxyAddress"]) ? null : new WebProxy
                {
                    Address = new Uri(configuration["ProxyAddress"])
                }
            }
        };
        _producerClient = new EventHubProducerClient(configuration["EvhConnectionString"], configuration["EvhName"], producerOptions);
    }
}
EventHubProducerClient mimics the HttpClient pattern, so as a best practice, when your application pushes events regularly, you should cache and reuse the EventHubProducerClient for the lifetime of your application.
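A minimal sketch of what that caching can look like with dependency injection, assuming a .NET generic host and the IFeed interface from the snippet above (names here are illustrative):

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);
//a singleton registration means the EventHubProducerClient created in the
//constructor is built once and reused for the application's lifetime
builder.Services.AddSingleton<IFeed, EvenHubFeeder>();
using var host = builder.Build();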
- Compress the payload data
GZIP in .NET is a widely used compression algorithm that allows developers to efficiently compress and decompress data, reducing the size of files and streams for storage or transmission. The .NET framework provides built-in support for GZIP through classes such as GZipStream in the System.IO.Compression namespace. These classes enable developers to easily apply GZIP compression to data streams, making it straightforward to compress data before sending it over a network or to decompress data received from a compressed source. By leveraging GZIP in .NET, applications can achieve significant performance improvements in terms of bandwidth usage and storage efficiency, while maintaining compatibility with other systems that support the GZIP format.
//requires the System.Text, System.IO and System.IO.Compression namespaces
private static byte[] CompressJsonData(string jsonData)
{
    byte[] byteArray = Encoding.UTF8.GetBytes(jsonData);
    using (var memoryStream = new MemoryStream())
    {
        using (var gzipStream = new GZipStream(memoryStream, CompressionLevel.Optimal))
        {
            gzipStream.Write(byteArray, 0, byteArray.Length);
        } //the GZipStream must be disposed before reading, otherwise the compressed data is not fully flushed
        return memoryStream.ToArray();
    }
}
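The ADX Data Connection set up later in this post decompresses the GZIP payload on ingestion, so no consumer-side code is needed for that. Still, for local testing it can be handy to reverse the operation; a minimal sketch of the counterpart helper:

//counterpart to CompressJsonData, useful for verifying payloads in tests
private static string DecompressJsonData(byte[] compressedData)
{
    using (var inputStream = new MemoryStream(compressedData))
    using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress))
    using (var reader = new StreamReader(gzipStream, Encoding.UTF8))
    {
        return reader.ReadToEnd();
    }
}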
- Create EventData to add to the batch
The EventData class in C# is a fundamental component of the Azure.Messaging.EventHubs library, designed to encapsulate the data payload that you wish to send to an Azure Event Hub. Each instance of EventData represents a single event, containing the event's body as a byte array, along with optional properties and system properties that can be used for metadata and routing purposes.
private static EventData CreateEventDataFromChange<T>(T change, string tableName, string mappingName)
{
string serializedChange = JsonConvert.SerializeObject(change);
byte[] compressedPayloadBytes = CompressJsonData(serializedChange);
var eventData = new EventData(compressedPayloadBytes);
//properties required to route the data to ADX
eventData.Properties.Add("Table", tableName); //must match the table name, case sensitive
eventData.Properties.Add("Format", "JSON");
eventData.Properties.Add("IngestionMappingReference", mappingName); //must match the json mapping, case sensitive
return eventData;
}
System properties expansion is not supported on Event Hub ingestion of compressed messages.
- Send batch data to Event Hub
The EventDataBatch class in C# is a crucial component of the Azure.Messaging.EventHubs library, designed to optimize the process of sending multiple events to an Azure Event Hub. This class allows developers to group a collection of EventData instances into a single, manageable batch, ensuring that the events are transmitted efficiently and within the size constraints imposed by the Event Hub service. By using EventDataBatch, you can maximize throughput and minimize the number of network operations required to send large volumes of event data. The class provides methods to add events to the batch while automatically checking whether the batch size exceeds the allowable limit, thus preventing errors and ensuring smooth operation. Utilizing EventDataBatch is essential for applications that need to handle high-frequency event generation and transmission, making it a key tool for building scalable and performant event-driven solutions.
public async Task FeedAsync<T>(List<T> payload, string tableName, string mappingName)
{
    var eventBatch = await _producerClient.CreateBatchAsync();
    foreach (var change in payload)
    {
        EventData eventData = CreateEventDataFromChange(change, tableName, mappingName);
        //try to add to the batch; if the batch cannot take any more, send it
        if (!eventBatch.TryAdd(eventData))
        {
            if (eventBatch.Count == 0) //the batch is empty, so this single event is too large to ever fit
            {
                continue;
            }
            await TrySendBatch(eventBatch);
            eventBatch = await _producerClient.CreateBatchAsync(); //the batch was closed when sent, so recreate it
            if (!eventBatch.TryAdd(eventData) && eventBatch.Count == 0) //re-add the current change and check whether it is too big on its own
            {
                continue; //if it is too big we cannot send it anyway; have a fallback mechanism here
            }
        }
    }
    //send whatever is left
    if (eventBatch.Count > 0)
    {
        await TrySendBatch(eventBatch);
    }
}
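TrySendBatch is not shown above; its actual implementation lives in the linked repository. A minimal sketch of what it could look like, with illustrative error handling:

private async Task TrySendBatch(EventDataBatch eventBatch)
{
    try
    {
        await _producerClient.SendAsync(eventBatch);
    }
    catch (EventHubsException ex)
    {
        //the client has already retried per the configured EventHubsRetryOptions;
        //log and decide whether to re-queue or drop the batch
        Console.WriteLine($"Failed to send a batch of {eventBatch.Count} events: {ex.Reason}");
    }
    finally
    {
        eventBatch.Dispose();
    }
}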
You can find the complete code on GitHub. Fill in the EvhConnectionString and EvhName in the appsettings.json file.
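As a usage sketch, assuming an IConfiguration instance named configuration and hypothetical POCOs whose property names match the table columns defined earlier:

var feeder = new EvenHubFeeder(configuration); //or resolve IFeed from DI

var users = new List<User> { new User("Ada", "Lovelace", 36, true) };
var cars = new List<Car> { new Car("Model S", 2023, 79990m) };

//table and mapping names must match the ADX objects created earlier, case sensitive
await feeder.FeedAsync(users, "User", "User_evh_json_mapping");
await feeder.FeedAsync(cars, "Car", "Car_evh_json_mapping");

//hypothetical POCOs mirroring the ADX table schemas
public record User(string Name, string Surname, int Age, bool IsHuman);
public record Car(string Model, int Year, decimal Price);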
Executing the code
Upon execution we can see that data is successfully sent to Event Hub, but querying our Tables will yield no results, because automatic ingestion into ADX has not been set up yet.
Enabling automatic ingestion
ADX Data Connection is a powerful feature that enables seamless and continuous data ingestion from various sources. By integrating data from sources such as Azure Event Hubs, Azure IoT Hub, and Azure Blob Storage, ADX Data Connection ensures that your data is always up-to-date and ready for real-time analytics and complex querying. This integration streamlines the process of ingesting large volumes of diverse data, making it an essential tool for building efficient, scalable, and real-time analytics solutions.
To create a Data Connection select your database, then Data Connections, and in the upper left corner + Add data connection. From the drop-down select Event Hub.
Ensure that:
- GZIP is selected as data compression
- Data routing is Enabled
- System-assigned identity is selected
In Event Hub we can now see successful outgoing messages, and in ADX we can see the ingested data.
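A couple of simple queries in the query editor can confirm the ingestion:

User | take 10
Car | count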
Useful commands
.show ingestion failures //used to display detailed information about any data ingestion errors that have occurred
.show ingestion mappings //retrieves and displays the defined mappings for data ingestion
Ingestion batching policy is a crucial configuration that optimizes the data ingestion process by grouping multiple small data ingestion requests into larger, more efficient batches. This policy helps to enhance throughput, reduce latency, and minimize the overall cost of data ingestion by leveraging the system’s ability to handle large volumes of data more effectively. By fine-tuning parameters such as batch size, maximum delay, and maximum number of items per batch, users can achieve a balanced and efficient data ingestion pipeline tailored to their specific workload requirements.
By default there is a 5 minute batching delay. For testing purposes only, we can lower it to 10 seconds using the following commands:
.alter table Car policy ingestionbatching '{"MaximumBatchingTimeSpan":"00:00:10"}'
.alter table User policy ingestionbatching '{"MaximumBatchingTimeSpan":"00:00:10"}'
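Once testing is done, you can inspect the policy and remove it so the table falls back to the database default:

.show table Car policy ingestionbatching
.delete table Car policy ingestionbatching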