
Ingest and route compressed data from EventHub to ADX

Azure Data Explorer, a.k.a. ADX, is a somewhat under-the-radar data analytics service designed to help users unlock valuable insights from vast amounts of raw data. Built by Microsoft, ADX is a powerful, fast, and highly scalable tool that excels at real-time and complex event processing, enabling users to analyze large volumes of structured, semi-structured, and unstructured data with exceptional speed and efficiency. Its integration with a wide array of Azure services makes it a versatile and invaluable asset for any data-centric enterprise.

When to use ADX

  1. Real-time Data Ingestion and Analysis:
    • You need to perform real-time data ingestion from various sources such as logs, telemetry data, and time-series data.
    • Quick and responsive querying for fresh data is critical.
  2. Complex, Ad-hoc Queries:
    • You require complex querying capabilities, including full-text search, pattern matching, time series analysis, and advanced analytics.
    • You need a fast response for ad-hoc and iterative queries.
  3. Interactive Data Exploration:
    • Users need to explore large datasets interactively, with the ability to quickly drill down into data.
    • The requirement includes data visualization, dashboards, and interactive analytics.
  4. Optimized for Log and Time-Series Data:
    • Your data is predominantly log, event, or time-series data.
    • Data compression, indexing, and retention policies specific to these types of data are essential.

Environment Setup

Event Hub Creation

Azure Event Hub is a highly scalable data streaming platform and event ingestion service designed to handle millions of events per second, making it an ideal solution for big data and real-time analytics. By acting as a buffer between event producers and event consumers, EventHub effectively decouples the sending of data from its consumption. This decoupling allows producers to send data without worrying about the readiness or availability of consumers, and vice versa. Producers can continuously push data into EventHub, where it is temporarily stored and made available for consumers to process at their own pace. This architecture not only enhances system reliability and scalability but also simplifies the development of complex data processing pipelines, enabling seamless integration across various applications and services.

Creation is straightforward from the Azure Portal:

  • Create an Event Hub Namespace
    Search for Event Hubs and in the upper left corner select + Create to create a new Namespace.

    Select the Standard pricing tier or higher, since a dedicated consumer group is needed.

  • Create an Event Hub
    When deployment is complete, go to the resource and in the upper left corner select + Event Hub.

  • Create a new Consumer group
    Select the newly created Event Hub (from your Namespace go to Entities -> Event Hubs and select your Event Hub in the new window). Then select + Consumer group to create a new Consumer group.

    From the docs:

    Consumer group: This logical group of consumer instances reads data from an event hub or Kafka topic. It enables multiple consumers to read the same streaming data in an event hub independently at their own pace and with their own offsets.

ADX Creation

  • Create an ADX Cluster
    Search for Azure Data Explorer and in the upper left corner select + Create to create a new Cluster.

  • Create a Database
    When deployment is complete, go to the resource and in the upper left corner select + Add Database.
    Once the Database is deployed you will see it by navigating to Data -> Databases.
    You can execute your queries by selecting your Database and moving to the query section. You can also visit the Azure Data Explorer portal and work with your cluster from there.

Before creating the sample Tables there is another important concept called Ingestion mappings. In a nutshell ingestion mappings provide a flexible way to handle diverse data formats, such as JSON, CSV, or Avro, ensuring that data is accurately parsed and aligned with the schema of the target table. By leveraging ingestion mappings, users can streamline the data ingestion process, reduce errors, and enhance the efficiency of their data analytics workflows, ultimately enabling more insightful and actionable business intelligence. For the purpose of this demo JSON ingestion mappings will be used.

  • Create two new Tables on your Database
    Right-click your Database and select Create table.
    For convenience you can use the following commands, which create the Tables and their corresponding mappings (run them one after another):
//1. Create User Table
.create table User (Name: string, Surname: string, Age: int, IsHuman: bool) 

//2. Create Car Table
.create table Car (Model: string, Year: int, Price: decimal) 

//3. Create User JSON mapping
.create table User ingestion json mapping "User_evh_json_mapping"
'[{"column":"Name","path":"$[\'Name\']","datatype":""},{"column":"Surname","path":"$[\'Surname\']","datatype":""},{"column":"Age","path":"$[\'Age\']","datatype":""},{"column":"IsHuman","path":"$[\'IsHuman\']","datatype":""}]'

//4. Create Car JSON mapping
.create table Car ingestion json mapping "Car_evh_json_mapping"
'[{"column":"Model","path":"$[\'Model\']","datatype":""},{"column":"Year","path":"$[\'Year\']","datatype":""},{"column":"Price","path":"$[\'Price\']","datatype":""}]'

Streaming GZIP data from C#

  • Create a producer client
    The Event Hub Producer Client in C# allows developers to create and send batches of events, ensuring optimal performance and resource utilization. To get started with the Event Hub Producer Client in C#, you should have the Azure.Messaging.EventHubs NuGet package installed in your C# project. Familiarity with asynchronous programming in C# will also be beneficial, as the client leverages async methods to handle event publishing efficiently. With these prerequisites in place, you can begin integrating the Event Hub Producer Client into your applications to enable robust event streaming and data ingestion capabilities.
//usings required by the snippets in this post
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;

public class EvenHubFeeder : IFeed
{
    private EventHubProducerClient _producerClient;

    public EvenHubFeeder(IConfiguration configuration)
    {
        EventHubProducerClientOptions producerOptions = new EventHubProducerClientOptions
        {
            RetryOptions = new EventHubsRetryOptions
            {
                Mode = EventHubsRetryMode.Exponential,
                MaximumDelay = TimeSpan.FromMilliseconds(double.Parse(configuration["MaximumDelayInMs"])),
                MaximumRetries = int.Parse(configuration["MaximumRetries"])
            },
            ConnectionOptions = new EventHubConnectionOptions
            {
                Proxy = string.IsNullOrWhiteSpace(configuration["ProxyAddress"]) ? null : new WebProxy
                {
                    Address = new Uri(configuration["ProxyAddress"])
                }
            }
        };
        _producerClient = new EventHubProducerClient(configuration["EvhConnectionString"], configuration["EvhName"], producerOptions);
    }
}

EventHubProducerClient mimics the HttpClient pattern, so as a best practice, when your application pushes events regularly you should cache and reuse the EventHubProducerClient for the lifetime of your application.
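
Because the client is cached for the application's lifetime, it should also be closed when the application shuts down. A minimal sketch, assuming the feeder is extended to implement IAsyncDisposable (this is not part of the original snippet):

public class EvenHubFeeder : IFeed, IAsyncDisposable
{
    //constructor and _producerClient field as shown above

    public async ValueTask DisposeAsync()
    {
        //closes the underlying AMQP connection and releases network resources
        await _producerClient.DisposeAsync();
    }
}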

  • Compress the payload data
    GZIP in .NET is a widely-used compression algorithm that allows developers to efficiently compress and decompress data, reducing the size of files and streams for storage or transmission. The .NET framework provides built-in support for GZIP through classes such as GZipStream in the System.IO.Compression namespace. These classes enable developers to easily apply GZIP compression to data streams, making it straightforward to compress data before sending it over a network or to decompress data received from a compressed source. By leveraging GZIP in .NET, applications can achieve significant performance improvements in terms of bandwidth usage and storage efficiency, while maintaining compatibility with other systems that support the GZIP format.
private static byte[] CompressJsonData(string jsonData)
{
    byte[] byteArray = Encoding.UTF8.GetBytes(jsonData);

    using (var memoryStream = new MemoryStream())
    {
        //disposing the GZipStream flushes the remaining compressed bytes into the MemoryStream
        using (var gzipStream = new GZipStream(memoryStream, CompressionLevel.Optimal))
        {
            gzipStream.Write(byteArray, 0, byteArray.Length);
        }
        return memoryStream.ToArray();
    }
}
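
ADX decompresses GZIP payloads automatically during ingestion, so no decompression code is required on the consuming side. Still, a matching helper can be handy for verifying payloads locally; a minimal sketch (not part of the original sample, using the same System.IO.Compression types):

private static string DecompressJsonData(byte[] compressedData)
{
    using (var inputStream = new MemoryStream(compressedData))
    using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress))
    using (var reader = new StreamReader(gzipStream, Encoding.UTF8))
    {
        //reads the decompressed bytes back into the original JSON string
        return reader.ReadToEnd();
    }
}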
  • Create EventData to add to the batch
    The EventData class in C# is a fundamental component of the Azure.Messaging.EventHubs library, designed to encapsulate the data payload that you wish to send to an Azure Event Hub. Each instance of EventData represents a single event, containing the event’s body as a byte array, along with optional properties and system properties that can be used for metadata and routing purposes.
private static EventData CreateEventDataFromChange<T>(T change, string tableName, string mappingName)
{
    string serializedChange = JsonConvert.SerializeObject(change);
    byte[] compressedPayloadBytes = CompressJsonData(serializedChange);
    var eventData = new EventData(compressedPayloadBytes);
    
    //properties required to route the data to ADX
    eventData.Properties.Add("Table", tableName); //must match the table name, case sensitive
    eventData.Properties.Add("Format", "JSON"); 
    eventData.Properties.Add("IngestionMappingReference", mappingName); //must match the json mapping, case sensitive
    return eventData;
}

Note that system properties expansion is not supported for Event Hub ingestion of compressed messages.

  • Send batch data to Event Hub
    The EventDataBatch class in C# is a crucial component of the Azure.Messaging.EventHubs library, designed to optimize the process of sending multiple events to an Azure Event Hub. This class allows developers to group a collection of EventData instances into a single, manageable batch, ensuring that the events are transmitted efficiently and within the size constraints imposed by the Event Hub service. By using EventDataBatch, you can maximize throughput and minimize the number of network operations required to send large volumes of event data. The class provides methods to add events to the batch while automatically checking if the batch size exceeds the allowable limit, thus preventing errors and ensuring smooth operation. Utilizing EventDataBatch is essential for applications that need to handle high-frequency event generation and transmission, making it a key tool for building scalable and performant event-driven solutions.
public async Task FeedAsync<T>(List<T> payload, string tableName, string mappingName)
{
    var eventBatch = await _producerClient.CreateBatchAsync();

    foreach (var change in payload)
    {
        EventData eventData = CreateEventDataFromChange(change, tableName, mappingName);

        //try to add the event to the batch; if the batch cannot take any more, send it first
        if (!eventBatch.TryAdd(eventData))
        {
            if (eventBatch.Count == 0) //the batch is empty, so this single event is too large to ever fit
            {
                continue;
            }

            await TrySendBatch(eventBatch);
            eventBatch = await _producerClient.CreateBatchAsync(); //the previous batch was sent and closed, so recreate it

            if (!eventBatch.TryAdd(eventData) && eventBatch.Count == 0) //re-add the current event and check whether it is too large on its own
            {
                continue; //too large to send this way; add a fallback mechanism here
            }
        }
    }
    //send whatever is left
    if (eventBatch.Count > 0)
    {
        await TrySendBatch(eventBatch);
    }
}
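
TrySendBatch is not shown above (the full implementation is in the repository); a minimal sketch, assuming it simply wraps SendAsync with basic error handling and disposes the batch afterwards:

private async Task TrySendBatch(EventDataBatch eventBatch)
{
    try
    {
        //publish the whole batch in a single operation
        await _producerClient.SendAsync(eventBatch);
    }
    catch (Exception ex)
    {
        //log and apply your own retry/fallback policy here
        Console.WriteLine($"Failed to send batch: {ex.Message}");
    }
    finally
    {
        eventBatch.Dispose();
    }
}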

You can find the complete code on GitHub. Fill in the EvhConnectionString and EvhName in the appsettings.json file.
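
As a usage sketch, the feeder can then be wired up and called with the Table and mapping names created earlier. The User record and the configuration wiring below are illustrative, not taken from the repository:

//appsettings.json provides EvhConnectionString, EvhName and the retry settings
IConfiguration configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json")
    .Build();

var feeder = new EvenHubFeeder(configuration);

var users = new List<User>
{
    new User("Jane", "Doe", 30, true),
    new User("John", "Smith", 42, true)
};

//Table and mapping names must match the ones created in ADX (case sensitive)
await feeder.FeedAsync(users, "User", "User_evh_json_mapping");

//illustrative type matching the User table schema
public record User(string Name, string Surname, int Age, bool IsHuman);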

Executing the code

Upon code execution we can see that data is successfully sent to Event Hub, but querying our Tables yields no results.

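
For example, a simple count over either Table still returns 0 at this point:

User | count
Car | count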

Enabling automatic ingestion

ADX Data Connection is a powerful feature that enables seamless and continuous data ingestion from various sources. By integrating data from sources such as Azure Event Hubs, Azure IoT Hub, and Azure Blob Storage, ADX Data Connection ensures that your data is always up-to-date and ready for real-time analytics and complex querying. This integration streamlines the process of ingesting large volumes of diverse data, making it an essential tool for building efficient, scalable, and real-time analytics solutions.

To create a Data Connection, select your Database, then Data Connections, and in the upper left corner select + Add data connection. From the drop-down select Event Hub.


Ensure that:

  1. GZIP is selected as data compression
  2. Data routing is Enabled
  3. System-assigned identity is selected

In Event Hub we can now see Outgoing messages.

And in ADX we can see the ingested data.
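
A quick query confirms the rows landed in the target Tables:

User | take 10
Car | take 10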

Useful commands
.show ingestion failures //used to display detailed information about any data ingestion errors that have occurred
.show ingestion mappings //retrieves and displays the defined mappings for data ingestion

Ingestion batching policy is a crucial configuration that optimizes the data ingestion process by grouping multiple small data ingestion requests into larger, more efficient batches. This policy helps to enhance throughput, reduce latency, and minimize the overall cost of data ingestion by leveraging the system’s ability to handle large volumes of data more effectively. By fine-tuning parameters such as batch size, maximum delay, and maximum number of items per batch, users can achieve a balanced and efficient data ingestion pipeline tailored to their specific workload requirements.

By default there is a 5 minute delay. For testing purposes only, we can lower the batching time to 10 seconds using the following commands:
.alter table Car policy ingestionbatching '{"MaximumBatchingTimeSpan":"00:00:10"}'
.alter table User policy ingestionbatching '{"MaximumBatchingTimeSpan":"00:00:10"}'
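
You can confirm the policy with the corresponding show command, for example:

.show table Car policy ingestionbatching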

This post is licensed under CC BY 4.0 by the author.