Binary transport
Command schema
All the commands are represented as a binary message. The message consists of 3 parts: length, code and payload:
- length - 4-byte integer (u32) which represents the total length: code (4 bytes) + payload (N bytes)
- code - 4-byte integer (u32) which represents the command code
- payload - binary data of N bytes length
For example, if the payload is 100 bytes, the length will have a value of 104 (100 bytes for payload + 4 const bytes for the code). The message as whole will have 108 (4 + 4 + 100) bytes size.
+-----------------------------------------------------------+
| | | |
| LENGTH | CODE | PAYLOAD |
| | | |
+-----------------------------------------------------------+
| 4 bytes | 4 bytes | N bytes |
Command codes
PING = 1
GET_STATS = 10
GET_ME = 20
GET_CLIENT = 21
GET_CLIENTS = 22
POLL_MESSAGES = 100
SEND_MESSAGES = 101
GET_CONSUMER_OFFSET = 120
STORE_CONSUMER_OFFSET = 121
GET_STREAM = 200
GET_STREAMS = 201
CREATE_STREAM = 202
DELETE_STREAM = 203
GET_TOPIC = 300
GET_TOPICS = 301
CREATE_TOPIC = 302
DELETE_TOPIC = 303
CREATE_PARTITIONS = 402
DELETE_PARTITIONS = 403
GET_CONSUMER_GROUP = 600
GET_CONSUMER_GROUPS = 601
CREATE_CONSUMER_GROUP = 602
DELETE_CONSUMER_GROUP = 603
JOIN_CONSUMER_GROUP = 604
LEAVE_CONSUMER_GROUP = 605
Shared
Consumer
pub struct Consumer {
pub kind: ConsumerKind,
pub id: u32,
}
pub enum ConsumerKind {
Consumer, // Value = 1 (default)
ConsumerGroup // Value = 2
}
The Consumer
is a simple structs which represents the type of the consumer. It can be either a consumer or a consumer group. This type is used when interacting with the messages related commands data such as PollMessages
, GetConsumerOffset
and StoreConsumerOffset
.
Serialization
kind
- 1 byteid
- 4 bytes
Identifier
pub struct Identifier {
pub kind: IdKind,
pub length: u8,
pub value: Vec<u8>
}
pub enum IdKind {
Numeric, // Value = 1 (default)
String // Value = 2
}
The Identifier
is a struct which represents the identifier of the stream or topic. It consists of the kind
(numeric or string), length
(length of the identifier value) and value
(actual value of the identifier). The value
is a vector of bytes, which can be either a numeric value (e.g. 1 encoded as const 4 bytes for u32 type) or a string (e.g. "my-stream") with variable length of UTF-8 string, maximum 255 bytes (chars). The total length varies between 3 (string type with a single char) and 257 bytes (string type with 255 chars).
Serialization
kind
- 1 bytelength
- 1 bytevalue
- N bytes
PolledMessages
pub struct PolledMessages {
pub partition_id: u32,
pub current_offset: u64,
pub messages: Vec<Message>,
}
The PolledMessages
is a struct returned by the PollMessages
command. It consists of the partition_id
, current_offset
and messages
fields. The messages
field is a vector of Message
structs.
Serialization
partition_id
- 4 bytescurrent_offset
- 8 bytesmessages_count
- 4 bytes (hidden field)messages
- N bytes
Message
pub struct Message {
pub offset: u64,
pub state: MessageState,
pub timestamp: u64,
pub id: u128,
pub checksum: u32,
pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
pub length: u32,
pub payload: Bytes,
}
The Message
consists of the offset
(offset of the message in the partition), state
(state of the message), timestamp
(timestamp of the message), id
(unique identifier of the message), checksum
(checksum of the message), headers
(optional headers of the message), length
(length of the payload) and payload
(actual payload of the message).
Serialization
offset
- 8 bytesstate
- 1 bytetimestamp
- 8 bytesid
- 16 byteschecksum
- 4 bytesheaders
- N byteslength
- 4 bytespayload
- N bytes
ConsumerOffsetInfo
The ConsumerOffsetInfo
is a struct returned by the GetConsumerOffset
command. It consists of the partition_id
, current_offset
and stored_offset
fields.
pub struct ConsumerOffsetInfo {
pub partition_id: u32,
pub current_offset: u64,
pub stored_offset: u64,
}
Serialization
partition_id
- 4 bytescurrent_offset
- 8 bytesstored_offset
- 8 bytes
Streams
Get stream
pub struct GetStream {
pub stream_id: Identifier
}
Code: 200
Serialization
stream_id
- 3-257 bytes
Get streams
pub struct GetStreams {}
Code: 201
Serialization
- Empty bytes
Create stream
pub struct CreateStream {
pub stream_id: u32,
pub name: String
}
Code: 202
Serialization
stream_id
- 3-257 bytesname_length
- 1 byte (hidden field)name
- 1-255 bytes
Delete stream
pub struct DeleteStream {
pub stream_id: Identifier
}
Code: 203
Serialization
stream_id
- 3-257 bytes
Topics
Get topic
pub struct GetTopic {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 300
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytes
Get topics
pub struct GetTopics {
pub stream_id: Identifier
}
Code: 301
Serialization
stream_id
- 3-257 bytes
Create topic
pub struct CreateTopic {
pub stream_id: Identifier,
pub topic_id: u32,
pub partitions_count: u32,
pub message_expiry: Option<u32>,
pub name: String,
}
Code: 302
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytespartitions_count
- 4 bytesname_length
- 1 byte (hidden field)name
- 1-255 bytes
Delete topic
pub struct DeleteTopic {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 303
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytes
Partitions
Create partitions
pub struct CreatePartitions {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partitions_count: u32
}
Code: 402
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytespartitions_count
- 4 bytes
Delete partitions
pub struct DeletePartitions {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partitions_count: u32
}
Code: 403
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytespartitions_count
- 4 bytes
Messages
Poll messages
pub struct PollMessages {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: u32,
pub strategy: PollingStrategy,
pub count: u32,
pub auto_commit: bool,
}
pub struct PollingStrategy {
pub kind: PollingKind,
pub value: u64,
}
pub enum PollingKind {
Offset, // Value = 1 (default)
Timestamp, // Value = 2
First, // Value = 3
Last, // Value = 4
Next, // Value = 5
}
Code: 100
Serialization
consumer
- 5 bytesstream_id
- 3-257 bytestopic_id
- 3-257 bytespartition_id
- 4 bytesstrategy
- 9 bytescount
- 4 bytesauto_commit
- 1 byte
Send messages
pub struct SendMessages {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partitioning: Partitioning,
pub messages: Vec<Message>
}
pub struct Partitioning {
pub kind: PartitioningKind,
pub length: u8,
pub value: Vec<u8>
}
pub enum PartitioningKind {
Balanced, // Value = 1 (default)
PartitionId, // Value = 2
MessagesKey // Value = 3
}
pub struct Message {
pub id: u128,
pub length: u32,
pub payload: Bytes // Memory-optimized vector of bytes
pub headers: Option<HashMap<HeaderKey, HeaderValue>> // Optional headers, see the changelog for more details
}
pub struct HeaderKey(String);
pub struct HeaderValue {
pub kind: HeaderKind,
pub value: Vec<u8>
}
pub enum HeaderKind {
Raw,
String,
Bool,
Int8,
Int16,
Int32,
Int64,
Int128,
Uint8,
Uint16,
Uint32,
Uint64,
Uint128,
Float32,
Float64
}
Code: 101
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytespartitioning
- 3-257 bytesmessages
- 16 + 4 + N bytes per single message
Consumer offsets
Get consumer offset
pub struct GetConsumerOffset {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: u32
}
Code: 120
Serialization
consumer
- 5 bytesstream_id
- 3-257 bytestopic_id
- 3-257 bytespartition_id
- 4 bytes
Store consumer offset
pub struct StoreConsumerOffset {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: u32,
pub offset: u64
}
Code: 121
Serialization
consumer
- 5 bytesstream_id
- 3-257 bytestopic_id
- 3-257 bytespartition_id
- 4 bytesoffset
- 8 bytes
Consumer groups
Get consumer group
pub struct GetConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub consumer_group_id: u32
}
Code: 600
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytesconsumer_group_id
- 4 bytes
Get consumer groups
pub struct GetConsumerGroups {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 601
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytes
Create consumer group
pub struct CreateConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub consumer_group_id: u32
}
Code: 602
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytesconsumer_group_id
- 4 bytes
Delete consumer group
pub struct DeleteConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub consumer_group_id: u32
}
Code: 603
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytesconsumer_group_id
- 4 bytes
Join consumer group
pub struct JoinConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub consumer_group_id: u32
}
Code: 604
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytesconsumer_group_id
- 4 bytes
Leave consumer group
pub struct LeaveConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub consumer_group_id: u32
}
Code: 605
Serialization
stream_id
- 3-257 bytestopic_id
- 3-257 bytesconsumer_group_id
- 4 bytes
System
Get me
pub struct GetMe {}
Code: 20
Serialization
- Empty bytes
Get client
pub struct GetClient {
pub client_id: u32
}
Code: 21
Serialization
client_id
- 4 bytes
Get clients
pub struct GetClients {}
Code: 22
Serialization
- Empty bytes
Get stats
pub struct GetStats {}
Code: 10
Serialization
- Empty bytes