Node.js Tutorial Mastering Task Queue Handling with Kue

in #utopian-io7 years ago (edited)

nodejs_logo-1417834491072.png

IMAGE SOURCE

What Will I Learn?

In this tutorial we will learn how to set the Queue or queuing on NodeJs using Kue . Queue or queuing is one trick that is very useful when our web applications receive a lot of requests. Why use queue? because when a code execution is done there may be a long enough process to execute and make the response time too long so that the user gets the page back in time long enough.

Not to mention the timeout of a process can result in some inevitable data loss because the incoming requests can not be served by the server. Lots of solutions that can be used to implement queues in our web application system. It's just that Kue are easier to install in Node.js web apps instead of using other technologies like Kafka and RabbitMQ.

Requirements

  • A computer that has been installed nodejs, redis-server, and mongodb
  • Basic knowledge of Javascript

Difficulty

  • Basic

Tutorial Contents

Now we will do the installation first for some of the necessary tools (for those who have installed it please skip some parts of installation below):

$ mkdir node_kue
$ cd node_kue
$ npm install kue
$ npm install mongoose

Then turn on the Redis server in the first console:

$ redis-server
3227:C 08 Apr 21:27:09.142 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
3227:M 08 Apr 21:27:09.145 * Increased maximum number of open files to 10032 (it was originally set to 4864).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 3.2.6 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 3227
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'
3227:M 08 Apr 21:27:09.155 # Server started, Redis version 3.2.6
3227:M 08 Apr 21:27:21.421 * DB loaded from disk: 12.266 seconds
3227:M 08 Apr 21:27:21.421 * The server is now ready to accept connections on port 6379

Then turn on mongodb in the second console :

$ mongod
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] MongoDB starting : pid=3952 port=27017 dbpath=/data/db 64-bit host=MacBook-Air.local
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] db version v3.4.1
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] git version: 5e103c4f5583e2566a45d740225dc250baacfbd7
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] OpenSSL version: OpenSSL 1.0.2k  26 Jan 2017
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] allocator: system
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] modules: none
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] build environment:
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten]     distarch: x86_64
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten]     target_arch: x86_64
2017-04-08T22:20:47.873+0700 I CONTROL  [initandlisten] options: {}
2017-04-08T22:20:47.874+0700 I -        [initandlisten] Detected data files in /data/db created by the 'wiredTiger' storage engine, so setting the active storage engine to 'wiredTiger'.
2017-04-08T22:20:47.875+0700 I STORAGE  [initandlisten] wiredtiger_open config: create,cache_size=1536M,session_max=20000,eviction=(threads_max=4),config_base=false,statistics=(fast),log=(enabled=true,archive=true,path=journal,compressor=snappy),file_manager=(close_idle_time=100000),checkpoint=(wait=60,log_size=2GB),statistics_log=(wait=0),
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten]
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten] ** WARNING: Access control is not enabled for the database.
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten] **          Read and write access to data and configuration is unrestricted.
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten] ** WARNING: You are running this process as the root user, which is not recommended.
2017-04-08T22:20:49.122+0700 I CONTROL  [initandlisten]
2017-04-08T22:20:49.303+0700 I FTDC     [initandlisten] Initializing full-time diagnostic data capture with directory '/data/db/diagnostic.data'
2017-04-08T22:20:49.304+0700 I NETWORK  [thread1] waiting for connections on port 27017
2017-04-08T22:20:49.881+0700 I NETWORK  [thread1] connection accepted from 127.0.0.1:52873 #1 (1 connection now open)

Now please create a file with the following structure (ignore node_modules because the directory is created automatically):

.
├── app.js
├── models
│   └── activity_log.js
└── worker.js

Creating Models

Here we will create a model to map into the collection in MongoDB. We will create a collection called activity log where we can store various activity logs of a user against the page he visited. The temples are so.

The collection consists of four fields with two additional fields created_at and updated_at. Please create the following code in the files models/activity_log.js:

var mongoose = require('mongoose');
var Schema = mongoose.Schema;
var activityLogSchema = new Schema({
  title: { type: String, required: true},
  body: String,
  status: String,
  url: String,
},
{
    timestamps: true
});
var ActivityLog = mongoose.model('ActivityLog', activityLogSchema);
module.exports = ActivityLog;

This model will be used in the worker.js file.

Create a Task

Now we will make the code to handle the task with cluster mode. Where we will utilize the number of CPUs owned by the server. Please create the following code in the worker.js file:

var kue = require('kue')
    , cluster = require('cluster')
    , jobs = kue.createQueue();
var mongoose = require('mongoose');
mongoose.Promise = global.Promise;
mongoose.connect('mongodb://localhost/node_kue');
var ActivityLog = require('./models/activity_log.js');
var clusterWorkerSize = require('os').cpus().length;
if (cluster.isMaster) {
    // start the UI
    kue.app.listen( 3000 );
    console.log( 'UI started on port 3000' );
    for (var i = 0; i < clusterWorkerSize; i++) {
        cluster.fork();
    }
} else {
    // Consumer / Worker for jobs testing
    jobs.process( 'activity_log', 10, function ( job, done ) {
    console.log( 'Starting ' + job.data.title );
        console.log("Execute activity_log jobs...");
        var activity_log = new ActivityLog({
                        title: job.data.title ,
                        body:job.data.body ,
                        status:job.data.status ,
                        url:job.data.url
                    });
        activity_log.save(function(err) {
            if (err)
            {
                console.log(err);
            }
            else
            {
                setTimeout( function () {
                    console.log( 'Finished activity log jobs: ' + job.data.title );
                    done();
                }, 100 );
            }
        });
    });
    jobs.process( 'testing', 4, function ( job, done ) {
      console.log( 'Starting ' + job.data.title );
      console.log("Execute testing jobs...");
      setTimeout( function () {
        console.log( 'Finished ' + job.data.title );
        done();
      }, 1000 );
    });
}

In the above code we create an Kue object and initialize the cluster. Then if the cluster is master then run Kue server, whereas if child then create instance to handle task which will come to Kue. So if there is a job with concurrency 10 and the number of cores of a server is 4 then the total concurrency will be 40.

In the above code, there are two jobs. That is job testing with concurrency 4 which only show stdout in console and given delay 1000 ms. Then there are jobs that store data to MongoDB. where he has 10 concurrency and delay over 100 ms. This file will be used in app.js.

Now please turn on worker.js on other consoles:

$ node worker.js

Generate the Application Code

Now we will create the code to use the jobs defined in worker.js. Its use is simple enough and does not make us stop using it. Simply call create () and skip the job name and its parameters.

var kue = require('kue')
  , jobs = kue.createQueue();
// Producer for jobs testing
for ( var i = 0; i < 10; i++ ) {
    console.log( 'Creating Job #' + i );
    jobs.create( 'testing', {
      title: 'jobs #' + i
    }).save();
}
for ( var i = 0; i < 100; i++ ) {
    console.log( 'Creating Activity Job #' + i );
    jobs.create( 'activity_log', {
        title: 'visited_item',
        body: 'lorem ipsum sit dolor amet- '+ i,
        status:'200',
        url:'http://www.example.com/item/mouse-imac-1j1h2j4h12'
    }).save();
}

To run the demo code please run this command on another console:

$ node app.js

Monitoring Task on Kue UI

For monitoring, you can open it at http: // localhost: 3000. There you can see five tabs consisting of Inactive, Active, Delayed, Failed, Complete. Without refreshing the page, you can see the update directly because it is already using websocket. You can also see the results through mongo client and please calculate the amount whether in accordance with what is sent to the jobs or not:

$ mongo
> use node_kue;
> db.activitylogs.find().length();
10000
>

Trying to Change Kue Parameters

Now we also want to try to change the concurrency parameter of activity_logs jobs to 500 and execute 50000 tasks sent to Kue.
Now we want to try changing the concurrency parameters of activity_logs jobs to 1000 and executing 50000 tasks sent to Kue.
In both cases, it tends to be the same, but we can know that the more concurrency, of course, the more the Cake will work faster. But of course we must know the maximum limit.

Using queuing may be helpful, especially Node.js itself is one of the asynchronous programming languages. Of course this becomes two big advantages when we use Node.js and Kue. It's just that we need to pay attention to some things in the use of this Kue.

We recommend that there is no heavy code in a task, if you want to have a heavy code better broken down again so another task
If you want to use cluster mode note the number of cores on the server first
Test the maximum value of concurrency that can be handled by the server should not we set the concurrency is too high that can not be executed by Kue
Debug it before Kue task is deployed so that not many messages will be retry by Kue.
Unfortunately currently Kue does not have a mechanism for scaling if one day the capacity of Kue installed servers is experiencing a spike in task handling, another thing that can happen is when the queue is very large and piling up so that the latest queue execution process can wait longer. Capacity planning is necessary in this case. If you have some question , Please put the question in the comment section below.



Posted on Utopian.io - Rewarding Open Source Contributors

Sort:  

Your contribution cannot be approved because it does not follow the Utopian Rules, and is considered as plagiarism. Plagiarism is not allowed on Utopian, and posts that engage in plagiarism will be flagged and hidden forever.

You plagiarised the tutorial from http://kekerenan.com/menangani-antrian-task-dengan-kue-di-node-js/ (credit to @knowledges).

You can contact us on Discord.
[utopian-moderator]

Coin Marketplace

STEEM 0.16
TRX 0.15
JST 0.028
BTC 54358.77
ETH 2293.67
USDT 1.00
SBD 2.30