Messages Guide

Learn how to work with messages, tags, and acknowledgments in OKMQ.

Message Structure

Messages in OKMQ are JSON objects that contain your data along with metadata for processing and delivery. Each message has a unique ID. Duplicate messages with the same ID are silently ignored to prevent double processing.


{
  "id": "unique-message-id",
  "body": {
    // Your message data here
    "userId": 123,
    "action": "welcome_email",
    "email": "user@example.com"
  },
  "tag": "user-123",           // Optional: for ordered processing
  "delivery_attempt": 1,
  "delivery_time": "2024-01-01T12:00:00Z",
  "locked_until": "2024-01-01T12:01:00Z"
}
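
| Parameter     | Type   | Description                                                   | Required |
|---------------|--------|---------------------------------------------------------------|----------|
| id            | string | Unique message identifier (max 50 characters)                 | Yes      |
| body          | any    | Message payload (max 1 MB)                                    | Yes      |
| tag           | string | Message tag for client-side partitioning (max 50 characters)  | No       |
| delivery_time | string | ISO 8601 timestamp for scheduled delivery                     | No       |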
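
The limits in the table can be checked on the client before a message is sent. The sketch below is not part of OKMQ: validateMessage is a hypothetical helper, and it assumes that "50 characters" means JavaScript string length and that "1 MB" means 1,000,000 bytes of UTF-8 encoded JSON. Adjust both constants if the service counts differently.

```javascript
// Hypothetical client-side check of the documented message limits.
// Assumptions: length is counted in JS string units; 1 MB = 1,000,000 bytes.
const MAX_ID_LENGTH = 50
const MAX_TAG_LENGTH = 50
const MAX_BODY_BYTES = 1000000

const validateMessage = (message) => {
  const errors = []

  if (typeof message.id !== 'string' || message.id.length === 0) {
    errors.push('id is required and must be a string')
  } else if (message.id.length > MAX_ID_LENGTH) {
    errors.push(`id exceeds ${MAX_ID_LENGTH} characters`)
  }

  if (message.body === undefined) {
    errors.push('body is required')
  } else {
    // Measure the serialized size in bytes, not characters
    const bytes = new TextEncoder().encode(JSON.stringify(message.body)).length
    if (bytes > MAX_BODY_BYTES) errors.push('body exceeds 1 MB')
  }

  if (message.tag !== undefined) {
    if (typeof message.tag !== 'string' || message.tag.length > MAX_TAG_LENGTH) {
      errors.push(`tag must be a string of at most ${MAX_TAG_LENGTH} characters`)
    }
  }

  // Date.parse is more lenient than strict ISO 8601; this only rejects unparseable values
  if (message.delivery_time !== undefined && Number.isNaN(Date.parse(message.delivery_time))) {
    errors.push('delivery_time must be an ISO 8601 timestamp')
  }

  return errors
}
```

An empty array means the message passed every check. Returning all problems at once, instead of throwing on the first, makes it easier to report several issues in a batch.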

Retrieving Messages

Pull messages from queues using GET requests. The API returns an array of messages ready for processing.


// Basic message retrieval
const getMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  if (response.ok) {
    const messages = await response.json()
    console.log(`Received ${messages.length} messages`)
    
    for (const message of messages) {
      console.log('Message ID:', message.id)
      console.log('Body:', message.body)
      console.log('Tag:', message.tag)
      console.log('Delivery attempt:', message.delivery_attempt)
    }
  }
}

Query Parameters

  • limit - Max messages to return (default: 10)
  • tag - Filter by specific tag

Response Fields

  • id - Message identifier
  • body - Message payload
  • delivery_attempt - Number of delivery attempts so far
  • locked_until - Lock expiration
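
The limit and tag query parameters can be assembled with the standard URL API so that values are encoded correctly. This is a minimal sketch; buildMessagesUrl and API_BASE are hypothetical names chosen for illustration, and the base URL is taken from the examples in this guide.

```javascript
const API_BASE = 'https://api.okmq.net'

// Hypothetical helper: builds the retrieval URL with optional limit and tag
const buildMessagesUrl = (queueName, { limit, tag } = {}) => {
  const url = new URL(`${API_BASE}/queues/${encodeURIComponent(queueName)}/messages`)
  if (limit !== undefined) url.searchParams.set('limit', String(limit))
  if (tag !== undefined) url.searchParams.set('tag', tag)
  return url.toString()
}

console.log(buildMessagesUrl('order-processing', { limit: 5, tag: 'order-123' }))
// https://api.okmq.net/queues/order-processing/messages?limit=5&tag=order-123
```

Omitting both options yields the plain messages URL, so the documented default limit applies.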

Advanced Retrieval Options


// Filter by tag and limit results
const getTaggedMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/messages?tag=order-123&limit=5', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  const messages = await response.json()
  return messages
}

// Long polling: if the queue is empty, the request waits for messages
const longPollMessages = async () => {
  // The request stays open for up to 30 seconds (fixed timeout) until messages arrive
  const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  const messages = await response.json()
  return messages
}

// Peek at messages without locking them
const peekMessages = async () => {
  const response = await fetch('https://api.okmq.net/queues/order-processing/peek?limit=10', {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  // Messages are not locked and won't be marked as delivered
  const messages = await response.json()
  return messages
}

Retrieved messages are automatically locked to prevent duplicate processing. Use peek to view messages without locking.
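
Because a retrieved message carries a locked_until timestamp, a consumer may want to check how much of the lock is left before starting slow work. The sketch below uses hypothetical helper names, and it assumes that a message whose lock has expired can be delivered again, which the retry behavior described in this guide suggests but does not state outright.

```javascript
// Milliseconds left before the message's lock expires (0 if already expired
// or if locked_until is missing). `now` is injectable for deterministic tests.
const lockRemainingMs = (message, now = Date.now()) => {
  const lockedUntil = Date.parse(message.locked_until)
  if (Number.isNaN(lockedUntil)) return 0
  return Math.max(0, lockedUntil - now)
}

// Hypothetical guard: only start work that is expected to fit inside the lock
const canFinishInTime = (message, estimatedWorkMs, now = Date.now()) =>
  lockRemainingMs(message, now) > estimatedWorkMs
```

A consumer could skip, or acknowledge later, any message for which canFinishInTime returns false, leaving it to be retried rather than processing it twice.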

Getting Individual Messages

Retrieve detailed information about a specific message using its ID. This returns the complete message structure including delivery metadata.


const getMessage = async (queueName, messageId) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages/${messageId}`, {
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  if (response.ok) {
    const message = await response.json()
    console.log('Message details:', message)
    return message
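  } else if (response.status === 404) {
    console.log('Message not found')
    return null
  }

  // Any other status is unexpected; fail loudly instead of returning undefined
  throw new Error(`Unexpected response status: ${response.status}`)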
}

// Example response
{
  "id": "order-123",
  "body": {
    "orderId": "12345",
    "customerId": "user-456",
    "amount": 99.99
  },
  "tag": "order-processing",
  "auto_ack": false,
  "acked": null,
  "created": "2024-01-01T10:00:00Z",
  "delivery_attempt": 2,
  "delivery_time": "2024-01-01T10:05:00Z",
  "lock_duration": 30,
  "locked_until": "2024-01-01T10:35:00Z",
  "max_delivery_attempts": 3,
  "ttl": "2024-01-01T11:00:00Z",
  "updated": "2024-01-01T10:05:00Z"
}

Message Data

  • id - Message identifier
  • body - Message payload
  • tag - Message tag
  • auto_ack - Auto acknowledgment setting

Delivery Metadata

  • acked - Acknowledgment timestamp (if acked)
  • delivery_attempt - Number of delivery attempts so far
  • delivery_time - Next delivery time
  • locked_until - Lock expiration time
  • created - Message creation time
  • updated - Last modification time

This endpoint is useful for debugging message processing issues and monitoring delivery status.
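
When debugging, the delivery metadata can be reduced to a one-line status. The following is a rough sketch, not an official state model: describeStatus and summarize are hypothetical helpers, and the rules (acked first, then locked, then scheduled, otherwise pending) are an assumed interpretation of the fields shown in the example response.

```javascript
// Assumed interpretation of the metadata fields; `now` is injectable for testing
const describeStatus = (message, now = Date.now()) => {
  if (message.acked) return 'acked'                                  // acknowledgment timestamp present
  if (Date.parse(message.locked_until) > now) return 'locked'        // held by a consumer
  if (Date.parse(message.delivery_time) > now) return 'scheduled'    // not yet due
  return 'pending'
}

// One-line summary suitable for logs
const summarize = (message, now = Date.now()) =>
  `${message.id}: ${describeStatus(message, now)} ` +
  `(attempt ${message.delivery_attempt} of ${message.max_delivery_attempts})`
```

Missing or null timestamps parse to NaN, so those comparisons are false and the function falls through to the next rule.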

Message Acknowledgment

Acknowledgment modes control when a message is considered successfully processed.

Auto Acknowledgment

When auto_ack: true is set, messages are automatically acknowledged as soon as they're delivered to a consumer. This is perfect for fire-and-forget scenarios where you don't need delivery guarantees, such as logging or notifications.

Manual Acknowledgment

With auto_ack: false, you must explicitly acknowledge messages after successful processing. Failed messages (NACK with ack: false) will be retried using the queue's configured retry strategy, unless you override the timing with a custom delivery_time.


// Manual acknowledgment example
const host = 'https://api.okmq.net'
const response = await fetch(`${host}/queues/orders/messages`, {
  headers: { authorization: `Bearer ${token}` }
})
const messages = await response.json()

// Process each message
for (const message of messages) {
  try {
    // Process the message
    await processOrder(message.body)
    
    // Acknowledge successful processing
    await fetch(`${host}/queues/orders/ack`, {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify([{
        id: message.id,
        ack: true
      }])
    })
  } catch (error) {
    console.error('Processing failed:', error)
    
    // Choose ONE of the options below. Running Options 2 and 3 together
    // would send two NACKs for the same message.
    
    // Option 1: Let queue retry strategy handle timing
    // Just don't acknowledge - message will retry automatically
    
    // Option 2: NACK to use queue's retry strategy
    await fetch(`${host}/queues/orders/ack`, {
      method: 'POST',
      headers: {
        authorization: `Bearer ${token}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify([{
        id: message.id,
        ack: false  // Uses queue's retry strategy (exponential, linear, etc.)
      }])
    })
    
    // Option 3 (alternative to Option 2): Override with custom retry time
    // await fetch(`${host}/queues/orders/ack`, {
    //   method: 'POST',
    //   headers: {
    //     authorization: `Bearer ${token}`,
    //     'content-type': 'application/json'
    //   },
    //   body: JSON.stringify([{
    //     id: message.id,
    //     ack: false,
    //     delivery_time: new Date(Date.now() + 30000).toISOString() // Custom: retry in 30 seconds
    //   }])
    // })
  }
}
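
The ack endpoint takes an array, so the results for a whole batch could be collected and sent in one request instead of one request per message. The sketch below only builds the request body. buildAckPayload and the shape of its input are hypothetical, and sending several entries in one call is an assumption based on the array format, not something this guide states.

```javascript
// results: [{ id, ok, retryAt? }] where retryAt is an optional Date.
// Returns the array to pass to JSON.stringify for the ack request body.
const buildAckPayload = (results) =>
  results.map(({ id, ok, retryAt }) => {
    if (ok) return { id, ack: true }

    // NACK: the queue's retry strategy applies unless delivery_time is set
    const nack = { id, ack: false }
    if (retryAt) nack.delivery_time = retryAt.toISOString()
    return nack
  })
```

A consumer would record { id, ok: true } after each successful processOrder call and { id, ok: false } in the catch block, then POST the combined payload once the loop finishes.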

Message Tags

Use tags for client-side processing and partitioning. Tags allow clients to fetch only messages with a specific tag, enabling selective message processing.


// Send messages with tags
const sendOrderMessages = async () => {
  await fetch('https://api.okmq.net/queues/order-processing/messages', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        id: 'order-001-payment',
        body: { orderId: '001', action: 'payment' },
        tag: 'order-001'  // Tag for order 001 messages
      },
      {
        id: 'order-001-shipping',
        body: { orderId: '001', action: 'shipping' },
        tag: 'order-001'  // Same tag for order 001
      },
      {
        id: 'order-002-payment',
        body: { orderId: '002', action: 'payment' },
        tag: 'order-002'  // Different tag for order 002
      }
    ])
  })
}

To retrieve only messages with a given tag, pass the tag query parameter when fetching. This lets different consumers each process their own subset of a queue.

Tag Use Cases

Dedicated Consumers

Different consumers can process specific message types by filtering on tags.


// Email consumer fetches only email messages
fetch('/queues/notifications/messages?tag=email')

// SMS consumer fetches only SMS messages  
fetch('/queues/notifications/messages?tag=sms')

Priority Processing

Separate high and low priority messages for different processing workflows.


// High priority consumer
fetch('/queues/orders/messages?tag=urgent')

// Normal priority consumer
fetch('/queues/orders/messages?tag=normal')
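
A single consumer can also serve both priorities by asking for the urgent tag first and falling back to normal. The sketch below is one possible arrangement, not a built-in feature: nextBatch is a hypothetical helper, and fetchByTag stands for any function you supply that fetches messages for one tag and resolves to an array.

```javascript
// fetchByTag: hypothetical function (tag) => Promise<Array of messages>
// Returns the first non-empty batch, trying tags in priority order.
const nextBatch = async (fetchByTag, tagsInPriorityOrder = ['urgent', 'normal']) => {
  for (const tag of tagsInPriorityOrder) {
    const messages = await fetchByTag(tag)
    if (messages.length > 0) return { tag, messages }
  }
  return { tag: null, messages: [] }
}
```

Keep in mind that retrieval waits up to 30 seconds when nothing is available, so a tagged request that comes back empty may delay the fallback to the next tag if that wait also applies to tag-filtered requests.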

Scheduled Delivery

Schedule messages for future delivery using the delivery_time field.


// Send a delayed message
const sendDelayedMessage = async () => {
  const deliveryTime = new Date(Date.now() + 3600000) // 1 hour from now
  
  await fetch('https://api.okmq.net/queues/notifications/messages', {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        id: 'reminder-123',
        body: { 
          type: 'reminder',
          message: 'Your trial expires tomorrow'
        },
        delivery_time: deliveryTime.toISOString()
      }
    ])
  })
}

Common uses for scheduled delivery:

  • Reminders - Schedule follow-up emails and notifications
  • Rate Limiting - Spread API calls over time
  • Batch Processing - Process during off-peak hours
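
For the rate-limiting case, delivery times can be staggered before the batch is sent. A minimal sketch, assuming a fixed interval between messages is acceptable; staggerDeliveries is a hypothetical helper and the interval is yours to choose.

```javascript
// Returns copies of the messages with delivery_time spaced intervalMs apart,
// starting at startMs. The input array is not modified.
const staggerDeliveries = (messages, intervalMs, startMs = Date.now()) =>
  messages.map((message, index) => ({
    ...message,
    delivery_time: new Date(startMs + index * intervalMs).toISOString()
  }))
```

The returned array can be passed to JSON.stringify as the request body in the same way as the delayed message above.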

Topic Publishing

Send messages to topics for fan-out delivery to multiple queues. Any queue subscribed to a topic will receive copies of messages sent to that topic.


// Send message to topic - delivered to all subscribed queues
const sendToTopic = async (topicName, messageData) => {
  const response = await fetch(`https://api.okmq.net/topics/${topicName}/messages`, {
    method: 'POST',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json',
    },
    body: JSON.stringify([
      {
        id: `notification-${Date.now()}`,
        body: messageData,
        tag: 'broadcast'
      }
    ])
  })
  
  if (response.ok) {
    console.log('Message sent to all subscribers')
  }
}

// Example: Send order update to all relevant services
await sendToTopic('order-updates', {
  orderId: '12345',
  status: 'shipped',
  trackingNumber: 'ABC123',
  timestamp: new Date().toISOString()
})
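
Because duplicate IDs are silently ignored, a time-based ID such as notification-${Date.now()} makes every publish unique, including accidental re-sends, while two publishes in the same millisecond would collide. One alternative is to derive the ID from the event itself so that re-publishing the same event produces the same ID. The helper below is hypothetical, and how deduplication behaves across topic fan-out is not covered in this guide, so treat that part as an assumption.

```javascript
const MAX_ID_LENGTH = 50 // documented limit for message IDs

// Joins the parts with "-" and enforces the documented length limit
const eventId = (...parts) => {
  const id = parts.map(String).join('-')
  if (id.length > MAX_ID_LENGTH) {
    throw new Error(`Message ID "${id}" exceeds ${MAX_ID_LENGTH} characters`)
  }
  return id
}

console.log(eventId('order', '12345', 'shipped')) // order-12345-shipped
```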

Queue Subscription

Queues subscribe to topics when created by including them in the topics array.


// Queue subscribes to multiple topics
{
  name: 'notification-service',
  topics: ['order-updates', 'user-events'],
  auto_ack: false
}
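
To reason about fan-out, it can help to model the subscriptions locally. The sketch below is purely illustrative and makes no API calls: the analytics and billing queues are made-up examples, and subscribersOf is a hypothetical helper that lists which queue definitions would receive a message sent to a given topic.

```javascript
// Local model of queue definitions; only notification-service comes from this guide
const queues = [
  { name: 'notification-service', topics: ['order-updates', 'user-events'], auto_ack: false },
  { name: 'analytics', topics: ['user-events'], auto_ack: true },
  { name: 'billing', topics: [], auto_ack: false }
]

// Names of the queues subscribed to the given topic
const subscribersOf = (topic, queueDefinitions) =>
  queueDefinitions
    .filter((queue) => (queue.topics || []).includes(topic))
    .map((queue) => queue.name)

console.log(subscribersOf('user-events', queues)) // [ 'notification-service', 'analytics' ]
```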

Topic Use Cases

  • Event Broadcasting - Notify multiple services of important events
  • Multi-channel Notifications - Send alerts via email, SMS, and push simultaneously
  • Microservice Updates - Keep distributed services synchronized

Topics enable pub/sub messaging patterns where publishers don't need to know about individual subscribers.

Deleting Messages

Remove messages from queues using DELETE requests. You can delete all messages, only acknowledged messages, only failed messages, or only messages with a specific tag.


// Delete all messages from a queue
const deleteAllMessages = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
  
  if (response.ok) {
    console.log('All messages deleted')
  }
}

// Delete only acknowledged messages
const deleteAckedMessages = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=acked`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
}

// Delete only failed messages (exceeded max delivery attempts)
const deleteFailedMessages = async (queueName) => {
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=failed`, {
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
}

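// Delete messages with specific tag
const deleteTaggedMessages = async (queueName, tag) => {
  // Encode the tag so characters such as & or spaces cannot break the query string
  const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?tag=${encodeURIComponent(tag)}`, {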
    method: 'DELETE',
    headers: {
      authorization: `Bearer ${token}`,
      'content-type': 'application/json'
    }
  })
}

Query Parameters

  • type - Message type to delete (all, acked, failed)
  • tag - Delete only messages with specific tag

Message Types

  • all - All messages in queue (default)
  • acked - Successfully processed messages
  • failed - Messages that exceeded retry limits

Warning: Deleting messages permanently removes them from the queue. This action cannot be undone.
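
Since a delete cannot be undone, it may be worth validating the type value before the request is sent. The sketch below only builds the URL. buildDeleteUrl and API_BASE are hypothetical names, and combining type and tag in one request is an assumption that this guide does not confirm.

```javascript
const API_BASE = 'https://api.okmq.net'
const DELETE_TYPES = ['all', 'acked', 'failed'] // documented message types

// Builds the DELETE URL and rejects unknown types before anything is sent
const buildDeleteUrl = (queueName, { type, tag } = {}) => {
  if (type !== undefined && !DELETE_TYPES.includes(type)) {
    throw new Error(`Unknown delete type: ${type}`)
  }
  const url = new URL(`${API_BASE}/queues/${encodeURIComponent(queueName)}/messages`)
  if (type !== undefined) url.searchParams.set('type', type)
  if (tag !== undefined) url.searchParams.set('tag', tag)
  return url.toString()
}

console.log(buildDeleteUrl('orders', { type: 'failed' }))
// https://api.okmq.net/queues/orders/messages?type=failed
```

A typo such as type: 'fail' then fails on the client instead of reaching the API with an unrecognized value.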