Processing a Message Queue and Using Async

Problem

I wrote a small test node app that loops and adds messages to a queue (azure storage queue), something like this:

var queueService = azure.createQueueService();
var queueName = 'taskqueue';

// other stuff like check if created

// loop called after queue is confirmed

for (i=0;i<1000;i++){
  queueService.createMessage(queueName, "Hello world!", null, messageCreated);
}

// messageCreated does nothing at the moment, just logs to console

I'm trying to rewrite that to handle say 1 million create's using async to control the number of worker functions that are run in parallel. This is a learning exercise more than anything.

https://github.com/caolan/async#queue

This is the basic setup for async's queue and I'm kind of at a loss as to what I need to change. I don't think the below will work:

var q = async.queue(function (task, callback) {
   queueService.createMessage(queueName, task.msg, null, messageCreated);
    callback();
}, 100);


// assign a callback.  Called when all the queues have been processed
q.drain = function() {
    console.log('all items have been processed');
}

// add some items to the queue
for(i=0;i<1000000;i++) {
  q.push({msg: 'Hello World'}, function (err) {
    console.log('finished processing foo');
  });
    console.log('pushing: ' + i);
}

I'm not quite grasping how to pull it all together with async.

Problem courtesy of: lucuma

Solution

Here's your error:

var q = async.queue(function (task, callback) {
   queueService.createMessage(queueName, task.msg, null, messageCreated);
    callback();
}, 100);

What you are doing is creating a message on the queue, and immediately afterwards pass the continuation (calling the callback). What you want to do is to pass continuation inside the callback passed to createMessage:

var q = async.queue(function (task, callback) {
   queueService.createMessage(queueName, task.msg, null, function(error, serverQueue, serverResponse) {
       callback(error, serverQueue, serverResponse);
       messageCreated(error, serverQueue, serverResponse);
   });
}, 100);

Now each task will report finishing after the task has actually been created.

Edit: Updated interface for the createMessage callback.

Solution courtesy of: Linus Gustav Larsson Thiel

Discussion

There is currently no discussion for this recipe.

This recipe can be found in it's original form on Stack Over Flow.