Streams Guide

Everything you need to know about OKMQ streams, messages, and real-time processing

What are Streams?

Streams are append-only logs of messages that provide ordered, real-time message delivery with exactly-once processing. Unlike queues, streams retain all messages and use a single committed offset to track stream position.

Append-Only Log

Messages are stored permanently in order

Committed Offset

Single committed offset tracks stream position

Real-time Processing

Long polling for immediate message delivery

Message Deduplication

Unique message IDs prevent duplicate processing

Streams vs Queues

Understanding the differences between streams and queues will help you choose the right messaging pattern for your workload.

Feature        | Streams                                | Queues
Consumers      | Single consumer with committed offset  | Single consumer per message
Message Order  | Strict ordering by offset              | FIFO with delivery attempts
Acknowledgment | Single offset-based commits            | Explicit ack/nack per message
Retry Handling | Consumer controls retries              | Built-in retry strategies
Use Cases      | Event sourcing, real-time data         | Task processing, work distribution

Stream Message Structure

Stream messages are deliberately minimal: a unique ID for deduplication and a body. This structure is optimized for high-throughput sequential processing with exactly-once delivery; a sketch of both message shapes follows the lists below.

Simple Structure

  • ID for deduplication
  • Body content
  • Automatic offset assignment
  • Sequential ordering

Exactly-Once Delivery

  • Message deduplication by ID
  • Single committed offset
  • No duplicate processing
  • Reliable delivery guarantees

High Performance

  • Append-only log structure
  • Batch message production
  • Long polling support
  • Real-time processing
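
A message, as produced, supplies only an ID and a body; the offset is assigned by the server when the message is appended. A minimal sketch of both shapes (field values are illustrative):

// As produced: you supply the ID and body
const produced = {
  id: 'evt-001',                                    // unique ID used for deduplication
  body: JSON.stringify({ type: 'user_registered' }) // payload, serialized as a string
}

// As consumed: the same message carries its server-assigned offset
const consumed = {
  id: 'evt-001',
  offset: 42, // position in the stream, assigned on append
  body: '{"type":"user_registered"}'
}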

Creating Streams

Create a stream by sending a POST request to /streams/ with the stream configuration.


const createStream = async () => {
  const response = await fetch('https://api.okmq.net/streams/', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify({
      name: 'user-events',
      ttl: 7200  // 2 hours TTL (optional)
    }),
  })

  if (response.ok) {
    console.log('Stream created')
  }
}

Parameter | Type   | Description                             | Default
name      | string | Unique stream name (max 50 characters)  | Required
ttl       | number | Message time-to-live in seconds         | No expiration

Streams are much simpler than queues: they require only a name and an optional TTL.

TTL and Cleanup

Set time-to-live (TTL) values to automatically clean up expired messages and prevent stream bloat. TTL is evaluated and expired messages are deleted every minute.

Producing Messages

Stream messages require a unique ID for deduplication and a message body. Each message gets an automatic offset for ordering.


const produceMessage = async (streamName, message) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        id: 'unique-message-id-123',
        body: JSON.stringify(message)
      }
    ]),
  })

  if (response.ok) {
    console.log('Message produced to stream')
  }
}

// Example usage
await produceMessage('user-events', {
  type: 'user_registered',
  userId: '12345',
  email: 'user@example.com',
  timestamp: new Date().toISOString()
})
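
Because the request body is an array, several messages can be appended in a single request. A minimal batch sketch (the produceBatch helper is illustrative, not part of the API):

const produceBatch = async (streamName, events) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    // One array element per message, each with its own deduplication ID
    body: JSON.stringify(
      events.map((event) => ({
        id: event.id, // supply a stable, unique ID per event so retries deduplicate
        body: JSON.stringify(event.payload)
      }))
    ),
  })

  if (response.ok) {
    console.log(`Produced ${events.length} messages`)
  }
}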

Consuming Messages

Stream consumption uses a single committed offset per stream. You must commit the offset after processing messages to avoid reprocessing.


const consumeMessages = async (streamName, limit = 10) => {
  const response = await fetch(
    `https://api.okmq.net/streams/${streamName}/messages?limit=${limit}`,
    {
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      }
    }
  )

  if (!response.ok) {
    // Surface errors to the caller (handled by the polling loop below)
    throw new Error(`Consume failed with status ${response.status}`)
  }

  const messages = await response.json()

  // Process messages
  for (const message of messages) {
    console.log(`ID ${message.id}, Offset ${message.offset}: ${message.body}`)

    // Process your message here
    await processMessage(JSON.parse(message.body))
  }

  // Commit the highest offset processed
  if (messages.length > 0) {
    const lastOffset = messages[messages.length - 1].offset
    await commitOffset(streamName, lastOffset)
  }

  return messages
}

const commitOffset = async (streamName, offset) => {
  const response = await fetch(
    `https://api.okmq.net/streams/${streamName}/commit`,
    {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json',
      },
      body: JSON.stringify({ offset }),
    }
  )

  if (response.ok) {
    console.log(`Committed offset ${offset}`)
  }
}

Important: You must commit offsets after processing messages; otherwise, you will receive the same messages again.

Real-time with Long Polling

Stream consumption supports long polling: if no messages are immediately available, the request waits for new ones to arrive.


const streamProcessor = async (streamName) => {
  while (true) {
    try {
      // Long polling request - will wait up to 30 seconds for new messages
      const messages = await consumeMessages(streamName, 10)

      if (messages.length === 0) {
        // No new messages, continue polling
        continue
      }

      console.log(`Received ${messages.length} messages`)

      // Process and commit as before...

    } catch (error) {
      console.error('Polling error:', error)
      // Wait before retrying
      await new Promise(resolve => setTimeout(resolve, 1000))
    }
  }
}

// Start processing
streamProcessor('user-events')

Message Ordering & Offsets

Stream messages are strictly ordered by their offset. Each message gets an incrementing offset number that represents its position in the stream.


// Messages are delivered in offset order
const messages = [
  { id: 'msg1', offset: 1, body: '{"event":"user_signup","userId":"123"}' },
  { id: 'msg2', offset: 2, body: '{"event":"user_login","userId":"123"}' },
  { id: 'msg3', offset: 3, body: '{"event":"page_view","userId":"123"}' }
]

// Process in order
for (const message of messages) {
  console.log(`Processing offset ${message.offset}: ${message.id}`)
  
  // Your processing logic here
  const event = JSON.parse(message.body)
  await handleEvent(event)
}

// Commit up to offset 3
await commitOffset('user-activity', 3)

// Next consumption will start from offset 4

Offset Behavior

When you commit offset N, your next consumption will start from offset N+1. The committed offset represents the last message you've successfully processed.
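
Using the helpers defined earlier, the behavior looks like this (offset values are illustrative):

// Suppose the last processed message had offset 7
await commitOffset('user-events', 7)

// The next consumption starts from offset 8
const next = await consumeMessages('user-events', 10)
// next[0].offset === 8, assuming a message exists at that offset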

Common Use Cases

Streams are ideal for scenarios where you need exactly-once processing, ordered delivery, and offset-based consumption.

Event Sourcing

Store all events as an immutable log for rebuilding application state (see the reducer sketch after this list)

  • User actions and state changes
  • Audit trails and compliance
  • Replay events for debugging
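
As a sketch of the rebuild step, application state can be derived by folding each event into a reducer in offset order (the event shapes follow earlier examples; the reducer itself is illustrative):

// Apply a single event to the current state
const applyEvent = (state, event) => {
  switch (event.type) {
    case 'user_registered':
      return { ...state, [event.userId]: { email: event.email } }
    case 'user_login':
      return { ...state, [event.userId]: { ...state[event.userId], lastLoginAt: event.timestamp } }
    default:
      return state // ignore unknown event types
  }
}

// Rebuild state by replaying consumed messages in offset order
const rebuildState = (messages) =>
  messages.reduce((state, message) => applyEvent(state, JSON.parse(message.body)), {})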

Real-time Analytics

Process live data streams for dashboards and monitoring

  • User behavior tracking
  • System metrics collection
  • Live dashboard updates

Data Integration

Sync data between multiple systems and services

  • Database change streams
  • Service-to-service communication
  • Data warehouse ETL

Financial Processing

Process transactions with exactly-once guarantees (see the sketch after this list)

  • Payment processing
  • Account balance updates
  • Transaction logs
  • Regulatory compliance
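
Because streams deduplicate by message ID, deriving the ID deterministically from the transaction makes retries safe. A sketch (the payments stream name and payment fields are illustrative):

const recordPayment = async (payment) => {
  const response = await fetch('https://api.okmq.net/streams/payments/messages', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        // Deterministic ID: retrying the same payment yields the same ID,
        // so the stream's deduplication prevents double-recording
        id: `payment-${payment.paymentId}`,
        body: JSON.stringify(payment)
      }
    ]),
  })

  return response.ok
}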

Deleting Messages from Streams

Delete messages from a stream for cleanup and storage management. You can delete all messages, delete messages up to a specific offset, or rely on TTL for automatic cleanup while keeping the stream itself.

Complete Cleanup

Remove all messages from the stream while keeping the stream structure

Offset-based Cleanup

Delete messages up to a specific offset for rolling window cleanup

TTL-based Cleanup

Automatic deletion based on message age using stream TTL settings


// Delete all messages from a stream
const deleteAllMessages = async (streamName) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    console.log(`All messages deleted from stream ${streamName}`)
  }
}

// Delete messages up to a specific offset (exclusive)
const deleteMessagesUpToOffset = async (streamName, offset) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages?offset=${offset}`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    console.log(`Messages deleted up to offset ${offset} from stream ${streamName}`)
  }
}

// Example: Rolling window cleanup - keep only last 1000 messages
const cleanupOldMessages = async (streamName) => {
  // Get current stream info to find the latest offset
  const streamResponse = await fetch(`https://api.okmq.net/streams/${streamName}`, {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  const streamInfo = await streamResponse.json()
  const currentOffset = streamInfo.offset
  
  // Calculate offset to keep last 1000 messages
  const deleteUpToOffset = Math.max(0, currentOffset - 1000)
  
  if (deleteUpToOffset > 0) {
    await deleteMessagesUpToOffset(streamName, deleteUpToOffset)
    console.log(`Cleaned up messages, kept last 1000 messages`)
  }
}

Message Deletion Notes

  • Deleted messages cannot be recovered
  • Offset-based deletion is exclusive (messages with offset >= specified value are kept)
  • Stream structure and committed offset position remain unchanged
  • TTL provides automatic cleanup: messages are deleted when they exceed the stream's configured TTL
  • Useful for implementing data retention policies and storage cleanup

Deleting Streams

Delete a stream and all its messages by sending a DELETE request to the stream endpoint.


const deleteStream = async (streamName) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    }
  })

  if (response.ok) {
    console.log(`Stream ${streamName} deleted successfully`)
  }
}

Warning: Deleting a stream permanently removes all messages. This action cannot be undone.

Next Steps

Learn about queue-based task processing and acknowledgment patterns

Queues Documentation