Everything you need to know about OKMQ queues, messages, and reliable task processing
Queues are the core of OKMQ. They store messages in order and deliver them to consumers reliably. Each queue has its own configuration for retry strategies, TTL, acknowledgment modes, and more.
Messages are delivered in first-in, first-out order
Messages survive server restarts and crashes
Multiple retry strategies for failed deliveries
TTL support removes expired messages automatically
Queue messages are rich, structured data objects optimized for reliable task processing with acknowledgments, retries, and scheduling.
Create a queue by sending a POST request to /queues/
with the queue configuration.
const createQueue = async () => {
const response = await fetch('https://api.okmq.net/queues/', {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify({
name: 'order-processing',
auto_ack: false, // Manual acknowledgment
ttl: 3600, // 1 hour TTL
max_delivery_attempts: 5, // Try up to 5 times
lock_duration: 60, // Lock messages for 60 seconds
topics: ['orders', 'payments'], // Subscribe to these topics
retry_strategy: {
type: 'exponential',
base_delay: 5, // Start with 5 second delay
max_delay: 300, // Cap at 5 minutes
multiplier: 2.0 // Double delay each retry
}
}),
})
}
Parameter | Type | Description | Default |
---|---|---|---|
name | string | Unique queue name (max 50 characters) | Required |
auto_ack | boolean | Automatically acknowledge messages | false |
ttl | number | Message time-to-live in seconds | 300 |
retry_strategy | object | Retry configuration (see examples below) | exponential |
max_delivery_attempts | number | Maximum delivery attempts | 1 |
lock_duration | number | Message lock duration in seconds | 30 |
topics | array | Topics to subscribe to for fan-out messaging | [] |
When message processing fails, OKMQ can automatically retry delivery using different strategies.
Delays increase exponentially: 5s, 10s, 20s, 40s, 80s...
{
"type": "exponential",
"base_delay": 5,
"max_delay": 300,
"multiplier": 2.0
}
Delays increase linearly: 10s, 20s, 30s, 40s...
{
"type": "linear",
"base_delay": 10,
"max_delay": 120
}
Constant retry delay: 30s, 30s, 30s...
{
"type": "fixed",
"base_delay": 30
}
Failed messages are immediately discarded
{
"type": "none"
}
Set time-to-live (TTL) values to automatically clean up expired messages and prevent queue bloat. TTL is evaluated and expired messages are deleted every minute.
Retrieve queue configuration and statistics using GET requests. This provides both the queue settings and real-time message counts.
const getQueueInfo = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}`, {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
const queueInfo = await response.json()
console.log('Queue configuration:', queueInfo)
return queueInfo
}
}
// Example response
{
"name": "order-processing",
"auto_ack": false,
"lock_duration": 60,
"max_delivery_attempts": 3,
"ttl": 3600,
"retry_strategy": {
"type": "exponential",
"base_delay": 5,
"max_delay": 300,
"multiplier": 2.0
},
"topics": ["orders", "payments"],
"acked": 1250,
"enqueued": 45,
"failed": 12,
"scheduled": 8
}
name
- Queue nameauto_ack
- Auto acknowledgment settinglock_duration
- Message lock time (seconds)max_delivery_attempts
- Retry limitttl
- Message TTL (seconds)retry_strategy
- Retry configurationtopics
- Subscribed topicsacked
- Successfully processed messagesenqueued
- Messages ready for processingfailed
- Messages that exceeded retry limitscheduled
- Messages scheduled for future deliveryQueue messages provide flexible consumption patterns with acknowledgments, filtering, and long polling support.
// Basic message retrieval
const getMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
const messages = await response.json()
// Messages are automatically locked for processing
for (const message of messages) {
console.log(`Message ID: ${message.id}`)
console.log(`Body: ${message.body}`)
console.log(`Tag: ${message.tag || 'none'}`)
// Process your message here
await processMessage(message)
// Acknowledge successful processing
await acknowledgeMessage(message.id)
}
}
// Filter by tag and limit results
const getTaggedMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/messages?tag=order-123&limit=5', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
const messages = await response.json()
return messages
}
// Long polling - wait for messages if queue is empty
const longPollMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
// Will wait up to 30 seconds for messages to arrive
const messages = await response.json()
return messages
}
// Peek at messages without locking them
const peekMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/peek?limit=10', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
// Messages are not locked and won't be marked as delivered
const messages = await response.json()
return messages
}
Queue messages require explicit acknowledgment for reliable processing. You can acknowledge success, trigger retries, or customize retry timing.
// Manual acknowledgment example
const response = await fetch(`${host}/queues/orders/messages`, {
headers: { authorization: `Bearer ${token}` }
})
const messages = await response.json()
// Process each message
for (const message of messages) {
try {
// Process the message
await processOrder(message.body)
// Acknowledge successful processing
await fetch(`${host}/queues/orders/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: message.id,
ack: true
}])
})
// Or acknowledge and delete in one operation
await fetch(`${host}/queues/orders/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: message.id,
ack: true,
delete: true // Message will be removed from queue
}])
})
} catch (error) {
console.error('Processing failed:', error)
// Option 1: Let queue retry strategy handle timing
// Just don't acknowledge - message will retry automatically
// Option 2: NACK to use queue's retry strategy
await fetch(`${host}/queues/orders/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: message.id,
ack: false // Uses queue's retry strategy (exponential, linear, etc.)
}])
})
// Option 3: Override with custom retry time
await fetch(`${host}/queues/orders/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: message.id,
ack: false,
delivery_time: new Date(Date.now() + 30000).toISOString() // Custom: retry in 30 seconds
}])
})
}
}
Queue messages support rich metadata including IDs, tags, and delivery scheduling for flexible task processing patterns.
// Send messages with tags
const sendOrderMessages = async () => {
await fetch('https://api.okmq.net/queues/order-processing/messages', {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify([
{
id: 'order-001-payment',
body: { orderId: '001', action: 'payment' },
tag: 'order-001' // Tag for order 001 messages
},
{
id: 'order-001-shipping',
body: { orderId: '001', action: 'shipping' },
tag: 'order-001' // Same tag for order 001
},
{
id: 'order-002-payment',
body: { orderId: '002', action: 'payment' },
tag: 'order-002' // Different tag for order 002
}
])
})
}
Tags allow you to organize and filter messages for specialized consumers and processing patterns.
Route notifications to specific channels based on type
// Email consumer fetches only email messages
fetch('/queues/notifications/messages?tag=email')
// SMS consumer fetches only SMS messages
fetch('/queues/notifications/messages?tag=sms')
Process high-priority messages with dedicated consumers
// High priority consumer
fetch('/queues/orders/messages?tag=urgent')
// Normal priority consumer
fetch('/queues/orders/messages?tag=normal')
Schedule messages for future delivery using the delivery_time field. Perfect for reminders, follow-ups, and timed workflows.
// Send a delayed message
const sendDelayedMessage = async () => {
const deliveryTime = new Date(Date.now() + 3600000) // 1 hour from now
await fetch('https://api.okmq.net/queues/notifications/messages', {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify([{
id: 'reminder-001',
body: {
type: 'reminder',
userId: '12345',
message: 'Don\\'t forget to complete your profile!'
},
delivery_time: deliveryTime.toISOString(),
tag: 'reminder'
}])
})
console.log(`Message scheduled for delivery at ${deliveryTime}`)
}
// Schedule a series of follow-up messages
const scheduleFollowUpSequence = async (userId) => {
const messages = [
{
id: `followup-${userId}-day1`,
body: { type: 'followup', day: 1, userId },
delivery_time: new Date(Date.now() + 86400000).toISOString(), // 1 day
tag: 'followup'
},
{
id: `followup-${userId}-week1`,
body: { type: 'followup', day: 7, userId },
delivery_time: new Date(Date.now() + 604800000).toISOString(), // 1 week
tag: 'followup'
}
]
await fetch('https://api.okmq.net/queues/marketing/messages', {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify(messages)
})
}
Publish messages to topics for fan-out delivery to multiple subscribed queues. Perfect for event broadcasting and microservice communication.
// Send message to topic - delivered to all subscribed queues
const sendToTopic = async (topicName, messageData) => {
const response = await fetch(`https://api.okmq.net/topics/${topicName}/messages`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify([{
id: messageData.id,
body: messageData,
tag: messageData.type || 'general'
}])
})
if (response.ok) {
console.log(`Message published to topic ${topicName}`)
}
}
// Example: Send order update to all relevant services
await sendToTopic('order-updates', {
id: 'order-12345-shipped',
orderId: '12345',
status: 'shipped',
timestamp: new Date().toISOString(),
type: 'status-change'
})
Delete messages from queues for cleanup and storage management. You can delete specific messages, bulk delete by type/tag, reset failed messages, or rely on TTL for automatic cleanup.
Delete all, acked, or failed messages with optional tag filtering
Delete specific messages by providing their IDs
Reset failed messages for retry instead of deleting them
Automatic deletion based on message TTL settings
// Bulk delete by type - delete all messages
const deleteAllMessages = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=all`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
console.log(`All messages deleted from queue ${queueName}`)
}
}
// Delete only acknowledged messages
const deleteAckedMessages = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=acked`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
console.log(`Acknowledged messages deleted from queue ${queueName}`)
}
}
// Delete only failed messages (exceeded retry limit)
const deleteFailedMessages = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=failed`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
console.log(`Failed messages deleted from queue ${queueName}`)
}
}
// Delete messages by tag
const deleteMessagesByTag = async (queueName, tag) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=all&tag=${tag}`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
console.log(`Messages with tag "${tag}" deleted from queue ${queueName}`)
}
}
// Delete specific messages by IDs
const deleteSpecificMessages = async (queueName, messageIds) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify({
ids: messageIds
})
})
if (response.ok) {
console.log(`${messageIds.length} specific messages deleted from queue ${queueName}`)
}
}
// Reset failed messages for retry (instead of deleting)
const resetFailedMessages = async (queueName, tag = '') => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/reset-failed`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify({
tag: tag
})
})
if (response.ok) {
console.log(`Failed messages reset for retry in queue ${queueName}`)
}
}
// Delete individual message during acknowledgment
const deleteMessageOnAck = async (queueName, messageId) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: messageId,
ack: true,
delete: true // Delete the message after acknowledging
}])
})
if (response.ok) {
console.log(`Message ${messageId} acknowledged and deleted`)
}
}
Delete a queue and all its messages by sending a DELETE request to the queue endpoint.
const deleteQueue = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
}
})
if (response.ok) {
console.log(`Queue ${queueName} deleted successfully`)
}
}