RabbitMQ for Node.js in 30 steps

This is a simple guide to RabbitMQ patterns in MacOS using Node.js based on RabbitMQ Tutorials. The steps on this guide may also be applied to other operating systems but be aware that installation and running of RabbitMQ binaries and services could be different. In a nutshell, this guide covers installation, execution and basic configuration of the RabbitMQ service in Node.js.

Contents

Getting Started

  1. Install RabbitMQ
  2. Mac OS Brew install issues
  3. Start RabbitMQ Service

Consumer and Producer Pattern

  1. Create and run send.js
  2. Create and run receive.js
  3. Listing Queues

Work Queue Pattern (Round-robin Dispatcher)

  1. Create new_task.js
  2. Create worker.js
  3. Running worker.js
  4. Running new_task.js
  5. Tasks running in sequence
  6. Message acknowledgements
  7. Message durability
  8. Check unacknowledged messages
  9. Message persisence
  10. Fair dispatch
  11. Summary

Publish and Subscribe Pattern - Fanout

  1. Create emit_log.js
  2. Create receive_logs.js
  3. Runningreceive_logs.js
  4. Running emit_log.js
  5. Messages published to subscribers
  6. Check bindings
  7. Check types of exchanges
  8. Summary

Publish and Subscribe Pattern - Direct

  1. Create emit_log_direct.js
  2. Create receive_logs_direct.js
  3. Runningreceive_logs_direct.js
  4. Running emit_log_direct.js
  5. Summary

Getting Started

1. Install RabbitMQ

  • https://www.rabbitmq.com/install-homebrew.html

  • brew install rabbitmq

2. Mac OS Brew install issues

  • As of writing, I expect that you will also have this issue:

Issue

Error: The `brew link` step did not complete successfully
The formula built, but is not symlinked into /usr/local
Could not symlink sbin/cuttlefish
/usr/local/sbin is not writable.

Fix

cd /usr/local
sudo mkdir sbin
sudo chown -R glenn:admin sbin
brew link rabbitmq

3. Start RabbitMQ Service

  • brew services start rabbitmq

Consumer and Producer Pattern

1. Create and run send.js

This will send messages to a queue.

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the queue
    const q = 'hello'
    // Declare the queue
    ch.assertQueue(q, { durable: false })

    // Send message to the queue
    ch.sendToQueue(q, new Buffer('Hello World!'))
    console.log(" {x} Sent 'Hello World'")

    // Close the connection and exit
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 500)
  })
})
# Terminal
sudo chmod 755 send.js
./send.js

2. Create and run receive.js

This will receive messages from a queue.

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the queue
    const q = 'hello'
    // Declare the queue
    ch.assertQueue(q, { durable: false })

    // Wait for Queue Messages
    console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
    ch.consume( q, msg => {
        console.log(` [x] Received ${msg.content.toString()}`)
      }, { noAck: true }
    )
  })
})
# Terminal
sudo chmod 755 receive.js
./receive.js

3. Listing Queues

This will list all queues present in the RabbitMQ service

/usr/local/sbin/rabbitmqctl list_queues

Work Queue Pattern (Round-robin Dispatcher)

1. Create new_task.js

This will send a message to the next available queue.

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the queue
    const q = 'task_queue_durable'
    // Write a message
    const msg = process.argv.slice(2).join(' ') || "Hello World!"

    // Declare the queue
    ch.assertQueue(q, { durable: true }) // { durable: true } ensures that the message will still be redelivered even if RabbitMQ service is turned off/restarted

    // Send message to the queue
    ch.sendToQueue(q, new Buffer(msg), {persistent: true}) // {persistent: true} saves the message to disk/cache
    console.log(` {x} Sent '${msg}'`)

    // Close the connection and exit
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 500)
  })
})

2. Create worker.js

This will consume messages when it’s the next worker in the round-robin dispatch.

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the queue
    const q = 'task_queue_durable'
    // Declare the queue
    ch.assertQueue(q, { durable: true }) // { durable: true } ensures that the message will still be redelivered even if RabbitMQ service is turned off/restarted
    // Tell RabbitMQ not to give more than 1 message per worker
    ch.prefetch(1)

    // Wait for Queue Messages
    console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
    ch.consume( q, msg => {
        // Just to simulate a fake task, length of dots in the message
        // is the number of secs the task will run
        const secs = msg.content.toString().split('.').length - 1

        console.log(` [x] Received ${msg.content.toString()}`)
        console.log(` [x] Task will run for ${secs} secs`)
        
        // Fake task which simulates execution time
        setTimeout(() => {
          console.log(` [x] Done ${msg.content.toString()}`);
          // Send acknowledgment
          ch.ack(msg)
        }, secs * 1000)

      }, { noAck: false } // noAck: false means Message acknowledgments is turned on
      // When message acknowledgements are turned on, even if a worker.js is killed (Ctrl+C)
      // while processing a message, it will be redelivered
    )
  })
})

3. Open two terminals and run worker.js on each

# terminal 1
sudo chmod 755 worker.js
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# terminal 2
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C

4. Open a third terminal and run several new_task.js

sudo chmod 755 new_task.js
./new_task.js First message.
./new_task.js Second message..
./new_task.js Third message...
./new_task.js Fourth message....
./new_task.js Fifth message.....

5. The tasks will run in sequence for each worker.js currently running.

6. Message acknowledgements in worker.js.

This step was already done so there is nothing else to change anything in worker.js.

{ noAck: false } // noAck: false means Message acknowledgments is turned on
// When message acknowledgements are turned on, even if a worker.js is killed (Ctrl+C)
// while processing a message, it will be redelivered

7. Message durability in both worker.js and new_task.js.

This step was already done so there is nothing else to change anything in worker.js.

ch.assertQueue(q, { durable: true }) // { durable: true } ensures that the message will still be redelivered even if RabbitMQ service is turned off/restarted
// Send acknowledgment
ch.ack(msg)

8. Check unacknowledged messages.

To test this, comment out ch.ack(msg). Messages will not be acknowledge with this commented.

/usr/local/sbin/rabbitmqctl list_queues name messages_ready messages_unacknowledged

9. Message persistence in new_task.js.

As requirement to #7, persistence needs to be set to true.

ch.sendToQueue(q, new Buffer(msg), {persistent: true}) // {persistent: true} saves the message to disk/cache

10. Fair dispatch (consumer queue size) in worker.js

// Tell RabbitMQ not to give more than 1 message per worker
ch.prefetch(1)

11. In summary:

  • The queue is now capable of round robin dispatch of messages.
  • Message acknowledgement is turned on which means that if a worker dies, the message will be redelivered if not already acknowledged.
  • Message persistence is turned on which means that if the RabbitMQ is killed or restarted, the message will still be written on disk or cache.
  • Fair dispatch is enabled which means, the consumer will not process more than X messages per worker at the risk of filling up a queue.

Publish and Subscribe Pattern - Fanout

1. Create emit_log.js

This will publish messages to an exchange and deliver the messages to all queues bound to that exchange.

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the exchange
    const ex = 'logs'
    // Write a message
    const msg = process.argv.slice(2).join(' ') || "Hello World!"

    // Declare the exchange
    ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows

    // Send message to the exchange
    ch.publish(ex, '', new Buffer(msg))  // '' empty string means that message will not be sent to a specific queue
    console.log(` {x} Sent '${msg}'`)

    // Close the connection and exit
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 500)
  })
})

Major differences from previous pattern new_task.js

  1. We are now declaring an exchange instead of a queue
// Name of the exchange
const ex = 'logs'
// Declare the exchange
ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows
  1. We now publish messages instead of sending a message directly to a queue
// Send message to the exchange
ch.publish(ex, '', new Buffer(msg))  // '' empty string means that message will not be sent to a specific queue

2. Create receive_logs.js

Every instance of this will subscribe to an exchange and receive messages from the queue.

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the exchange
    const ex = 'logs'
    // Declare the exchange
    ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows

    // Declare the queues
    ch.assertQueue('', {exclusive: true}, (err, q) => {
      // Wait for Queue Messages
      console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
      // Tell exchange to send messages to queue
      ch.bindQueue(q.queue, ex, '')
      // Consume queue messages
      ch.consume(q.queue, msg => {
        console.log(` [x] ${msg.content.toString()}`)
      }, {noAck: true})
    })
  })
})

Major differences from previous pattern worker.js

  1. We are now declaring an exchange instead of a queue
// Name of the exchange
const ex = 'logs'
// Declare the exchange
ch.assertExchange(ex, 'fanout', { durable: false }) // 'fanout' will broadcast all messages to all the queues it knows
  1. We create a non-durable queue with a generated queue name by specifying an empty string.
ch.assertQueue('', {exclusive: true}, (err, q) => {
  1. We bind the exchange to the queue
// Tell exchange to send messages to queue
ch.bindQueue(q.queue, ex, '')
  1. The consumer consumes messages to every queue in the exchange (i.e. Sent to all running queues)
// Consume queue messages
ch.consume(q.queue, msg => {
  console.log(` [x] ${msg.content.toString()}`)
}, {noAck: true})

3. Open two terminals and run receive_logs.js for each.

#Terminal 1
sudo chmod 755 receive_logs.js
./receive_logs.js
#Terminal 2
./receive_logs.js

4. Open a third terminal and run emit_log.js.

#Terminal 3
sudo chmod 755 emit_log.js
./emit_log.js

5. The message sent by emit_log.js will be sent to all running receive_logs.js.

6. To check all bindings (i.e. Exchanges bound to queues):

/usr/local/sbin/rabbitmqctl list_bindings

7. Aside from fanout, there are many other types of exchanges.

Check them by typing the following in your terminal:

/usr/local/sbin/rabbitmqctl list_exchanges

8. In summary:

  1. We created subscribers that creates queues with randomly generated names and bind those to an exchange.
  2. We created a publisher that sends messages to an exchange and consumes the message to all queues bound to that exchange.

Publish and Subscribe Pattern - Direct

1. Create emit_log_direct.js

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the exchange
    const ex = 'direct_logs'
    // Store message as args
    const args = process.argv.slice(2)
    // Write a message
    const msg = args.slice(1).join(' ') || "Hello World!"
    // Use severity as the binding key
    const severity = (args.length > 0) ? args[0] : 'info'

    // Declare the exchange
    ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)

    // Send message to the exchange
    ch.publish(ex, severity, new Buffer(msg))  // '' empty string means that message will not be sent to a specific queue
    console.log(` {x} Sent ${severity}: '${msg}'`)

    // Close the connection and exit
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 500)
  })
})

Major differences from previous pattern emit_log.js

  1. We identify a binding key as part of the run arguments. For this example, we are using severity which can either have a value of info, warning, or error.
// Store message as args
const args = process.argv.slice(2)
// Write a message
const msg = args.slice(1).join(' ') || "Hello World!"
// Use severity as the binding key
const severity = (args.length > 0) ? args[0] : 'info'
  1. The exchange is now declared as direct.
// Declare the exchange
ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)
  1. Publishing the message now uses a binding key severity.
// Send message to the exchange
ch.publish(ex, severity, new Buffer(msg))  // '' empty string means that message will not be sent to a specific queue
console.log(` {x} Sent ${severity}: '${msg}'`)

2. Create receive_logs_direct.js

#!/usr/bin/env node
const amqp = require('amqplib/callback_api')

// Store message as args
const args = process.argv.slice(2);

// Choose what type of binding key `receive_logs_direct.js` is going to use
// If no arguments were provided, output instructions
if (args.length == 0) {
  console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
  process.exit(1);
}

// Create connection
amqp.connect('amqp://localhost', (err, conn) => {
  // Create channel
  conn.createChannel((err, ch) => {
    // Name of the exchange
    const ex = 'direct_logs'
    // Declare the exchange
    ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)

    // Declare the queues
    ch.assertQueue('', {exclusive: true}, (err, q) => {
      // Wait for Queue Messages
      console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)
      // For each binding key, tell exchange to send messages to queue
      args.forEach( severity => {
        ch.bindQueue(q.queue, ex, severity)
      })
      
      // Consume queue messages
      ch.consume(q.queue, msg => {
        console.log(` [x] ${msg.fields.routingKey}: ${msg.content.toString()}`)
      }, {noAck: true})
    })
  })
})

Major differences from previous pattern receive_logs.js

  1. We now require the user to specify a binding key as part of the run arguments. If no binding key is provided, an instruction will be outputted to the user.
// If no arguments were provided, output instructions
if (args.length == 0) {
  console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
  process.exit(1);
}
  1. The exchange is now declared as direct.
// Declare the exchange
ch.assertExchange(ex, 'direct', { durable: false }) // 'direct' will broadcast messages to its corresponding binding key (i.e. severity)
  1. The exchange is bound to queues by binding key
// For each binding key, tell exchange to send messages to queue
args.forEach( severity => {
  ch.bindQueue(q.queue, ex, severity)
})
  1. The output message now outputs the routing key (equivalent value to binding key)
// Consume queue messages
ch.consume(q.queue, msg => {
  console.log(` [x] ${msg.fields.routingKey}: ${msg.content.toString()}`)
}, {noAck: true})

3. Open 2 terminals to run receive_logs_direct.js for each routing key:

# Terminal 1 - Will display info messages only
chmod 755 receive_logs_direct.js
./receive_logs_direct.js info
# Terminal 2 - Will display warning and error messages
./receive_logs_direct.js warning error

4. Open another terminal to run emit_log_direct.js

# Terminal 3
chmod 755 emit_log_direct.js
./emit_log_direct.js info This is an info message
./emit_log_direct.js warning This is a warning message
./emit_log_direct.js error This is an error message

5. In summary:

  1. We are now publishing and subscribing to messages using a direct exchange.
  2. A direct exchange uses a binding key (which has an equivalent value to a routing key). This key will tell the subscribers to listen only to a specific queue (similar to a channel).
  3. Receiving logs require that a binding key is specified before runtime. The queue will only receive messages from publishers with a similar binding keys which is also specified before runtime.