Building a Taskrunner with Node and RxJS

Are you a lazy person? I sure am. A common “problem” I have, is that I often need to run multiple programs/processes from command line. Perhaps you have a bunch of servers that you need up and running. But starting a new terminal window for each program is tedious. What if we could start and stop everything with a simple script? And what if we could get all console output inside one single terminal, and perhaps highlight output from each program in a different color? Can we transform these log streams into RxJS observables and manipulate the data? Let’s find out!

Launch an application (process)

NodeJS has a module called child_process that contain some useful functions to start processes. We can use exec to execute an arbitrary command in a separate process, and capture the output.

const exec = require('child_process').exec;
const cmd  = 'npm start'; // Some long-running process
const proc = exec(cmd, {maxBuffer: 1024 * 1024 * 10 }, (err, stdout, stderr) => {
  // called with the output when process terminates
);
proc.stdout.on('data', (data) => {
  // Handle data
});
proc.stderr.on('data', (data) => {
  // Handle data
});

We set the maxBuffer option to 10MB to make sure our buffer is big enough. This is useful for long-running processes. Tweak this value as needed.

Creating observables

We want to transform our process streams into an RxJS Observable. This is very easy with RxJS, because there is a built in function that can help us with that: Rx.Observable.fromEvent. We also want to combine the stdout and stderr streams into a single observable. We can do this with the Rx.Observable.merge function.

const Rx = require('rx');

const processStream = Rx.Observable.merge(
  Rx.Observable.fromEvent(proc.stdout, 'data'),
  Rx.Observable.fromEvent(proc.stderr, 'data')
);

Processing log statements

We want to convert the data we get from stdout and stderr into a stream of log statements (lines). We can use the map function to transform the data into an array of lines by splitting the strings by linefeed character. Then, we need to “flatten” the array of log messages into a stream of log messages. For this we use flatMap together with Rx.Observable.from. Finally, we are only interested in lines that contain something, so we filter out empty lines.

const logStream = processStream
  .map(data.toString().trim().split('\n')) // extract lines
  .flatMap(messages => Rx.Observable.from(messages)) // flatten array
  .filter(msg => msg.length > 0)
  .distinctUntilChanged();

Subscribing to our stream

We are now ready to test our stream. Let’s subscribe to it!

logStream.subscribe((data) => {
  console.log(data);
});

Multiple processes

So far we only have one process. Not that fun right? We want to be able to start multiple processes and combine the output into a single stream. Let’s define an array that contain the applications we want to start.

const applications = [
  {
    name: 'first app',
    cwd: 'some path', // from where to run command
    commands: ['npm start']
  },
  {
    name: 'second app',
    cwd: 'another path',
    // this app has two processes 
    commands: [
      'npm run webServer'
      'npm run apiServer'
    ]
  }
];

Now, lets create some functions that will create a single stream from from the applications above.


const getCommandStream = (cmd) => {
  const proc = exec(cmd, {maxBuffer:1024 * 1024 * 10});
  return commandStream = Rx.Observable.merge(
    Rx.Observable.fromEvent(proc.stdout, 'data'),
    Rx.Observable.fromEvent(proc.stderr, 'data')
  )
  .map(data.toString().trim().split('\n')) // extract lines
  .flatMap(messages => Rx.Observable.from(messages)) // flatten array
  .filter(msg => msg.length > 0)
  .distinctUntilChanged();
};

// For each command in the app, create instructions how to start
// 1) go to directory
// 2) start command
const getAppCommands = (app) => app.commands.map(cmd => [
    'cd', app.cwd,
    '&&', cmd
  ].join(' ');

const appStream = (app) => Rx.Observable.from(
  getAppCommands(app).map(cmd => getCommandStream(cmd))
).flatMap(s => s); // flatten array

const applicationsStream = Rx.Observable.from(
  applications.map(app => appStream(app)
).flatMap(a => a); // flatten array
  
applicationStream.subscribe((data) => console.log(data));

Add some colors!

Another fun thing we can do is assign specific colors to each app, and make the log output appear in that color. We just need to modify the script a little bit.

const colors = require('colors');
const stripcolorcodes = require('stripcolorcodes');

We use the colors module to output text with a specific color, and we use stripcolorcodes module to strip off existing colors (if any) from the log messages, to avoid problems.

Update the application items to include a color:

const applications = [
  {
    name: 'first app',
    color: 'green',
    // ...
  },
  {
    name: 'second app',
    color: 'blue',
    // ...
  }
];

Next, we update the map function in the getCommandStream function to remove existing colors, and add another map function to colorize the message:

const getCommandStream = (cmd, app) => {
 // ...
  .map(stripcolorcodes(data.toString()).trim().split('\n')) // extract lines
  .flatMap(messages => Rx.Observable.from(messages)) // flatten array
  .map(msg => colors[app.color](msg)) // add some color!
 // ...
};

We also need to pass in the app as an argument to getCommandStream:

const appStream = (app) => Rx.Observable.from(
  getAppCommands(app).map(cmd => getCommandStream(cmd, app))
).flatMap(s => s); // flatten array

Final thoughts

There are many more cool things you can do to improve this script:

  • Add timestamps (I recomment moment.js for this)
  • Filter out data you are not interested in

That pretty much wraps it up. Hope you find it useful :)