Kinetik

A storage-agnostic, tag-centric job queue for distributed applications.

View the Project on GitHub: qualiancy/kinetik

Features

  • storage-agnostic, thanks to Seed
  • distribute workers across processes and servers
  • tag- and priority-based processing
  • middleware capable
  • CLI management suite
  • job delays
  • per-job logging
  • chainable API

Philosophy

Simple tasks such as sending transactional emails and mobile notifications should be handled as quickly as possible. Other tasks such as database analytics, adaptive re-keys, and media encoding should occur when there are dedicated resources available. Kinetik's goal is to provide the framework necessary to handle both ends of the spectrum with simplicity and agility so your app can keep moving.

What is Seed?

Seed is a suite of components (Graph, Model, Schema, Hash) that provide a common API for working with JSON-style documents, regardless of the storage engine. By using Seed, a Kinetik queue can be stored using any of Seed's storage adapters.

Visit the Seed project page to learn more, and see alternative storage solutions. Seed API documentation is in the works, so hold steady.

Kinetik currently supports Redis or MongoDB. Stay tuned for developments regarding Riak and CouchDB. Need something else? The Seed Storage API will make it easy to integrate with your storage engine of choice.

Quick Start Guide

Installation

The kinetik package is available for Node.js through npm.

npm install kinetik

Choosing a Storage Engine

The first step to using Kinetik is creating a new queue using one of Seed's storage engines. In this example we will use Redis with a default configuration.

var kinetik = require('kinetik')
  , RedisStore = require('seed-redis')
  , queue = kinetik(new RedisStore());

You can use any of Seed's available storage adapters. See that adapter's documentation for relevant configuration options. Using a store is optional. If one is not provided, Kinetik will default to using an in-memory store.

Defining Tasks

In Kinetik, a task is a definition of how to process a given job. A job is a specific unit of work, and will encapsulate all of the data passed to it.

Here we are going to define our first task, send email.

queue
  .define('send email')
  .tag('notifications')
  .action(function (job, done) {
    var email = job.data.email
      , subject = job.data.subject;

    myAsyncMailer(email, subject, body, function (err) {
      if (err) return done(err);
      // do more things if needed.
      done();
    });
  });

All task definitions require a tag. A tag is an arbitrary string that allows us to group common types of jobs together for completion by a single worker process.

The action is the function that will be invoked to complete each job. It receives two arguments:

  • The job argument is comprised of all relevant data needed to complete the task.
  • The done argument accepts an error object as its parameter. If an error is passed, the job will be flagged as failed; otherwise it will be flagged as complete.

Starting the Processor

In the same node process that has the tasks defined, you will need to start processing. Processing is done per tag or group of tags.

queue.process('notifications');

This will begin processing the notifications tag. It will serially process all queued jobs for that tag, then pause for 10 seconds (see the API to change this), then query for more jobs to process. Multiple calls to process will create multiple "threads" of processing. Avoid including the same tag in more than one processing call.

You can also prioritize multiple tags for processing.

queue.process([ 'notifications', 'db aggregation' ]);

In this scenario, the queue will process all of the notifications, then move on to db aggregation. If a notification is added to the queue while running a db aggregation task, it will return to processing notifications at the completion of the current db task. This behavior will occur even if there are more db tasks to accomplish. In that case, there will be no pause. The pause will only happen after all tags assigned for processing have been emptied.
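The scheduling rule above can be sketched in plain JavaScript. This is an illustration of the described behavior, not Kinetik's internals; drainOnce, jobsByTag, and handle are hypothetical names:

```javascript
// Process jobs for a list of tags in priority order. After finishing
// any job, scanning restarts from the highest-priority tag, so new
// high-priority jobs preempt lower-priority backlogs between jobs.
function drainOnce(jobsByTag, tags, handle) {
  var processed = [];
  var i = 0;
  while (i < tags.length) {
    var pending = jobsByTag[tags[i]] || [];
    if (pending.length === 0) {
      i++; // this tag is empty: fall through to the next priority
      continue;
    }
    var job = pending.shift();
    handle(job);
    processed.push(job);
    i = 0; // after every job, rescan from the highest-priority tag
  }
  return processed; // every tag empty: the caller pauses for the interval
}
```

Because scanning restarts at the front of the tag list after every job, a job added to a higher-priority tag mid-run is picked up before the lower-priority backlog finishes.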

Adding Jobs

Finally, you will need to create jobs.

queue.create('send email', {
    email: 'ford@hitchhikersguide.com'
  , name: 'Ford Prefect'
  , subject: 'Hello Universe'
});

One noteworthy feature of Kinetik is that jobs do not have to be created with knowledge of the task that will process them. This means that your task processing codebase does not need to be shared with the part of your application that will be creating jobs. As long as jobs are created using the same storage specifications as your processing queue, Kinetik will work as expected.

Queue API

The Queue constructor is the main interaction point for the kinetik module. It should be constructed with a Seed storage engine and any of the options below.

var kinetik = require('kinetik')
  , RedisStore = require('seed-redis')
  , queue = kinetik(new RedisStore(), { interval: '1m' });
Options
  • interval [10s]: wait time after a tag or group of tags has been emptied
Events

A queue is also an event emitter, which will relay information about task status and the state of the jobs being added to the queue. The following events can be listened for:

  • error - (err) - on any database-related errors
  • drain - () - when a batch of jobs has been saved to the database
  • empty - (tags) - after processing a set of tags, prior to pausing for the interval duration
  • job:define - (job) - when a job has been created and successfully saved. In this case, job is a constructed Seed model.
  • task:define - (task) - when a task has been defined

.tags

A property that can be used to get a list of all of the tags for tasks attached to the current queue instance. Cannot be set.

console.log(queue.tags); // [ 'messaging' ]

.configure ([env,] fn)

  • @param { String } environment to check (optional)
  • @param { Function } function to execute on match

Run the given function only if the NODE_ENV process environment variable matches the defined env.

queue.configure('development', function () {
  queue.use(kinetik.cli());
});

.use (fn)

  • @param { Function } plugin handler

Load a specific plugin and attach it to this queue instance. The argument can be either a function or an object with a method called handle. The parameter passed to the function will be the current queue. Useful for code reuse.

queue.use(function (q) {
  q.on('error', function (err) {
    console.log(err.message);
  });
});

.define (name)

  • @param { String } name

Creates a new task definition for this queue. See the Task API for how to further configure a task. Do not use the character : in your task names, as it is reserved for internal event namespacing.

var sendEmail = queue.define('send email');

.create (task, data[, delay])

  • @param { String } task
  • @param { Object } data to pass to the runner
  • @param { Number | String } delay time specification

Queue up a job with the given data attributes for a given task. A delay can also be provided to prevent a job from being processed until after a given timeframe. Delay can either be in milliseconds, or a string in the appropriate format.

queue.create('send email', {
    email: 'ford@hitchhikersguide.com'
  , name: 'Ford Prefect'
  , subject: 'Welcome to the Guide'
}, '1d');

String time specifications can be in any of the following formats: 10ms, 30s, 45m, 2h, 6d.
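For illustration, a conversion from these time specifications to milliseconds might look like the following sketch (toMillis is a hypothetical helper, not part of Kinetik's API; the unit values are assumptions based on the formats above):

```javascript
// Convert a delay spec to milliseconds: numbers pass through,
// strings like '45m' are parsed as <amount><unit>.
function toMillis(spec) {
  if (typeof spec === 'number') return spec;
  var units = { ms: 1, s: 1000, m: 60000, h: 3600000, d: 86400000 };
  var match = /^(\d+)(ms|s|m|h|d)$/.exec(spec);
  if (!match) throw new Error('Invalid time spec: ' + spec);
  return parseInt(match[1], 10) * units[match[2]];
}
```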

The job that is created will be saved asynchronously. If you wish to know when all created jobs in the current scope have been saved, you can listen for the queue drain event.

.process (tags, [interval])

  • @param { Array } tags in order of priority
  • @param { String | Number } interval to wait between checks

Start a process loop for a given set of tags. An interval can also be provided to override the queue interval, used as the delay before checking for more jobs once all tags are empty.

// use default interval (10s)
queue.process('urgent');

// check every second
queue.process('long', 1000);

// check every 10 minutes
queue.process([ 'messaging', 'dbaggr' ], '10m');

For more information on the behavior of queue.process, see the Quick Start guide on "Starting the Processor".

.fetch (query, callback)

  • @param { Object } query
  • @param { Function } callback receiving err||null and a graph of jobs

Allows for direct access to the Seed Graph of jobs for data manipulation. Useful if you want to retrieve jobs that were previously scheduled or completed. It is also used internally for various tasks, such as cleaning and stats.

No matter which store you are using, the query should be formatted according to Seed's query language, which is a MongoDB-compatible, object-based query language.
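In spirit, a flat equality query such as { status: 'queued' } matches documents field by field. The sketch below is an illustrative matcher for that simple case only, not Seed's implementation (which also supports MongoDB-style operators):

```javascript
// Return true when every key in the query equals the same
// key in the document (flat equality matching only).
function matches(doc, query) {
  for (var key in query) {
    if (doc[key] !== query[key]) return false;
  }
  return true;
}
```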

The returned jobs parameter in the callback is a Seed.Hash instance. Here is a sample map/reduce pattern that will get the count of queued jobs by task:

queue.fetch({ status: 'queued' }, function (err, jobs) {
  if (err) throw err;

  // aggregate by task name
  function map (key, value, emit) {
    var task = value.get('task');
    emit(task, 1);
  }

  // reduce by count of emit's per task
  function reduce (key, value) {
    return value.length;
  }

  // task welcome email has [42] queued jobs
  function display (value, key) {
    console.log('task %s has [%d] queued jobs', key, value);
  }

  // run it
  jobs
    .mapReduce(map, reduce)
    .each(display);
});

.cancel (job, callback)

  • @param { Job | ObjectId } job instance or id
  • @param { Function } callback upon completion (optional)

Cancel a specific job. Accepts either a Seed.Model job instance or the id of a job. If you cancel jobs using this method on the same constructed queue in which the job's task is defined, a cancelled event will be emitted to that task.

var id = job.id;
queue.cancel(job, function (err) {
  if (err) throw err;
  console.log('job %j has been cancelled', id);
});

.clean (tags, callback)

  • @param { String | Array } tags (optional)
  • @param { Function } callback

Remove all completed jobs for the given (optional) tags. The callback will be invoked upon completion. If no tags are specified, all tags will be cleaned. Only removes jobs which have a completed status; jobs which have timed out or failed will remain.

queue.clean('messaging', function (err) {
  if (err) throw err;
  console.log('all completed messaging jobs have been removed');
});

Task API

When using the define method from your Queue, you will be returned a new instance of a Task. In Kinetik, the purpose of a task is to define how to process a given job.

var task = queue.define('email.welcome')
  .tag('messaging')
  .action(function (job, done) {
    var email = job.data.email
      , locals = { name: job.data.name };
    render('./emails/welcome.jade', locals, function (err, html) {
      if (err) return done(err);
      asyncMailer(email, html, function (err, transId) {
        if (err) return done(err);
        job.log('Email sent', { receipt: transId });
        done();
      });
    });
  });
Necessities

Each task is created with a unique name that will be referenced when you create jobs. This is completely arbitrary, but avoid using : as a namespace delimiter, as Kinetik uses that internally.

The tag is used to group the jobs for the queue processor. These are arbitrary keywords and can be as simple as priorities, such as normal and important, or task groupings such as messaging and db aggregation.

The action is the function that is to be executed to process the job. Actions are asynchronous by nature and are provided a number of helper methods by Kinetik (more below).

Events

A task is also an event emitter, which will emit the following events for each job that is processed through it:

  • complete - (job) - when a job has completed
  • fail - (err, job) - when a job has failed
  • timeout - (job) - when a job has timed out
  • log - (job, line) - when a job has logged a line of data
  • progress - (job, current, total) - when the progress helper is emitted

In all of the events, the job parameter provided is a job model that can be updated and saved to your data store. This allows you the flexibility to:

  • Provide your own re-queue methodology.
  • Delete jobs that have been completed.

You should, however, avoid updating the job model during a log or progress event, as the action is still being processed. The status events occur after the results of the job have been committed to the database.
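As an example of the first point, a custom re-queue policy could hook the fail event. The sketch below is an assumption-laden illustration: requeueOnFail and the _attempt counter are hypothetical names, task is anything that emits fail (err, job), and createJob stands in for queue.create(name, data, delay):

```javascript
// Re-create a failed job with a delay, tracking the attempt count
// in the job's own data and giving up after a cap.
function requeueOnFail(task, createJob, maxAttempts) {
  task.on('fail', function (err, job) {
    var attempt = (job.data._attempt || 0) + 1;
    if (attempt >= maxAttempts) return; // hit the cap: give up
    var data = {};
    for (var key in job.data) data[key] = job.data[key];
    data._attempt = attempt;
    createJob(job.task, data, '30s'); // re-queue with a delay
  });
}
```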

.tag (tag)

  • @param { String } tag

Tag a task as belonging to a given tag. All tasks require a tag to be picked up by the queue processor; a task will use the tag default until one is defined.

task.tag('urgent');

.desc (description)

  • @param { String } description

Provide a short description about the task being defined. This can be used by middleware.

.timeout (duration)

  • @param { Number | String } duration

Provides a timeout duration for the action. If this duration is reached, the job will be marked as timed out. If no timeout value is defined, an action can run indefinitely. Once a job times out, even though the action function may still be running in limbo, Kinetik will have moved on: any attempts to change the status of the job or to emit log and progress will be ignored.

task.timeout(10000); // 10 seconds
task.timeout('1m'); // 1 minute

.action (fn)

  • @param { Function } action function

Provides an action for a given task. When executed, the action will receive two arguments:

  • job - {Object} - encapsulates everything about a job
  • done - {Function} - indicates when the action has completed

In the job object, the following properties are available.

  • id - {Number|String} - the id of the job according to your database
  • data - {Object} - data attributes passed on job creation
  • progress - {Function} - indicate the progress of actions with multiple steps.
  • log - {Function} - method used to log a line item to the database for that job.
Indicating Progress

You can use the progress method to indicate the current position if the task has multiple steps. This is not saved to the database, but instead emitted as the progress event on the task.

// current, total
job.progress(10, 100);
Logging

Each job can have log items associated with it. Kinetik creates a few of these on creation, completion, failure, and timeout. You can also create your own. Each one is timestamped and can include a message and optional JSON data to accompany the log item.

// message, data
job.log('Email send complete.', { receipt: transactionId });

During a job run these are emitted immediately as the log event to the task. They are also stored in memory and pushed to the database as a batch at the completion of the action.

Indicating Completion

If you want to mark the job as errored as opposed to completed, you can send a parameter to the done function.

done('This job completed with an error.');
done(new Error('This job completed with an error.'));

If a string is provided, it will be logged as the error message; if an object (such as a constructed JavaScript Error) is sent, Kinetik will attempt to read the object's message property as the failure reason, and also its code property if there is one.
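That lookup can be pictured as the following sketch (failureReason is a hypothetical name; this mirrors the description above rather than Kinetik's source):

```javascript
// Derive a failure reason from the argument passed to done():
// strings become the message; objects contribute message and,
// when present, an error code.
function failureReason(err) {
  if (typeof err === 'string') return { message: err };
  var reason = { message: err.message };
  if (err.code) reason.code = err.code; // include an error code if present
  return reason;
}
```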

.retry

  • @param { Number | String } duration between two retries
  • @param { Number } count of retries, infinite if not set

Defines whether the task should be re-run when it fails.

Middleware

Middleware provides a way to hook into the events and methods of a queue. The API is in its infancy, but expect great things in the near future.

Current Middleware
  • environment - start multiple queue runners based on environment variables
  • cli - perform maintenance on your queue/jobs using a CLI
Roadmap

These are the middleware "apps" that Kinetik currently has planned. If you are interested in contributing, please contact @jakeluer on Twitter.

  • HTTP API for job management
  • Network capable event bus
  • Web App UI

Environment

The environment middleware allows you to easily start multiple job processing node processes based on a set of tags. This allows you to fully utilize the resources of the server or servers that Kinetik is running on. Make sure that you are using one of Seed's database storage engines.

To use the environment middleware, start by creating a queue and defining tasks as normal, but avoid calling queue.process directly. Instead, end your program with the following use mechanism:

var kinetik = require('kinetik')
  , RedisStore = require('seed-redis')
  , queue = kinetik(new RedisStore());

// define all your tasks

queue.use(kinetik.environment());

This tells the queue to look for the QUEUE environment variable and begin processing only if a list of tags is passed to it. If no QUEUE variable is found, the queue will not begin processing.
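The check the middleware performs can be pictured like this (an assumed sketch; tagsFromEnv is a hypothetical name, and the comma-separated format matches the invocation examples below):

```javascript
// Read the QUEUE environment variable and split it into a tag list;
// return null when it is unset so the queue does not begin processing.
function tagsFromEnv(env) {
  if (!env.QUEUE) return null;
  return env.QUEUE.split(',');
}
```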

To start the queue, you then invoke your program from the command line.

QUEUE=messaging node queue.js

This will start the queue processing the messaging tag. You can also spin up multiple processes including multiple tags.

QUEUE=messaging node queue.js
QUEUE=db.aggregation,log.rotate node queue.js

You now have two node processes working on your queue.

Command-line Interface

The command-line interface works best when paired with the environment middleware. Since node queue.js won't do anything without the QUEUE environment variable, you are free to use those calls for other things, such as performing maintenance and getting statistics.

Setup

Rather simple, really:

queue.use(kinetik.environment());
queue.use(kinetik.cli());

Now you are free to use that file as your CLI. Assuming queue.js was your file:

node queue.js --help


Commands

tasks

Get a list of all of the tasks for this queue:

node queue.js tasks

jobs

Get a list of all of the jobs in this queue. Also shows a breakdown of the current status of each job by task.

node queue.js jobs

clean

Clean jobs of a given status, optionally by tag. Defaults to completed if no status option is provided. All logs associated with the removed jobs will also be removed.

node queue.js clean

Resources

Tests

Tests are written in Mocha using the Chai should BDD assertion library. To run them, clone this repo and install dependencies using npm install.

make test

By default, Kinetik will use a MemoryStore to test all of its functionality. You can also test database integration with Redis and MongoDB. Make sure that you have both installed locally and set up with default configs without authentication.

make test-int

The tests will aggressively clean up after themselves. Mongo will remove the collection that it created and Redis will flush the default database. You may alter this behavior by commenting out a few lines in test/storage.js.

Contributors

Interested in contributing? Fork to get started. Contact @logicalparadox if you are interested in being a regular contributor.

License

(The MIT License)