Class KafkaMessageQueue

Message queue that sends and receives messages via a Kafka message broker.

Kafka is a popular distributed event-streaming platform used to exchange messages between services.

Configuration parameters

  • topic: name of the Kafka topic to subscribe to
  • group_id: (optional) consumer group id (default: default)
  • from_beginning: (optional) restarts receiving messages from the beginning (default: false)
  • read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)
  • autocommit: (optional) turns on/off autocommit (default: true)
  • connection(s):
    • discovery_key: (optional) a key to retrieve the connection from IDiscovery
    • host: host name or IP address
    • port: port number
    • uri: resource URI or connection string with all parameters in it
  • credential(s):
    • store_key: (optional) a key to retrieve the credentials from ICredentialStore
    • username: user name
    • password: user password
  • options:
    • autosubscribe: (optional) true to automatically subscribe on open (default: false)
    • acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
    • log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
    • connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
    • max_retries: (optional) maximum retry attempts (default: 5)
    • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
    • request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
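
The defaults listed above can be summarized in a small, self-contained sketch. It is not the library's own code: the `KafkaQueueSettings` shape and the `resolveSettings` helper are hypothetical names, the dotted keys follow the style used in the example further down, and treating `topic` as required is an assumption based on it being the only top-level parameter not marked optional.

```typescript
// Hypothetical settings shape mirroring the documented parameters.
interface KafkaQueueSettings {
  topic: string;
  groupId: string;
  fromBeginning: boolean;
  readPartitions: number;
  autoCommit: boolean;
  autoSubscribe: boolean;
  acks: number;
  connectTimeout: number;
  maxRetries: number;
  retryTimeout: number;
  requestTimeout: number;
}

type RawValue = string | number | boolean | undefined;
type RawParams = Record<string, RawValue>;

// Converts a raw value to a number, falling back when it is missing or not numeric.
function toNumber(value: RawValue, fallback: number): number {
  if (value === undefined) return fallback;
  const parsed = Number(value);
  return isNaN(parsed) ? fallback : parsed;
}

// Converts a raw value to a boolean, accepting real booleans and the string "true".
function toBoolean(value: RawValue, fallback: boolean): boolean {
  if (value === undefined) return fallback;
  if (typeof value === "boolean") return value;
  return String(value).toLowerCase() === "true";
}

// Applies the documented defaults to a flat key/value map.
function resolveSettings(params: RawParams): KafkaQueueSettings {
  const topic = params["topic"];
  if (typeof topic !== "string" || topic.length === 0) {
    // Assumption: topic is treated as required in this sketch.
    throw new Error("topic is required");
  }
  const groupId = params["group_id"];
  return {
    topic,
    groupId: typeof groupId === "string" ? groupId : "default",
    fromBeginning: toBoolean(params["from_beginning"], false),
    readPartitions: toNumber(params["read_partitions"], 1),
    autoCommit: toBoolean(params["autocommit"], true),
    autoSubscribe: toBoolean(params["options.autosubscribe"], false),
    acks: toNumber(params["options.acks"], -1),
    connectTimeout: toNumber(params["options.connect_timeout"], 1000),
    maxRetries: toNumber(params["options.max_retries"], 5),
    retryTimeout: toNumber(params["options.retry_timeout"], 30000),
    requestTimeout: toNumber(params["options.request_timeout"], 30000),
  };
}

// Only the topic is supplied, so every other field takes its documented default.
const defaults = resolveSettings({ topic: "mytopic" });
// Overrides: wait for the leader only and turn autocommit off.
const custom = resolveSettings({ topic: "mytopic", "options.acks": 1, autocommit: "false" });
```

Keeping the defaults in one place like this makes it easy to see which settings change delivery behavior (`acks`, `autocommit`, `from_beginning`) and which only tune timing.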

References

  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
  • *:connection:kafka:*:1.0 (optional) Shared connection to Kafka service
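
The locators above use `*` as a wildcard in a five-field pattern. The sketch below illustrates how such a pattern might be matched against a concrete descriptor. The `matchesLocator` helper and the sample descriptor are hypothetical, and the field order (group:type:kind:name:version) is stated here as an assumption; the library provides its own descriptor handling.

```typescript
// Sketch only: this helper is hypothetical and not part of the library.
// Assumed field order: group:type:kind:name:version.
function matchesLocator(pattern: string, descriptor: string): boolean {
  const patternFields = pattern.split(":");
  const descriptorFields = descriptor.split(":");
  if (patternFields.length !== 5 || descriptorFields.length !== 5) {
    return false;
  }
  // "*" in the pattern matches any value in that position.
  return patternFields.every(
    (field, index) => field === "*" || field === descriptorFields[index]
  );
}

// Hypothetical descriptor of a shared Kafka connection component.
const sharedConnection = "pip-services:connection:kafka:default:1.0";

const matchesKafka = matchesLocator("*:connection:kafka:*:1.0", sharedConnection);
const matchesLogger = matchesLocator("*:logger:*:*:1.0", sharedConnection);
```

Under these assumptions `matchesKafka` is true and `matchesLogger` is false, because the second field (`connection` versus `logger`) differs.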

See also

  • [[MessageQueue]]
  • [[MessagingCapabilities]]

Example

let queue = new KafkaMessageQueue("myqueue");
queue.configure(ConfigParams.fromTuples(
  "topic", "mytopic",
  "connection.protocol", "tcp",
  "connection.host", "localhost",
  "connection.port", 9092
));

queue.open("123", (err) => {
    ...
});

queue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));

// Wait up to 10000 ms for a message (see the receive signature below).
queue.receive("123", 10000, (err, message) => {
    if (message != null) {
       ...
       queue.complete(message, (err) => {
           ...
       });
    }
});

Hierarchy

Implements

  • any
  • any
  • any
  • any
  • any

Constructors

constructor

  • Creates a new instance of the message queue.

    Parameters

    • Optional name: string

      (optional) a queue name.

    Returns KafkaMessageQueue

Properties

Protected _acks

_acks: number = -1

Protected _autoCommit

_autoCommit: boolean = true

Protected _autoSubscribe

_autoSubscribe: boolean

Protected _connection

_connection: KafkaConnection

The Kafka connection component.

Protected _dependencyResolver

_dependencyResolver: DependencyResolver = new DependencyResolver(KafkaMessageQueue._defaultConfig)

The dependency resolver.

Protected _fromBeginning

_fromBeginning: boolean

Protected _groupId

_groupId: string

Protected _logger

_logger: CompositeLogger = new CompositeLogger()

The logger.

Protected _messages

_messages: MessageEnvelope[] = []

Protected _readPartitions

_readPartitions: number = 1

Protected _receiver

_receiver: IMessageReceiver

Protected _subscribed

_subscribed: boolean

Protected _topic

_topic: string

Methods

abandon

  • abandon(message: MessageEnvelope, callback: function): void
  • Returns a message to the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message that could not be processed at the moment so the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to the dead letter queue.

    Important: This method is not supported by Kafka.

    Parameters

    • message: MessageEnvelope

      a message to return.

    • callback: function

      (optional) callback function that receives an error or null for success.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

clear

  • clear(correlationId: string, callback: function): void
  • Clears component state.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • callback: function

      callback function that receives an error, or null if no errors occurred.

        • (err?: any): void
        • Parameters

          • Optional err: any

          Returns void

    Returns void

close

  • close(correlationId: string, callback?: function): void
  • Closes component and frees used resources.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • Optional callback: function

      callback function that receives an error, or null if no errors occurred.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

complete

  • complete(message: MessageEnvelope, callback: function): void
  • Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

    Important: This method is not supported by Kafka.

    Parameters

    • message: MessageEnvelope

      a message to remove.

    • callback: function

      (optional) callback function that receives an error or null for success.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

configure

  • configure(config: ConfigParams): void
  • Configures component by passing configuration parameters.

    Parameters

    • config: ConfigParams

      configuration parameters to be set.

    Returns void

endListen

  • endListen(correlationId: string): void
  • Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    Returns void

Protected fromMessage

  • fromMessage(message: MessageEnvelope): any
  • Parameters

    • message: MessageEnvelope

    Returns any

Protected getTopic

  • getTopic(): string
  • Returns string

isOpen

  • isOpen(): boolean
  • Checks if the component is opened.

    Returns boolean

    true if the component has been opened and false otherwise.

listen

  • listen(correlationId: string, receiver: IMessageReceiver): void
  • Listens for incoming messages and blocks the current thread until the queue is closed.

    See also

    • [[IMessageReceiver]]
    • receive

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • receiver: IMessageReceiver

      a receiver to receive incoming messages.

    Returns void
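
    As an illustration of the receiver argument, here is a minimal, self-contained sketch of a receiver that collects message payloads. The `EnvelopeLike` and `ReceiverLike` types are local stand-ins, not the library's types, and the `receiveMessage(envelope, queue, callback)` method shape is an assumption about IMessageReceiver that should be checked against [[IMessageReceiver]].

```typescript
// Local stand-in for MessageEnvelope; only the fields used here are modeled.
interface EnvelopeLike {
  message_type: string;
  message: string;
}

// Assumed receiver shape: one method invoked per incoming message.
interface ReceiverLike {
  receiveMessage(
    envelope: EnvelopeLike,
    queue: unknown,
    callback: (err: unknown) => void
  ): void;
}

// A receiver that stores each payload and reports success through the callback.
class CollectingReceiver implements ReceiverLike {
  public readonly received: string[] = [];

  public receiveMessage(
    envelope: EnvelopeLike,
    _queue: unknown,
    callback: (err: unknown) => void
  ): void {
    this.received.push(envelope.message);
    callback(null);
  }
}
```

    With the real class, an instance of such a receiver would be passed as the second argument to listen, and endListen would be called to stop delivery.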

moveToDeadLetter

  • moveToDeadLetter(message: MessageEnvelope, callback: function): void
  • Permanently removes a message from the queue and sends it to the dead letter queue.

    Important: This method is not supported by Kafka.

    Parameters

    • message: MessageEnvelope

      a message to be removed.

    • callback: function

      (optional) callback function that receives an error or null for success.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

onMessage

  • onMessage(topic: string, partition: number, msg: any): Promise<void>
  • Parameters

    • topic: string
    • partition: number
    • msg: any

    Returns Promise<void>

open

  • open(correlationId: string, callback?: function): void
  • Opens the component.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • Optional callback: function

      callback function that receives an error, or null if no errors occurred.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

peek

  • peek(correlationId: string, callback: function): void
  • Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • callback: function

      callback function that receives a message or error.

        • (err: any, result: MessageEnvelope): void
        • Parameters

          • err: any
          • result: MessageEnvelope

          Returns void

    Returns void

peekBatch

  • peekBatch(correlationId: string, messageCount: number, callback: function): void
  • Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

    Important: This method is not supported by Kafka.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • messageCount: number

      a maximum number of messages to peek.

    • callback: function

      callback function that receives a list with messages or error.

        • (err: any, result: MessageEnvelope[]): void
        • Parameters

          • err: any
          • result: MessageEnvelope[]

          Returns void

    Returns void

readMessageCount

  • readMessageCount(callback: function): void
  • Reads the current number of messages in the queue to be delivered.

    Parameters

    • callback: function

      callback function that receives number of messages or error.

        • (err: any, count: number): void
        • Parameters

          • err: any
          • count: number

          Returns void

    Returns void

receive

  • receive(correlationId: string, waitTimeout: number, callback: function): void
  • Receives an incoming message and removes it from the queue.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • waitTimeout: number

      a timeout in milliseconds to wait for a message to come.

    • callback: function

      callback function that receives a message or error.

        • (err: any, result: MessageEnvelope): void
        • Parameters

          • err: any
          • result: MessageEnvelope

          Returns void

    Returns void
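
    Callers who prefer promises can wrap the callback-style signature above. The sketch below is one possible wrapper, not part of the library: `CallbackReceiver` is a hypothetical structural type that models only the receive method, and `receiveAsync` is a hypothetical helper name.

```typescript
// Callback shape taken from the receive signature documented above.
type ReceiveCallback<T> = (err: unknown, result: T | null) => void;

// Hypothetical structural type: anything with a callback-style receive method.
interface CallbackReceiver<T> {
  receive(correlationId: string, waitTimeout: number, callback: ReceiveCallback<T>): void;
}

// Wraps receive in a promise: rejects on error, resolves with the message or null.
function receiveAsync<T>(
  queue: CallbackReceiver<T>,
  correlationId: string,
  waitTimeout: number
): Promise<T | null> {
  return new Promise<T | null>((resolve, reject) => {
    queue.receive(correlationId, waitTimeout, (err, result) => {
      if (err != null) {
        reject(err);
      } else {
        resolve(result);
      }
    });
  });
}
```

    Because the wrapper depends only on the method shape, it can be exercised with a stub object in tests before being pointed at a real queue instance.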

renewLock

  • renewLock(message: MessageEnvelope, lockTimeout: number, callback?: function): void
  • Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

    Important: This method is not supported by Kafka.

    Parameters

    • message: MessageEnvelope

      a message whose lock should be extended.

    • lockTimeout: number

      a locking timeout in milliseconds.

    • Optional callback: function

      (optional) callback function that receives an error or null for success.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

send

  • send(correlationId: string, message: MessageEnvelope, callback?: function): void
  • Sends a message into the queue.

    Parameters

    • correlationId: string

      (optional) transaction id to trace execution through call chain.

    • message: MessageEnvelope

      a message envelope to be sent.

    • Optional callback: function

      (optional) callback function that receives error or null for success.

        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

setReferences

  • setReferences(references: IReferences): void
  • Sets references to dependent components.

    Parameters

    • references: IReferences

      references to locate the component dependencies.

    Returns void

Protected subscribe

  • subscribe(correlationId: string, callback: function): void
  • Parameters

    • correlationId: string
    • callback: function
        • (err: any): void
        • Parameters

          • err: any

          Returns void

    Returns void

Protected toMessage

  • toMessage(msg: any): MessageEnvelope
  • Parameters

    • msg: any

    Returns MessageEnvelope

unsetReferences

  • unsetReferences(): void
  • Unsets (clears) previously set references to dependent components.

    Returns void
