Everything you need to know about OKMQ streams, messages, and real-time processing
Streams are append-only logs of messages that provide ordered, real-time message delivery with exactly-once processing. Unlike queues, streams retain all messages and use a single committed offset to track stream position.
- Messages are stored permanently in order
- A single committed offset tracks stream position
- Long polling for immediate message delivery
- Unique message IDs prevent duplicate processing
Understanding when to use streams versus queues is important for choosing the right messaging pattern.
| Feature | Streams | Queues |
| --- | --- | --- |
| Consumers | Single consumer with committed offset | Single consumer per message |
| Message Order | Strict ordering by offset | FIFO with delivery attempts |
| Acknowledgment | Single offset-based commits | Explicit ack/nack per message |
| Retry Handling | Consumer controls retries | Built-in retry strategies |
| Use Cases | Event sourcing, real-time data | Task processing, work distribution |
Each stream message includes a unique ID used for deduplication and a body. Streams are optimized for high-throughput sequential processing with exactly-once delivery.
Create a stream by sending a POST request to /streams/ with the stream configuration.
const createStream = async () => {
  const response = await fetch('https://api.okmq.net/streams/', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify({
      name: 'user-events',
      ttl: 7200 // 2 hours TTL (optional)
    }),
  })
  if (response.ok) {
    console.log('Stream created')
  }
}
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | string | Unique stream name (max 50 characters) | Required |
| ttl | number | Message time-to-live in seconds | No expiration |
Set time-to-live (TTL) values to automatically clean up expired messages and prevent stream bloat. Expired messages are evaluated and deleted once per minute.
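For example, a stream holding short-lived data can be created with a 24-hour TTL so that old messages age out on their own. A minimal sketch reusing the endpoint above; the daily-metrics name is illustrative:

// Sketch: create a stream whose messages expire after 24 hours.
// 'daily-metrics' is an illustrative name, not part of the API.
const createDailyMetricsStream = async () => {
  const response = await fetch('https://api.okmq.net/streams/', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify({
      name: 'daily-metrics',
      ttl: 24 * 60 * 60 // 86400 seconds = 24 hours
    }),
  })
  if (response.ok) {
    console.log('Created daily-metrics with a 24-hour TTL')
  }
}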
Stream messages require a unique ID for deduplication and a message body. Each message gets an automatic offset for ordering.
const produceMessage = async (streamName, message) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        id: 'unique-message-id-123',
        body: JSON.stringify(message)
      }
    ]),
  })
  if (response.ok) {
    console.log('Message produced to stream')
  }
}
// Example usage
await produceMessage('user-events', {
  type: 'user_registered',
  userId: '12345',
  email: 'user@example.com',
  timestamp: new Date().toISOString()
})
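Because the request body is an array, multiple messages can be produced in a single request. The sketch below batches events and derives each message ID from the event's own fields, so a retried request cannot create duplicates; the ID format is an illustrative assumption, not an API requirement.

// Sketch: batch-produce events with deterministic IDs for deduplication.
// The `${type}-${userId}-${timestamp}` ID scheme is an assumption chosen
// for illustration; any scheme that is unique per logical event works.
const produceBatch = async (streamName, events) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify(
      events.map((event) => ({
        id: `${event.type}-${event.userId}-${event.timestamp}`,
        body: JSON.stringify(event)
      }))
    ),
  })
  if (response.ok) {
    console.log(`Produced ${events.length} messages to ${streamName}`)
  }
}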
Stream consumption uses a single committed offset per stream. You must commit the offset after processing messages to avoid reprocessing.
const consumeMessages = async (streamName, limit = 10) => {
  const response = await fetch(
    `https://api.okmq.net/streams/${streamName}/messages?limit=${limit}`,
    {
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      }
    }
  )
  if (!response.ok) {
    // Return an empty batch on failure so callers can keep polling
    return []
  }
  const messages = await response.json()
  // Process messages in offset order
  for (const message of messages) {
    console.log(`ID ${message.id}, Offset ${message.offset}: ${message.body}`)
    // Process your message here
    await processMessage(JSON.parse(message.body))
  }
  // Commit the highest offset processed
  if (messages.length > 0) {
    const lastOffset = messages[messages.length - 1].offset
    await commitOffset(streamName, lastOffset)
  }
  return messages
}
const commitOffset = async (streamName, offset) => {
  const response = await fetch(
    `https://api.okmq.net/streams/${streamName}/commit`,
    {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json',
      },
      body: JSON.stringify({ offset }),
    }
  )
  if (response.ok) {
    console.log(`Committed offset ${offset}`)
  }
}
Stream consumption supports long polling - the request will wait for new messages if none are immediately available.
const streamProcessor = async (streamName) => {
  while (true) {
    try {
      // Long polling request - will wait up to 30 seconds for new messages
      const messages = await consumeMessages(streamName, 10)
      if (messages.length === 0) {
        // No new messages, continue polling
        continue
      }
      // consumeMessages already processed the batch and committed the offset
      console.log(`Received ${messages.length} messages`)
    } catch (error) {
      console.error('Polling error:', error)
      // Wait before retrying
      await new Promise(resolve => setTimeout(resolve, 1000))
    }
  }
}
// Start processing
streamProcessor('user-events')
Stream messages are strictly ordered by their offset. Each message gets an incrementing offset number that represents its position in the stream.
// Messages are delivered in offset order
const messages = [
  { id: 'msg1', offset: 1, body: '{"event":"user_signup","userId":"123"}' },
  { id: 'msg2', offset: 2, body: '{"event":"user_login","userId":"123"}' },
  { id: 'msg3', offset: 3, body: '{"event":"page_view","userId":"123"}' }
]
// Process in order
for (const message of messages) {
  console.log(`Processing offset ${message.offset}: ${message.id}`)
  // Your processing logic here
  const event = JSON.parse(message.body)
  await handleEvent(event)
}
// Commit up to offset 3
await commitOffset('user-activity', 3)
// Next consumption will start from offset 4
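If a consumer crashes after processing a batch but before committing, the same messages are delivered again from the last committed offset. Since every message carries a unique ID, handlers can be made idempotent by tracking the IDs they have already processed. A minimal sketch, assuming an in-memory set (a real deployment would persist this state):

// Sketch: idempotent processing keyed by message ID.
// An in-memory Set is an assumption for illustration; persist the
// processed IDs (e.g. in a database) to survive restarts.
const processedIds = new Set()
const processIdempotently = async (message) => {
  if (processedIds.has(message.id)) {
    console.log(`Skipping duplicate message ${message.id}`)
    return
  }
  // handleEvent is the application's handler, as in the example above
  await handleEvent(JSON.parse(message.body))
  processedIds.add(message.id)
}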
Streams are ideal for scenarios where you need exactly-once processing, ordered delivery, and offset-based consumption.
- Event sourcing: store all events as an immutable log for rebuilding application state (see the sketch after this list)
- Real-time processing: process live data streams for dashboards and monitoring
- Data synchronization: sync data between multiple systems and services
- Transaction processing: process transactions with exactly-once guarantees
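As an illustration of the event-sourcing case, application state can be rebuilt by folding each message into a reducer in offset order. The event types and state shape below are assumptions for the sketch:

// Sketch: fold ordered stream events into application state.
// The event types and state shape are illustrative assumptions.
const applyEvent = (state, event) => {
  switch (event.type) {
    case 'user_registered':
      return { ...state, users: { ...state.users, [event.userId]: event.email } }
    case 'user_deleted': {
      const users = { ...state.users }
      delete users[event.userId]
      return { ...state, users }
    }
    default:
      return state
  }
}
// Replaying messages in offset order deterministically rebuilds state
const sampleMessages = [
  { id: 'msg1', offset: 1, body: '{"type":"user_registered","userId":"123","email":"a@example.com"}' },
  { id: 'msg2', offset: 2, body: '{"type":"user_deleted","userId":"123"}' }
]
let state = { users: {} }
for (const message of sampleMessages) {
  state = applyEvent(state, JSON.parse(message.body))
}
console.log(state) // { users: {} } after register then delete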
Delete messages from a stream for cleanup and storage management. You can delete all messages, delete messages up to a specific offset, or rely on TTL for automatic cleanup while keeping the stream itself.
- Delete all messages: remove all messages from the stream while keeping the stream structure
- Delete up to an offset: delete messages up to a specific offset for rolling window cleanup
- TTL expiration: automatic deletion based on message age using stream TTL settings
// Delete all messages from a stream
const deleteAllMessages = async (streamName) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  if (response.ok) {
    console.log(`All messages deleted from stream ${streamName}`)
  }
}
// Delete messages up to a specific offset (exclusive)
const deleteMessagesUpToOffset = async (streamName, offset) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}/messages?offset=${offset}`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  if (response.ok) {
    console.log(`Messages deleted up to offset ${offset} from stream ${streamName}`)
  }
}
// Example: Rolling window cleanup - keep only the last 1000 messages
const cleanupOldMessages = async (streamName) => {
  // Get current stream info to find the latest offset
  const streamResponse = await fetch(`https://api.okmq.net/streams/${streamName}`, {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  if (!streamResponse.ok) {
    return
  }
  const streamInfo = await streamResponse.json()
  const currentOffset = streamInfo.offset
  // Calculate the offset that keeps the last 1000 messages
  const deleteUpToOffset = Math.max(0, currentOffset - 1000)
  if (deleteUpToOffset > 0) {
    await deleteMessagesUpToOffset(streamName, deleteUpToOffset)
    console.log('Cleaned up messages, kept last 1000 messages')
  }
}
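A cleanup routine like this is typically scheduled rather than run by hand, for example once an hour (the interval here is an arbitrary choice):

// Sketch: run the rolling-window cleanup once per hour.
// The one-hour interval is an arbitrary illustrative choice.
setInterval(() => {
  cleanupOldMessages('user-events').catch((error) => {
    console.error('Cleanup failed:', error)
  })
}, 60 * 60 * 1000)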
Delete a stream and all its messages by sending a DELETE request to the stream endpoint.
const deleteStream = async (streamName) => {
  const response = await fetch(`https://api.okmq.net/streams/${streamName}`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    }
  })
  if (response.ok) {
    console.log(`Stream ${streamName} deleted successfully`)
  }
}