kafkaStream

Create connection to event stream in Kafka topic

Since R2022b

    This object requires Streaming Data Framework for MATLAB® Production Server™.

    Description

    The kafkaStream function creates a KafkaStream object that connects to a Kafka® topic. Use the object to read event streams from, and write event streams to, that topic.

    An event consists of three parts:

    • Key — Identifies event source

    • Timestamp — Indicates time at which event occurred

    • Body — Contains event data specified as an unordered set of (name, value) pairs

    After creating a KafkaStream object, use the readtimetable function to read the events into a timetable or the writetimetable function to write a timetable to the stream.

    readtimetable converts events into rows of a timetable. The names in the event body become the timetable column names, the value associated with each name becomes the column value in the event row, and the event timestamp becomes the row timestamp. writetimetable converts rows of a timetable into events in a stream.
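
    The event-to-row mapping described above can be illustrated without a Kafka server. The following is a minimal sketch that builds one timetable row by hand from a hypothetical event; it mimics the mapping only and is not how readtimetable is implemented. The variable names and values are assumptions chosen for illustration.

```matlab
% Hypothetical event: a timestamp plus a body of (name, value) pairs.
eventTime = datetime(2020,10,31,0,0,0);
eventBody = struct("vMotor",1.0909,"wMotor",0,"Tmass",25);

% One event becomes one timetable row:
%   body names      -> column names
%   body values     -> column values
%   event timestamp -> row time
row = table2timetable(struct2table(eventBody),RowTimes=eventTime);
disp(row)
```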

    Creation

    Description

    Row-Based Event Window

    ks = kafkaStream(host,port,topic) creates a default KafkaStream object connected to a Kafka topic at the specified hostname and port. This syntax sets the Host, Port, and Topic properties to host, port, and topic, respectively. The object reads 50 stream event rows at a time.

    ks = kafkaStream(host,port,topic,Rows=numevents) creates a KafkaStream object that reads numevents stream event rows at a time.

    Duration-Based Event Window

    ks = kafkaStream(host,port,topic,Duration=timespan) creates a KafkaStream object that reads stream events occurring during the specified timestamp span timespan.

    Additional Options

    ks = kafkaStream(___,propname1,propval1,...,propnameN,propvalN) sets Kafka provider properties, in addition to any of the input argument combinations in the previous syntaxes.

    ks = kafkaStream(___,Name=Value) specifies event stream options using one or more name-value arguments. You can also set properties using name-value arguments. You can use these name-value arguments and properties to specify how events are converted to and from timetables.

    Input Arguments

    numevents

    Number of events in the event window, specified as a positive integer. Rows=numevents specifies the number of rows that a call to the readtimetable function returns. If fewer than the specified number of rows are available for reading, then readtimetable times out and returns an empty timetable.

    readtimetable does not return until it processes all events in the window, so windows with large row values can block other processes from continuing. To configure a timeout period to prevent blocking, use the ReadLimit property.

    Example: Rows=500 specifies that each call to readtimetable returns a timetable with 500 rows.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    timespan

    Timestamp span in the event window, specified as a duration scalar. Duration=timespan determines the events that the readtimetable function returns based on their timestamp. timespan specifies the difference between the last and first timestamps of events in the event window.

    readtimetable does not return until it processes all events in the window, so windows with large durations can block other processes from continuing. To configure a timeout period to prevent blocking, use the ReadLimit property.

    Example: Duration=minutes(1) specifies that each call to readtimetable returns a timetable that has one minute's worth of events, where the timestamp of the last event is no more than one minute later than the timestamp of the first event.

    Data Types: duration
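
    To make the duration-based window concrete, the sketch below selects which of several hypothetical event timestamps fall into a one-minute window that starts at the first event. It only illustrates the stated rule (last timestamp no more than timespan after the first) and is not the framework's own windowing code.

```matlab
% Hypothetical event timestamps, offset in seconds from the first event.
t = datetime(2020,10,31,0,0,0) + seconds([0 20 45 60 75 130]);
timespan = minutes(1);

% An event belongs to the window when its timestamp is no more than
% timespan later than the timestamp of the first event.
inWindow = (t - t(1)) <= timespan;
windowTimes = t(inWindow);
disp(windowTimes')
```

    Here the first four events fall inside the window, and the events at 75 and 130 seconds are left for later windows.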

    propname

    Name of a Kafka provider property, specified as a character vector or string scalar. Use single or double quotes around propname. Kafka property names always contain at least one dot character, for example, retention.ms. For a list of Kafka properties, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.

    The value of the property, propval, must follow the property name. Specify the property name and its corresponding value as a comma-separated pair.

    Example: kafkaStream(host,port,topic,"security.protocol","SASL_SSL") sets the Kafka configuration property security.protocol to SASL_SSL.

    propval

    Value of a Kafka provider property. For a list of Kafka properties and their values, see the Kafka documentation: https://kafka.apache.org/documentation/#configuration.

    The value of the property must follow the property name propname. Specify the property name and its corresponding value as a comma-separated pair. You can specify propval as any supported MATLAB data type, but it must be possible to convert that value to a string.

    Example: kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512") sets the value of the Kafka configuration property sasl.mechanism to SCRAM-SHA-512.

    Name-Value Arguments

    Specify optional pairs of arguments as Name1=Value1,...,NameN=ValueN, where Name is the argument name and Value is the corresponding value. Name-value arguments must appear after other arguments, but the order of the pairs does not matter.

    Grace Period

    GracePeriod

    Length of time, in GraceUnit units, to wait for messages in the requested event window, specified as a real scalar or duration scalar. The KafkaStream object waits until the end of the grace period to return events read from the stream. The GracePeriod and GraceUnit arguments together set the GracePeriod property.

    This argument applies only for objects with duration-based event windows, that is, KafkaStream objects created using the timespan argument. For objects created using the numevents argument, the grace period is ignored.

    Example: 10

    Example: minutes(10)

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | duration

    GraceUnit

    Unit of time for the grace period specified by the GracePeriod name-value argument, specified as one of these values:

    • "Milliseconds"

    • "Seconds"

    • "Minutes"

    • "Hours"

    • "Days"

    The GracePeriod and GraceUnit arguments together set the GracePeriod property.

    The KafkaStream object converts GracePeriod duration scalars to the units specified by GraceUnit. For example, suppose you specify a two-minute grace period using the minutes function but set the units to seconds. The GracePeriod property displays the grace period in seconds.

    ks = kafkaStream(host,port,topic,Duration=minutes(10), ...
                     GracePeriod=minutes(2),GraceUnit="Seconds")
    
    ks = 
    
      KafkaStream with properties:
                ...
                GracePeriod: "120 Seconds"
                ...
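
    The unit conversion in the display above can be reproduced with plain duration arithmetic. This is a small sketch of the conversion only; the way the string is assembled here is an assumption made for illustration and may differ from how the object formats its property.

```matlab
gracePeriod = minutes(2);     % duration scalar passed as GracePeriod
graceUnit = "Seconds";        % unit passed as GraceUnit

% seconds(d) returns the length of the duration d as a number of seconds.
len = seconds(gracePeriod);

% Assemble a "Length Units" string like the one shown in the display.
displayed = string(len) + " " + graceUnit;
disp(displayed)
```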

    This argument applies only for objects with duration-based event windows, that is, KafkaStream objects created using the timespan argument. For objects created using the numevents argument, the grace period is ignored.

    Data Types: string | char

    Schema

    Rules for converting event data to MATLAB data types, specified as a JSON string in event schema format. You can specify an event schema more easily using the ImportOptions property.

    Rules for converting MATLAB data types to event data, specified as a JSON string in event schema format. You can specify an event schema more easily using the ExportOptions property.

    Properties

    Host

    Hostname of the Kafka server, specified as a character vector or string scalar.

    Example: '144.213.5.7' or 'localhost'

    Data Types: char | string

    Port

    Port number of the Kafka server, specified as an integer in the range [0, 65535].

    Example: 9092

    Topic

    Kafka topic name, specified as a character vector or string scalar.

    Example: "CoolingFan"

    Data Types: char | string

    Group

    Kafka consumer group ID, specified as a character vector or string scalar.

    Multiple Kafka consumers can belong to the same consumer group. In that case, Kafka shares data between the consumers in the group so that no two consumers in the same group ever receive the same messages. By default, every KafkaStream object has a unique consumer group ID, which allows multiple consumers to read from the same topic independently.

    Data Types: char | string
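
    As a sketch of the two consumer-group behaviors described above, the following creates two objects that share a group and one that keeps the default unique group. It assumes that the Group property can be set with a name-value argument at creation, as the Name=Value syntax suggests. The host, port, topic, and group name are hypothetical, and the code needs a reachable Kafka server to be useful.

```matlab
% ks1 and ks2 share a consumer group (hypothetical name), so Kafka
% divides the messages of the topic between them.
ks1 = kafkaStream("kafka.host.com",9092,"CoolingFan",Group="fanMonitors");
ks2 = kafkaStream("kafka.host.com",9092,"CoolingFan",Group="fanMonitors");

% ks3 omits Group, so it gets a unique group ID by default and reads
% the topic independently of ks1 and ks2.
ks3 = kafkaStream("kafka.host.com",9092,"CoolingFan");
```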

    Order

    Strategy to order events in the stream, specified as one of these values:

    • "EventTime" — Order events based on the time that they occur. Ensures event-time chronological order even when events arrive out of order at the Kafka server.

    • "IngestTime" — Order events based on the time that they appear in the stream.

    You cannot set the value of this property after object creation.

    Data Types: string | char

    GracePeriod

    This property is read-only.

    Time that the KafkaStream object waits for messages, specified as a string scalar of the form "Length Units", where:

    • Length is the length of the grace period, as specified by the GracePeriod argument during object creation.

    • Units is the units of the grace period, as specified by the GraceUnit argument during object creation.

    When you create the object, if you do not specify a grace period, then the GracePeriod property is set to "0 Seconds" (no grace period).

    Example: "10 Minutes"

    Data Types: string

    WindowSize

    This property is read-only.

    Event window size, specified as a fixed amount of time (using the timespan argument) or a fixed number of messages (using the numevents argument).

    Data Types: duration | single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    ReadLimit

    Strategy to wait for a response from the stream, specified as one of these values:

    • "Size" — Client prioritizes filling the event window. Using this strategy, the client might wait longer than the RequestTimeout time period as long as it is still receiving the expected number of messages. The default number of messages is 50. If the client receives no messages within the RequestTimeout time period, it no longer waits.

    • "Time" — Client strictly adheres to the RequestTimeout limit, even if it has not received the expected number of messages. RequestTimeout specifies the amount of time the stream object waits between receiving events. If the stream is actively receiving data, it does not time out during that operation.
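
    The following sketch shows one way to combine a large row-based window with a strict timeout. It assumes that ReadLimit and RequestTimeout can be set with name-value arguments at creation, as the Name=Value syntax suggests. The host, port, topic, and the chosen values are hypothetical.

```matlab
% Request up to 500 rows per read, but adhere strictly to a
% 30-second request timeout instead of waiting to fill the window.
ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
                 Rows=500,ReadLimit="Time",RequestTimeout=30);
```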

    TimestampResolution

    Unit of event timestamp, specified as one of these values:

    • "Milliseconds"

    • "Seconds"

    • "Minutes"

    • "Hours"

    • "Days"

    The event timestamp is interpreted as the number of units of the specified type before or after the UNIX® epoch.

    Data Types: string | char
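
    As an illustration of how a raw timestamp relates to the UNIX epoch, the sketch below converts a hypothetical millisecond timestamp to a datetime by hand. It shows the interpretation only; the framework performs this conversion itself when it reads events.

```matlab
rawTimestamp = 1604102400000;   % hypothetical event timestamp
unitsPerSecond = 1000;          % "Milliseconds" resolution

% Convert to seconds since the UNIX epoch, then to a datetime.
t = datetime(rawTimestamp/unitsPerSecond,ConvertFrom="posixtime");
disp(t)
```

    With these values, t is midnight on 31 October 2020.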

    Connection and Request Timeouts

    ConnectionTimeout

    Number of seconds that a client waits for the initial response from the Kafka host, specified as a positive integer.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    RequestTimeout

    Number of seconds to wait before terminating a request, specified as a positive integer. The wait time includes connecting to the Kafka host as well as data transfer between the Kafka host and the client.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    Import and Export Options

    ImportOptions

    Rules for transforming stream events into MATLAB data, specified as an ImportOptions object. This object controls the import of stream events into MATLAB.

    ExportOptions

    Rules for transforming MATLAB data into stream events, specified as an ExportOptions object. This object controls the export of MATLAB data into streams.

    PublishSchema

    Flag to indicate whether the export schema is written to the output stream, specified as a logical scalar.

    When this flag is true, the schema is embedded in each event, which can significantly increase the size of the event. If downstream applications do not require the schema, set this flag to false to reduce the number of bytes in your stream.

    Data Types: logical

    Event Key and Body Encoding

    KeyVariable

    Name of the key variable in the event stream, specified as a string scalar or character vector.

    Data Types: string | char

    KeyEncoding

    Character encoding format used to interpret the bits in an event key, specified as one of the following:

    • utf8 — UTF-8 encoding format

    • utf16 — UTF-16 encoding format

    • base64 — Base 64 encoding format

    • uint8 — Eight-bit unsigned binary bytes

    If KeyEncoding is utf8 or utf16, then the KeyType property must be text. If KeyEncoding is base64 or uint8, then KeyType must be one of the numeric types.

    KeyType

    Data type used to interpret the bytes in an event key, specified as one of these values:

    • uint8 — One-byte unsigned integer

    • int8 — One-byte signed integer

    • uint16 — Two-byte unsigned integer

    • int16 — Two-byte signed integer

    • uint32 — Four-byte unsigned integer

    • int32 — Four-byte signed integer

    • uint64 — Eight-byte unsigned integer

    • int64 — Eight-byte signed integer

    • single — Single-precision IEEE 754 floating point number

    • double — Double-precision IEEE 754 floating point number

    • text — String

    If KeyType is text, then the KeyEncoding property must be either utf8 or utf16. If KeyType is any of the numeric types, then KeyEncoding must be either base64 or uint8.
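
    The pairing rule between KeyType and KeyEncoding can be sketched as a small check. The helper isCompatible is hypothetical and not part of the framework. It assumes that both inputs are already valid values from the lists above.

```matlab
% Hypothetical helper, not part of the framework: a text key needs a
% text encoding, and a numeric key needs base64 or uint8.
textEncodings = ["utf8","utf16"];
isCompatible = @(keyType,keyEncoding) ...
    (keyType == "text") == ismember(keyEncoding,textEncodings);

ok1 = isCompatible("text","utf16");    % text key, text encoding
ok2 = isCompatible("int32","uint8");   % numeric key, byte encoding
ok3 = isCompatible("int32","utf8");    % numeric key, text encoding
disp([ok1 ok2 ok3])
```

    The first two combinations satisfy the rule and the third does not.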

    KeyByteOrder

    Order for storing bytes in the event key, specified as one of the following:

    • LittleEndian — Least significant byte is stored first

    • BigEndian — Most significant byte is stored first

    • MatchHost — Bytes are stored in the same order as is used by the host computer on which the streaming data framework is running

    • NotApplicable — Not an integer key

    This property is applicable only for integer keys and not applicable to floating point or text keys.
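
    To see what byte order means for an integer key, the sketch below uses only base MATLAB functions to lay out a hypothetical int32 key in host order and in the opposite order. It illustrates the concept and is not the framework's own key serialization.

```matlab
key = int32(1);   % hypothetical integer key

% Bytes of the key as laid out on the host computer.
hostBytes = typecast(key,'uint8');

% Bytes of the same key in the opposite order.
swappedBytes = typecast(swapbytes(key),'uint8');

% Third output of computer is 'L' on a little-endian host
% and 'B' on a big-endian host.
[~,~,hostEndian] = computer;
disp(hostEndian)
disp(hostBytes)
disp(swappedBytes)
```

    On a little-endian host, hostBytes starts with the least significant byte and swappedBytes shows the big-endian layout.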

    BodyEncoding

    Character encoding format used to interpret the bits in the event body, specified as one of the following:

    • utf8 — UTF-8 encoding format

    • utf16 — UTF-16 encoding format

    • base64 — Base 64 encoding format

    • uint8 — Eight-bit unsigned binary bytes

    This property determines the size and encoding of the bytes used in the event body, which are in the format specified by BodyFormat.

    BodyFormat

    Format of bytes in event body, specified as one of the following:

    • JSON — JSON string

    • Array — MATLAB array

    • Text — String data

    • Binary — Binary data

    Depending on the encoding specified by BodyEncoding, bytes can be larger than eight bits.
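
    As a rough picture of a JSON-format body with utf8 encoding, the sketch below serializes a hypothetical event body to JSON text, encodes it as UTF-8 bytes, and decodes it again using base MATLAB functions. It is an illustration under those assumptions, not the framework's internal encoding path.

```matlab
% Hypothetical event body as (name, value) pairs.
body = struct("vMotor",1.0909,"wMotor",0,"Tmass",25);

jsonText = jsonencode(body);                    % body as JSON text
bodyBytes = unicode2native(jsonText,'UTF-8');   % text as UTF-8 bytes

% Reverse the steps: bytes -> text -> MATLAB struct.
decoded = jsondecode(native2unicode(bodyBytes,'UTF-8'));
disp(jsonText)
```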

    Object Functions

    readtimetable            Read timetable from event stream
    writetimetable           Write timetable to event stream
    seek                     Set read position in event stream
    preview                  Preview subset of events from event stream
    identifyingName          Event stream name
    detectImportOptions      Create import options based on event stream content
    detectExportOptions      Create export options based on event stream content
    readevents               Read raw events from Kafka stream without schema processing applied
    flush                    Reset read window boundaries
    stop                     Stop processing event streams from Kafka topic
    loggederror              Error information for Kafka stream operation
    createTopic              Create topic in Kafka cluster
    deleteTopic              Remove topic from Kafka cluster
    categoryList             Kafka stream provider property list
    getProviderProperties    Kafka stream configuration property data
    setProviderProperties    Set properties specific to Kafka configuration
    isProperty               Determine if Kafka stream provider property is set

    Examples

    Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic CoolingFan.

    Assume that the Kafka host is configured to use SSL. To configure SSL communication between the Kafka host and the client, provide SSL configuration settings when creating an object for reading and writing to the Kafka topic.

    ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
                    "security.protocol","SASL_SSL", ...
                    "ssl.truststore.type","PEM", ...
                    "ssl.truststore.location","prodserver.pem")
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "CoolingFan"
                      Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "Import to MATLAB types"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 50
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    Confirm which properties are set.

    props = getProviderProperties(ks);
    unique({props.name}')
    ans =
    
      7×1 cell array
    
        {'auto.offset.reset'      }
        {'retention.ms'           }
        {'sasl.jaas.config'       }
        {'sasl.username'          }
        {'security.protocol'      }
        {'ssl.truststore.location'}
        {'ssl.truststore.type'    }

    Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic CoolingFan.

    Create an object connected to the CoolingFan topic and request only 10 messages instead of the default.

    ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=10)
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "CoolingFan"
                      Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "Import to MATLAB types"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 10
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    Use the object to read 10 messages from the event stream into a timetable.

    tt = readtimetable(ks)
    tt =
    
      10×11 timetable
    
             timestamp          vMotor    wMotor    Tmass     
        ____________________    ______    ______    ______    
    
        31-Oct-2020 00:00:00    1.0909         0        25            
        31-Oct-2020 00:00:00    1.1506     100.5     25.17            
        31-Oct-2020 00:00:00    1.1739     190.9    25.223             
        31-Oct-2020 00:00:00    1.1454    330.61     25.15             
        31-Oct-2020 00:00:00    1.1346    382.77    25.122           
        31-Oct-2020 00:00:00    1.1287    420.88    25.106             
        31-Oct-2020 00:00:00    1.1253    454.55    25.096             
        31-Oct-2020 00:00:00    1.1232     478.1     25.09            
        31-Oct-2020 00:00:00    1.1217    500.16    25.086    ...        

    Version History

    Introduced in R2022b