Promises in a broker-client communication using 0MQ

Problem

I am trying to build a broker that acts as a proxy between some workers and clients using 0MQ. This was relatively simple to do without promises, but because I am inexperienced with promises, I can't work out how to use them in this example.

Code for Broker:

//Broker that serves as proxy for workers and clients
var zmq = require('zmq');
var frontend = zmq.socket('router');
var backend = zmq.socket('router');
var Q = require('q');

frontend.bindSync('tcp://*:8001');
backend.bindSync('tcp://*:8002');

var frontendOn = Q.nbind(frontend.on, frontend);
var backendOn = Q.nbind(backend.on, backend);

var requestFrontend = frontendOn('message').then(function(){
  console.log("Message received");
});
var requestBackend = backendOn('message').then(responseBackend);

...

Code for client:

//Client program that communicates with broker

var zmq = require('zmq')
var requester = zmq.socket('req');
var Q = require('q');

var arguments = process.argv.splice(2);

//Connect with broker
requester.connect(arguments[0]); 

console.log("Connected successfully to broker");

//Send message to broker
requester.send(arguments[1]);   

console.log("Message sent to broker");

...

The client connects to the broker and sends the message, but the broker never processes it. Any ideas as to what I'm doing wrong? Any help would be appreciated.

Problem courtesy of: ashe540

Solution

I haven't worked with ZeroMQ, but from the docs I assume that on is an event subscription mechanism, not a Node.js-style asynchronous operation that accepts a callback. It may fire more than once, right?

If this is the case, why do you want to use promises at all? They seem like the wrong abstraction for this particular situation.

Promises represent operations that finish or fail once, not asynchronous streams of values.

Even if

var requestFrontend = frontendOn('message').then(function(){
  console.log("Message received");
});

worked, it wouldn't be of any benefit to you, because the then callback would only be called once.
I assume that's not what you wanted.
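
To make the once-only behaviour concrete, here is a minimal sketch. It uses Node's built-in EventEmitter as a stand-in for a 0MQ socket and a native Promise instead of Q; both are assumptions made so the snippet runs without extra packages, and fakeSocket and firstMessage are hypothetical names.

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for a 0MQ socket (assumption: only the 'message' event matters here).
var fakeSocket = new EventEmitter();

var listenerCalls = [];
var promiseCalls = [];

// A plain listener runs for every message.
fakeSocket.on('message', function (msg) {
  listenerCalls.push(msg);
});

// A promise wrapped around the same event settles on the first message only;
// later calls to resolve are ignored.
var firstMessage = new Promise(function (resolve) {
  fakeSocket.on('message', resolve);
});

firstMessage.then(function (msg) {
  promiseCalls.push(msg);
});

fakeSocket.emit('message', 'one');
fakeSocket.emit('message', 'two');
fakeSocket.emit('message', 'three');

// Promise callbacks run asynchronously, so inspect the results on a later tick.
setImmediate(function () {
  console.log('listener saw:', listenerCalls);
  console.log('promise saw:', promiseCalls);
});
```

After the three emits, the listener array holds all three messages, while the promise array holds only 'one'.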

If you're looking for a way to filter, map, merge or throttle asynchronous sequences, you can look into RxJS, which does exactly that (it also interoperates with promises). But promises shouldn't, and can't, be used as a substitute for events that fire more than once.


On the other hand, if the message really does come only once, I agree it's best to use promises to abstract it away. However, nfbind or nbind won't work here because they require function(err, result)-style callbacks, and you have function(result).
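
The mismatch can be seen in a small sketch. It uses Node's util.promisify as a stand-in for Q.nbind, on the assumption that both follow the same error-first convention, and an EventEmitter in place of the socket; fakeSocket and onAsPromise are hypothetical names.

```javascript
var EventEmitter = require('events').EventEmitter;
var util = require('util');

// Stand-in for a 0MQ socket (assumption).
var fakeSocket = new EventEmitter();

// Treat `on` as if it took a function(err, result) callback.
var onAsPromise = util.promisify(fakeSocket.on.bind(fakeSocket));

// Record how the promise settles so both outcomes are visible.
var outcome = onAsPromise('message').then(
  function (value) { return { state: 'fulfilled', value: value }; },
  function (reason) { return { state: 'rejected', reason: reason }; }
);

// The event passes the message as the first argument, which an error-first
// wrapper interprets as the error.
fakeSocket.emit('message', 'hello');

outcome.then(function (result) {
  console.log(result);
});
```

Here the promise is rejected with the message itself. The code in the question attaches no rejection handler, so a rejection like this would pass silently, which would be consistent with nothing being logged.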

Here's what I suggest you use instead:

function promiseOneMessage(queue) {
  var deferred = Q.defer();

  queue.on('message', deferred.resolve);
  queue.on('error', deferred.reject);

  return deferred.promise;
}

// .done() ends the chain and returns undefined, so there is nothing to assign.
promiseOneMessage(frontend)
  .then(function (message) {
    console.log("Message received", message);
  })
  .done();

promiseOneMessage(backend)
  .then(responseBackend)
  .done();

Solution courtesy of: Dan Abramov
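
One possible refinement, not part of the answer above: promiseOneMessage leaves both listeners attached after the promise settles. The sketch below detaches them. It uses a native Promise and an EventEmitter stand-in so that it runs without extra packages (both assumptions); promiseOneMessageOnce and fakeSocket are hypothetical names.

```javascript
var EventEmitter = require('events').EventEmitter;

function promiseOneMessageOnce(queue) {
  return new Promise(function (resolve, reject) {
    function onMessage(message) {
      // Settled with a message: the error listener is no longer needed.
      queue.removeListener('error', onError);
      resolve(message);
    }
    function onError(err) {
      // Settled with an error: the message listener is no longer needed.
      queue.removeListener('message', onMessage);
      reject(err);
    }
    // `once` removes each listener automatically after it fires.
    queue.once('message', onMessage);
    queue.once('error', onError);
  });
}

// Stand-in for a 0MQ socket (assumption).
var fakeSocket = new EventEmitter();
var pending = promiseOneMessageOnce(fakeSocket);

fakeSocket.emit('message', 'first');

pending.then(function (message) {
  console.log('Message received', message);
  console.log('listeners left:',
    fakeSocket.listenerCount('message') + fakeSocket.listenerCount('error'));
});
```

Like the original, this passes on only the first argument of the event, so a multi-part message would need its frames collected before resolving.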

This recipe can be found in its original form on Stack Overflow.