Learn how to work with messages, tags, and acknowledgments in OKMQ.
Messages in OKMQ are JSON objects that contain your data along with metadata for processing and delivery. Each message has a unique ID; a message whose ID duplicates an existing one is silently ignored to prevent double processing.
{
"id": "unique-message-id",
"body": {
// Your message data here
"userId": 123,
"action": "welcome_email",
"email": "user@example.com"
},
"tag": "user-123", // Optional: for ordered processing
"delivery_attempt": 1,
"delivery_time": "2024-01-01T12:00:00Z",
"locked_until": "2024-01-01T12:01:00Z"
}
Parameter | Type | Description | Required |
---|---|---|---|
id | string | Unique message identifier (max 50 characters) | Yes |
body | any | Message payload (max 1 MB) | Yes |
tag | string | Message tag for client-side partitioning (max 50 characters) | No |
delivery_time | string | ISO 8601 timestamp for scheduled delivery | No |
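The limits in the table can be checked on the client before a message is sent. The sketch below is one possible way to do that. `validateMessage` is a hypothetical helper, not part of any OKMQ client, and it assumes that the 1 MB limit applies to the UTF-8 size of the JSON-serialized body and that 1 MB means 1,048,576 bytes.

```javascript
// Hypothetical client-side check against the documented limits.
// Assumption: the body limit is measured on the JSON-serialized body in UTF-8 bytes.
const MAX_ID_LENGTH = 50
const MAX_TAG_LENGTH = 50
const MAX_BODY_BYTES = 1024 * 1024

const validateMessage = (message) => {
  const errors = []

  if (typeof message.id !== 'string' || message.id.length === 0) {
    errors.push('id is required and must be a string')
  } else if (message.id.length > MAX_ID_LENGTH) {
    errors.push(`id exceeds ${MAX_ID_LENGTH} characters`)
  }

  if (message.body === undefined) {
    errors.push('body is required')
  } else {
    const bodyBytes = new TextEncoder().encode(JSON.stringify(message.body)).length
    if (bodyBytes > MAX_BODY_BYTES) errors.push('body exceeds 1 MB')
  }

  if (message.tag !== undefined) {
    if (typeof message.tag !== 'string') errors.push('tag must be a string')
    else if (message.tag.length > MAX_TAG_LENGTH) errors.push(`tag exceeds ${MAX_TAG_LENGTH} characters`)
  }

  // Date.parse returns NaN for strings it cannot interpret as a timestamp
  if (message.delivery_time !== undefined && Number.isNaN(Date.parse(message.delivery_time))) {
    errors.push('delivery_time is not a parseable timestamp')
  }

  return errors
}
```

An empty array means the message passed every check; otherwise each entry describes one violated limit.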
Pull messages from queues using GET requests. The API returns an array of messages ready for processing.
// Basic message retrieval
const getMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
const messages = await response.json()
console.log(`Received ${messages.length} messages`)
for (const message of messages) {
console.log('Message ID:', message.id)
console.log('Body:', message.body)
console.log('Tag:', message.tag)
console.log('Delivery attempt:', message.delivery_attempt)
}
}
}
Query parameters:
- limit: Max messages to return (default: 10)
- tag: Filter by specific tag

Response fields:
- id: Message identifier
- body: Message payload
- delivery_attempt: Retry count
- locked_until: Lock expiration
// Filter by tag and limit results
const getTaggedMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/messages?tag=order-123&limit=5', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
const messages = await response.json()
return messages
}
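Tags and queue names may contain characters that need escaping in a URL, so it can be safer to build the query string than to concatenate it by hand. The sketch below uses the standard `URL` and `URLSearchParams` APIs. `buildMessagesUrl` is a hypothetical helper; the base URL is the one used in the examples above.

```javascript
// Hypothetical helper: builds the retrieval URL with optional tag and limit filters.
const API_BASE = 'https://api.okmq.net'

const buildMessagesUrl = (queueName, { tag, limit } = {}) => {
  const url = new URL(`/queues/${encodeURIComponent(queueName)}/messages`, API_BASE)
  // searchParams.set escapes the values, so tags with spaces or '&' stay intact
  if (tag !== undefined) url.searchParams.set('tag', tag)
  if (limit !== undefined) url.searchParams.set('limit', String(limit))
  return url.toString()
}
```

For example, `buildMessagesUrl('order-processing', { tag: 'order-123', limit: 5 })` produces the same URL as the hand-written one in `getTaggedMessages`.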
// Long polling - wait for messages if queue is empty
const longPollMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/messages', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
// Will wait up to 30 seconds for messages to arrive (fixed timeout)
const messages = await response.json()
return messages
}
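Because each long-poll request returns after at most 30 seconds, a consumer usually re-issues it in a loop. The following is a minimal sketch of such a loop, not an official client. `pollLoop`, its option names, and the one-second error delay are hypothetical, and it assumes that a poll which times out resolves with an empty array.

```javascript
// Hypothetical consumer loop built on the long-polling GET shown above.
// Assumption: an empty poll resolves with an empty JSON array.
const pollLoop = async ({ url, token, handle, signal, fetchFn = fetch, errorDelayMs = 1000 }) => {
  while (!signal.aborted) {
    try {
      const response = await fetchFn(url, {
        headers: { authorization: `Bearer ${token}` },
        signal
      })
      if (!response.ok) throw new Error(`Unexpected status ${response.status}`)
      const messages = await response.json()
      for (const message of messages) {
        await handle(message)
      }
    } catch (error) {
      // An aborted signal means the caller asked the loop to stop
      if (signal.aborted) break
      console.error('Poll failed, retrying:', error)
      // Pause briefly so a persistent failure does not turn into a tight loop
      await new Promise((resolve) => setTimeout(resolve, errorDelayMs))
    }
  }
}
```

The loop stops when the `AbortSignal` passed as `signal` is aborted, for example by calling `controller.abort()` on the `AbortController` that created it.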
// Peek at messages without locking them
const peekMessages = async () => {
const response = await fetch('https://api.okmq.net/queues/order-processing/peek?limit=10', {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
// Messages are not locked and won't be marked as delivered
const messages = await response.json()
return messages
}
Retrieve detailed information about a specific message using its ID. This returns the complete message structure including delivery metadata.
const getMessage = async (queueName, messageId) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages/${messageId}`, {
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
const message = await response.json()
console.log('Message details:', message)
return message
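} else if (response.status === 404) {
console.log('Message not found')
return null
}
// Surface any other status instead of silently returning undefined
throw new Error(`Unexpected response status: ${response.status}`)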
}
// Example response
{
"id": "order-123",
"body": {
"orderId": "12345",
"customerId": "user-456",
"amount": 99.99
},
"tag": "order-processing",
"auto_ack": false,
"acked": null,
"created": "2024-01-01T10:00:00Z",
"delivery_attempt": 2,
"delivery_time": "2024-01-01T10:05:00Z",
"lock_duration": 30,
"locked_until": "2024-01-01T10:35:00Z",
"max_delivery_attempts": 3,
"ttl": "2024-01-01T11:00:00Z",
"updated": "2024-01-01T10:05:00Z"
}
Response fields:
- id: Message identifier
- body: Message payload
- tag: Message tag
- auto_ack: Auto acknowledgment setting
- acked: Acknowledgment timestamp (if acked)
- delivery_attempt: Current retry count
- delivery_time: Next delivery time
- locked_until: Lock expiration time
- created: Message creation time
- updated: Last modification time

Control when messages are considered successfully processed with acknowledgment modes.
When auto_ack: true is set, messages are automatically acknowledged as soon as they're delivered to a consumer. This is perfect for fire-and-forget scenarios where you don't need delivery guarantees, such as logging or notifications.
With auto_ack: false, you must explicitly acknowledge messages after successful processing. Failed messages (NACK with ack: false) will be retried using the queue's configured retry strategy, unless you override the timing with a custom delivery_time.
// Manual acknowledgment example
const response = await fetch(`${host}/queues/orders/messages`, {
headers: { authorization: `Bearer ${token}` }
})
const messages = await response.json()
// Process each message
for (const message of messages) {
try {
// Process the message
await processOrder(message.body)
// Acknowledge successful processing
await fetch(`${host}/queues/orders/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: message.id,
ack: true
}])
})
} catch (error) {
console.error('Processing failed:', error)
// The three options below are alternatives: use only one per failed message.
// Option 1: Let queue retry strategy handle timing
// Just don't acknowledge - message will retry automatically
// Option 2: NACK to use queue's retry strategy
await fetch(`${host}/queues/orders/ack`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
},
body: JSON.stringify([{
id: message.id,
ack: false // Uses queue's retry strategy (exponential, linear, etc.)
}])
})
// Option 3: Override with custom retry time
// Send the same request as Option 2, but with this body instead:
// body: JSON.stringify([{
//   id: message.id,
//   ack: false,
//   delivery_time: new Date(Date.now() + 30000).toISOString() // Custom: retry in 30 seconds
// }])
}
}
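The ack request body is an array, which suggests that outcomes for several messages could be reported in one request. The example above only ever sends one entry, so treat that as an assumption. The sketch below builds such a payload and gives failed messages a custom delivery_time with exponential backoff derived from delivery_attempt. `backoffMs`, `buildAckPayload`, the 5-second base delay, and the 5-minute cap are all illustrative choices.

```javascript
// Hypothetical helpers: turn per-message outcomes into one ack request body.
// Assumption: the ack endpoint accepts several entries in the same array.
const backoffMs = (attempt = 1, baseMs = 5000, maxMs = 300000) =>
  Math.min(maxMs, baseMs * 2 ** Math.max(0, attempt - 1))

const buildAckPayload = (outcomes, now = Date.now()) =>
  outcomes.map(({ message, ok }) =>
    ok
      ? { id: message.id, ack: true }
      : {
          id: message.id,
          ack: false,
          // Overrides the queue's retry strategy with a client-computed time
          delivery_time: new Date(now + backoffMs(message.delivery_attempt)).toISOString()
        }
  )
```

The result can be passed through `JSON.stringify` and sent as the body of the POST to the ack endpoint used in the example above. Leaving out delivery_time in the failure branch would hand the timing back to the queue's own retry strategy.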
Use tags for client-side processing and partitioning. Tags allow clients to fetch only messages with a specific tag, enabling selective message processing.
// Send messages with tags
const sendOrderMessages = async () => {
await fetch('https://api.okmq.net/queues/order-processing/messages', {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify([
{
id: 'order-001-payment',
body: { orderId: '001', action: 'payment' },
tag: 'order-001' // Tag for order 001 messages
},
{
id: 'order-001-shipping',
body: { orderId: '001', action: 'shipping' },
tag: 'order-001' // Same tag for order 001
},
{
id: 'order-002-payment',
body: { orderId: '002', action: 'payment' },
tag: 'order-002' // Different tag for order 002
}
])
})
}
Different consumers can process specific message types by filtering on tags.
// Email consumer fetches only email messages
fetch('/queues/notifications/messages?tag=email')
// SMS consumer fetches only SMS messages
fetch('/queues/notifications/messages?tag=sms')
Separate high and low priority messages for different processing workflows.
// High priority consumer
fetch('/queues/orders/messages?tag=urgent')
// Normal priority consumer
fetch('/queues/orders/messages?tag=normal')
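A single worker can also serve both priorities by asking for urgent messages first and falling back to normal ones only when none are waiting. The sketch below shows that idea. `fetchByPriority` and its option names are hypothetical, and the tag names are the ones from the example above. If tag-filtered requests long-poll as described earlier, an empty urgent check may take up to 30 seconds before the fallback runs.

```javascript
// Hypothetical worker step: try each tag in priority order and return the first non-empty batch.
const fetchByPriority = async ({ baseUrl, token, tags = ['urgent', 'normal'], fetchFn = fetch }) => {
  for (const tag of tags) {
    const response = await fetchFn(`${baseUrl}?tag=${encodeURIComponent(tag)}`, {
      headers: { authorization: `Bearer ${token}` }
    })
    if (!response.ok) throw new Error(`Fetching tag "${tag}" failed with status ${response.status}`)
    const messages = await response.json()
    if (messages.length > 0) return { tag, messages }
  }
  // No tag had messages waiting
  return { tag: null, messages: [] }
}
```

Here the ordering lives entirely in the client: the tags only partition the queue, and the worker decides which partition to drain first.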
Schedule messages for future delivery using the delivery_time field.
// Send a delayed message
const sendDelayedMessage = async () => {
const deliveryTime = new Date(Date.now() + 3600000) // 1 hour from now
await fetch('https://api.okmq.net/queues/notifications/messages', {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify([
{
id: 'reminder-123',
body: {
type: 'reminder',
message: 'Your trial expires tomorrow'
},
delivery_time: deliveryTime.toISOString()
}
])
})
}
Common uses:
- Schedule follow-up emails and notifications
- Spread API calls over time
- Process during off-peak hours
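Spreading work over time comes down to giving each message its own delivery_time. The sketch below assigns evenly spaced delivery times to a batch before it is sent. `staggerDeliveries` and its option names are hypothetical, and the one-second default interval is only an example.

```javascript
// Hypothetical helper: give each message its own delivery_time, spaced intervalMs apart.
const staggerDeliveries = (messages, { startMs = Date.now(), intervalMs = 1000 } = {}) =>
  messages.map((message, index) => ({
    ...message, // copy, so the input messages are left unchanged
    delivery_time: new Date(startMs + index * intervalMs).toISOString()
  }))
```

The returned array can be sent in a single POST, as in `sendDelayedMessage` above; the first message becomes deliverable at `startMs` and each later one follows `intervalMs` after the previous.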
Send messages to topics for fan-out delivery to multiple queues. Any queue subscribed to a topic will receive copies of messages sent to that topic.
// Send message to topic - delivered to all subscribed queues
const sendToTopic = async (topicName, messageData) => {
const response = await fetch(`https://api.okmq.net/topics/${topicName}/messages`, {
method: 'POST',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json',
},
body: JSON.stringify([
{
id: `notification-${Date.now()}`,
body: messageData,
tag: 'broadcast'
}
])
})
if (response.ok) {
console.log('Message sent to all subscribers')
}
}
// Example: Send order update to all relevant services
await sendToTopic('order-updates', {
orderId: '12345',
status: 'shipped',
trackingNumber: 'ABC123',
timestamp: new Date().toISOString()
})
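Since messages with a duplicate ID are silently ignored, an ID based only on `Date.now()` could cause a message to be dropped when two are sent within the same millisecond. One possible alternative is a random suffix, sketched below. `uniqueId` and `randomSuffix` are hypothetical helpers. They use `crypto.randomUUID()` where the runtime exposes it globally and fall back to a timestamp plus random characters otherwise.

```javascript
// Hypothetical ID helper: a short prefix plus a random suffix, kept within the 50-character limit.
const ID_LIMIT = 50

const randomSuffix = () =>
  typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function'
    ? crypto.randomUUID() // 36 characters
    : `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 12)}`

const uniqueId = (prefix) => {
  const id = `${prefix}-${randomSuffix()}`
  if (id.length > ID_LIMIT) {
    throw new Error(`Generated id is ${id.length} characters; shorten the prefix`)
  }
  return id
}
```

In `sendToTopic`, `uniqueId('notification')` could replace the template string used for `id`. A random ID no longer deduplicates retries of the same logical message, so a deterministic ID is still the better choice where that protection matters.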
A queue subscribes to topics at creation time by listing them in its topics array.
// Queue subscribes to multiple topics
{
name: 'notification-service',
topics: ['order-updates', 'user-events'],
auto_ack: false
}
Common uses:
- Notify multiple services of important events
- Send alerts via email, SMS, and push simultaneously
- Keep distributed services synchronized
Remove messages from queues using DELETE requests. You can delete all messages, only acknowledged messages, or only failed messages.
// Delete all messages from a queue
const deleteAllMessages = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
if (response.ok) {
console.log('All messages deleted')
}
}
// Delete only acknowledged messages
const deleteAckedMessages = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=acked`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
}
// Delete only failed messages (exceeded max delivery attempts)
const deleteFailedMessages = async (queueName) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?type=failed`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
}
// Delete messages with specific tag
const deleteTaggedMessages = async (queueName, tag) => {
const response = await fetch(`https://api.okmq.net/queues/${queueName}/messages?tag=${encodeURIComponent(tag)}`, {
method: 'DELETE',
headers: {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
})
}
Query parameters:
- type: Message type to delete (all, acked, failed)
- tag: Delete only messages with specific tag

Values for type:
- all: All messages in queue (default)
- acked: Successfully processed messages
- failed: Messages that exceeded retry limits
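The four delete functions above differ only in their query string, so they could be folded into one helper that validates the type before any request is made. The sketch below builds the URL only. `buildDeleteUrl` is hypothetical, and whether type and tag may be combined in one request is not stated here, so sending both is an assumption.

```javascript
// Hypothetical helper: build the DELETE URL from the documented type and tag parameters.
const DELETE_TYPES = ['all', 'acked', 'failed']

const buildDeleteUrl = (queueName, { type, tag } = {}) => {
  // Reject unknown types locally rather than relying on the server's response
  if (type !== undefined && !DELETE_TYPES.includes(type)) {
    throw new Error(`Unknown type "${type}"; expected one of ${DELETE_TYPES.join(', ')}`)
  }
  const url = new URL(`https://api.okmq.net/queues/${encodeURIComponent(queueName)}/messages`)
  if (type !== undefined) url.searchParams.set('type', type)
  if (tag !== undefined) url.searchParams.set('tag', tag)
  return url.toString()
}
```

The returned string can be passed to `fetch` with `method: 'DELETE'` and the same headers as in the examples above.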