Queues Guide

Everything you need to know about OKMQ queues, messages, and reliable task processing

What are Queues?

Queues are the core of OKMQ. They store messages in order and deliver them to consumers reliably. Each queue has its own configuration for retry strategies, TTL, acknowledgment modes, and more.

FIFO Ordering

Messages are delivered in first-in, first-out order

Persistent Storage

Messages survive server restarts and crashes

Configurable Retries

Multiple retry strategies for failed deliveries

Auto-cleanup

TTL support removes expired messages automatically

Queue Message Structure

Queue messages are rich, structured data objects optimized for reliable task processing with acknowledgments, retries, and scheduling.

Rich Metadata

  • • ID for deduplication
  • • Message body content
  • • Tags for filtering
  • • Delivery time scheduling

Acknowledgments

  • • Manual ack/nack control
  • • Delivery guarantees
  • • Message locking
  • • Failure handling

Retry Logic

  • • Exponential backoff
  • • Linear retry intervals
  • • Fixed delay retries
  • • Custom retry timing

Scheduling

  • • Delayed message delivery
  • • Future scheduling
  • • Priority processing
  • • Batch operations

Creating Queues

Create a queue by sending a POST request to /queues/ with the queue configuration.


const createQueue = async () => {
  const response = await fetch('https://api.okmq.net/queues/', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify({
      name: 'order-processing',
      auto_ack: false,           // Manual acknowledgment
      ttl: 3600,                // 1 hour TTL
      max_delivery_attempts: 5,  // Try up to 5 times
      lock_duration: 60,        // Lock messages for 60 seconds
      topics: ['orders', 'payments'], // Subscribe to these topics
      retry_strategy: {
        type: 'exponential',
        base_delay: 5,          // Start with 5 second delay
        max_delay: 300,         // Cap at 5 minutes
        multiplier: 2.0         // Double delay each retry
      }
    }),
  })
}
ParameterTypeDescriptionDefault
namestringUnique queue name (max 50 characters)Required
auto_ackbooleanAutomatically acknowledge messagesfalse
ttlnumberMessage time-to-live in seconds300
retry_strategyobjectRetry configuration (see examples below)exponential
max_delivery_attemptsnumberMaximum delivery attempts1
lock_durationnumberMessage lock duration in seconds30
topicsarrayTopics to subscribe to for fan-out messaging[]

Retry Strategies

When message processing fails, OKMQ can automatically retry delivery using different strategies.

Exponential Backoff

Delays increase exponentially: 5s, 10s, 20s, 40s, 80s...

{
  "type": "exponential",
  "base_delay": 5,
  "max_delay": 300,
  "multiplier": 2.0
}

Linear Backoff

Delays increase linearly: 10s, 20s, 30s, 40s...

{
  "type": "linear",
  "base_delay": 10,
  "max_delay": 120
}

Fixed Delay

Constant retry delay: 30s, 30s, 30s...

{
  "type": "fixed",
  "base_delay": 30
}

No Retries

Failed messages are immediately discarded

{
  "type": "none"
}

TTL and Cleanup

Set time-to-live (TTL) values to automatically clean up expired messages and prevent queue bloat. TTL is evaluated and expired messages are deleted every minute.

Getting Queue Information

Retrieve queue configuration and statistics using GET requests. This provides both the queue settings and real-time message counts.


const getQueueInfo = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}`, {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    const queueInfo = await response.json()
    console.log('Queue configuration:', queueInfo)
    return queueInfo
  }
}

// Example response
{
  "name": "order-processing",
  "auto_ack": false,
  "lock_duration": 60,
  "max_delivery_attempts": 3,
  "ttl": 3600,
  "retry_strategy": {
    "type": "exponential",
    "base_delay": 5,
    "max_delay": 300,
    "multiplier": 2.0
  },
  "topics": ["orders", "payments"],
  "acked": 1250,
  "enqueued": 45,
  "failed": 12,
  "scheduled": 8
}

Configuration Fields

  • name - Queue name
  • auto_ack - Auto acknowledgment setting
  • lock_duration - Message lock time (seconds)
  • max_delivery_attempts - Retry limit
  • ttl - Message TTL (seconds)
  • retry_strategy - Retry configuration
  • topics - Subscribed topics

Statistics Fields

  • acked - Successfully processed messages
  • enqueued - Messages ready for processing
  • failed - Messages that exceeded retry limit
  • scheduled - Messages scheduled for future delivery
Statistics are updated in real-time and can be used for monitoring queue health and performance.

Consuming Queue Messages

Queue messages provide flexible consumption patterns with acknowledgments, filtering, and long polling support.


// Basic message retrieval
const getMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  const messages = await response.json()
  
  // Messages are automatically locked for processing
  for (const message of messages) {
    console.log(`Message ID: ${message.id}`)
    console.log(`Body: ${message.body}`)
    console.log(`Tag: ${message.tag || 'none'}`)
    
    // Process your message here
    await processMessage(message)
    
    // Acknowledge successful processing
    await acknowledgeMessage(message.id)
  }
}

// Filter by tag and limit results
const getTaggedMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/messages?tag=order-123&limit=5', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  const messages = await response.json()
  return messages
}

// Long polling - wait for messages if queue is empty
const longPollMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  // Will wait up to 30 seconds for messages to arrive
  const messages = await response.json()
  return messages
}

// Peek at messages without locking them
const peekMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/peek?limit=10', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  // Messages are not locked and won't be marked as delivered
  const messages = await response.json()
  return messages
}

Message Acknowledgments

Queue messages require explicit acknowledgment for reliable processing. You can acknowledge success, trigger retries, or customize retry timing.


// Manual acknowledgment example
const response = await fetch(`${host}/queues/orders/messages`, {
  headers: { authorization: `Bearer ${token}` }
})
const messages = await response.json()

// Process each message
for (const message of messages) {
  try {
    // Process the message
    await processOrder(message.body)

    // Acknowledge successful processing
    await fetch(`${host}/queues/orders/ack`, {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify([{
        id: message.id,
        ack: true
      }])
    })

    // Or acknowledge and delete in one operation
    await fetch(`${host}/queues/orders/ack`, {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify([{
        id: message.id,
        ack: true,
        delete: true  // Message will be removed from queue
      }])
    })
  } catch (error) {
    console.error('Processing failed:', error)

    // Option 1: Let queue retry strategy handle timing
    // Just don't acknowledge - message will retry automatically

    // Option 2: NACK to use queue's retry strategy
    await fetch(`${host}/queues/orders/ack`, {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify([{
        id: message.id,
        ack: false  // Uses queue's retry strategy (exponential, linear, etc.)
      }])
    })

    // Option 3: Override with custom retry time
    await fetch(`${host}/queues/orders/ack`, {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify([{
        id: message.id,
        ack: false,
        delivery_time: new Date(Date.now() + 30000).toISOString() // Custom: retry in 30 seconds
      }])
    })
  }
}
Important: Messages not acknowledged within the visibility timeout will be automatically retried according to the queue's retry strategy.

Producing Queue Messages

Queue messages support rich metadata including IDs, tags, and delivery scheduling for flexible task processing patterns.


// Send messages with tags
const sendOrderMessages = async () => {
  await fetch('https://api.okmq.net/queues/order-processing/messages', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        id: 'order-001-payment',
        body: { orderId: '001', action: 'payment' },
        tag: 'order-001'  // Tag for order 001 messages
      },
      {
        id: 'order-001-shipping',
        body: { orderId: '001', action: 'shipping' },
        tag: 'order-001'  // Same tag for order 001
      },
      {
        id: 'order-002-payment',
        body: { orderId: '002', action: 'payment' },
        tag: 'order-002'  // Different tag for order 002
      }
    ])
  })
}

Message Tags & Filtering

Tags allow you to organize and filter messages for specialized consumers and processing patterns.

Channel Routing

Route notifications to specific channels based on type

// Email consumer fetches only email messages
fetch('/queues/notifications/messages?tag=email')

// SMS consumer fetches only SMS messages
fetch('/queues/notifications/messages?tag=sms')

Priority Processing

Process high-priority messages with dedicated consumers

// High priority consumer
fetch('/queues/orders/messages?tag=urgent')

// Normal priority consumer
fetch('/queues/orders/messages?tag=normal')

Scheduled & Delayed Messages

Schedule messages for future delivery using the delivery_time field. Perfect for reminders, follow-ups, and timed workflows.


// Send a delayed message
const sendDelayedMessage = async () => {
  const deliveryTime = new Date(Date.now() + 3600000) // 1 hour from now

  await fetch('https://api.okmq.net/queues/notifications/messages', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([{
      id: 'reminder-001',
      body: {
        type: 'reminder',
        userId: '12345',
        message: 'Don\\'t forget to complete your profile!'
      },
      delivery_time: deliveryTime.toISOString(),
      tag: 'reminder'
    }])
  })

  console.log(`Message scheduled for delivery at ${deliveryTime}`)
}

// Schedule a series of follow-up messages
const scheduleFollowUpSequence = async (userId) => {
  const messages = [
    {
      id: `followup-${userId}-day1`,
      body: { type: 'followup', day: 1, userId },
      delivery_time: new Date(Date.now() + 86400000).toISOString(), // 1 day
      tag: 'followup'
    },
    {
      id: `followup-${userId}-week1`,
      body: { type: 'followup', day: 7, userId },
      delivery_time: new Date(Date.now() + 604800000).toISOString(), // 1 week
      tag: 'followup'
    }
  ]

  await fetch('https://api.okmq.net/queues/marketing/messages', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify(messages)
  })
}

Topic-Based Publishing

Publish messages to topics for fan-out delivery to multiple subscribed queues. Perfect for event broadcasting and microservice communication.


// Send message to topic - delivered to all subscribed queues
const sendToTopic = async (topicName, messageData) => {
  const response = await fetch(`https://api.okmq.net/topics/${topicName}/messages`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([{
      id: messageData.id,
      body: messageData,
      tag: messageData.type || 'general'
    }])
  })

  if (response.ok) {
    console.log(`Message published to topic ${topicName}`)
  }
}

// Example: Send order update to all relevant services
await sendToTopic('order-updates', {
  id: 'order-12345-shipped',
  orderId: '12345',
  status: 'shipped',
  timestamp: new Date().toISOString(),
  type: 'status-change'
})

Deleting Messages from Queues

Delete messages from queues for cleanup and storage management. You can delete specific messages, bulk delete by type/tag, reset failed messages, or rely on TTL for automatic cleanup.

Bulk Deletion

Delete all, acked, or failed messages with optional tag filtering

Specific Messages

Delete specific messages by providing their IDs

Reset Failed

Reset failed messages for retry instead of deleting them

TTL Cleanup

Automatic deletion based on message TTL settings


// Bulk delete by type - delete all messages
const deleteAllMessages = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=all`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    console.log(`All messages deleted from queue ${queueName}`)
  }
}

// Delete only acknowledged messages
const deleteAckedMessages = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=acked`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    console.log(`Acknowledged messages deleted from queue ${queueName}`)
  }
}

// Delete only failed messages (exceeded retry limit)
const deleteFailedMessages = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=failed`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    console.log(`Failed messages deleted from queue ${queueName}`)
  }
}

// Delete messages by tag
const deleteMessagesByTag = async (queueName, tag) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=all&tag=${tag}`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })

  if (response.ok) {
    console.log(`Messages with tag "${tag}" deleted from queue ${queueName}`)
  }
}

// Delete specific messages by IDs
const deleteSpecificMessages = async (queueName, messageIds) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    },
    body: JSON.stringify({
      ids: messageIds
    })
  })

  if (response.ok) {
    console.log(`${messageIds.length} specific messages deleted from queue ${queueName}`)
  }
}

// Reset failed messages for retry (instead of deleting)
const resetFailedMessages = async (queueName, tag = '') => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/reset-failed`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    },
    body: JSON.stringify({
      tag: tag
    })
  })

  if (response.ok) {
    console.log(`Failed messages reset for retry in queue ${queueName}`)
  }
}

// Delete individual message during acknowledgment
const deleteMessageOnAck = async (queueName, messageId) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/ack`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    },
    body: JSON.stringify([{
      id: messageId,
      ack: true,
      delete: true  // Delete the message after acknowledging
    }])
  })

  if (response.ok) {
    console.log(`Message ${messageId} acknowledged and deleted`)
  }
}
Message Deletion Options
  • Bulk by type: Delete all, only acknowledged, or only failed messages
  • Tag filtering: Combine with any deletion type to target specific tags
  • Specific IDs: Delete exact messages by providing their IDs
  • Reset failed: Give failed messages another chance instead of deleting them
  • TTL-based: Automatic cleanup when messages exceed their time-to-live
  • Ack + delete: Delete messages immediately after processing
Important: Deleted messages cannot be recovered. Use reset failed messages to retry processing instead of deletion when appropriate.

Deleting Queues

Delete a queue and all its messages by sending a DELETE request to the queue endpoint.


const deleteQueue = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    }
  })

  if (response.ok) {
    console.log(`Queue ${queueName} deleted successfully`)
  }
}
Warning: Deleting a queue permanently removes the queue and all its messages. This action cannot be undone.

Next Steps

Learn about streams and real-time message processing

Streams Documentation