Node.jsでasync-lockを使った排他制御をおこなう

Node.jsはシングルスレッドで動作するため、スレッドセーフや排他制御については考慮しなくてよいと思っていました。イベントループによって実行される処理は1つだけであり、途中で他のスレッドなどから割り込まれることはないからです。 しかし、実際にはI/O待ちやsetTImeout関数のようなタイマーを使った処理や非同期処理により、処理がイベントループが複数にまたがる場合、あいだで別の処理がおこなわれる可能性があります。 (イベントループについてはこの記事が参考になるかもしれないです:描いて理解するイベントループ - Qiita

なので、処理ごとに数を増やすカウンターやシングルトンパターンでのインスタンス生成(Javaだとsynchronizedを使うやつ)には排他制御を組み込む必要があります。

排他制御を実現するasync-lockモジュール

今回、排他制御を実現するため、async-lockというモジュールを使いました。 npm上でダウンロード数も多く、あと日本語の記事が見つけられたというのが理由です。

async-lockを使う意味と使い方について、このあと書いていきます。 ソースコードGitHubにおいてあります。ここではそこから抜粋します。

リポジトリにいくつかJSファイルがありますが、 実行された回数をカウントする変数countの値を関数myFunctionが取得し、指定された時間待機した後に1を加算してcountにセットし直す、という部分は同じです。

const myFunction = async (timeout) => {
    console.log({
        timeout,
        time: Date.now(),
    });
    const myCount = count;
    count = await new Promise((resolve) => {
        setTimeout(() => resolve(myCount + 1), timeout);
    });

    const result = {
        timeout,
        time: Date.now(),
        before: myCount,
        now: count,
    };
    console.log(result);

    return result;
};

この共通部分のポイントは以下のとおりです。

  1. 関数myFunctionが変数countの値を取得し、待機してから値を変更、セットし直すという動作のため、同時実行されるとセットし直すときには取得した値が変更されている可能性がある(→排他制御または同期実行が必要)
  2. setTimeout関数により複数のイベントループにまたがる処理となる

排他制御をしないとき

最初に排他制御をしなかったときに何がおきるのかを見ていきます。 参照: promise-all.js

コンソール出力

{ timeout: 1500, time: 1678809410936 }
{ timeout: 1000, time: 1678809410943 }
{ timeout: 500,  time: 1678809410944 }
{ timeout: 100,  time: 1678809410944 }
{ timeout: 100,  time: 1678809411054, before: 0, now: 1 }
{ timeout: 500,  time: 1678809411445, before: 0, now: 1 }
{ timeout: 1000, time: 1678809411945, before: 0, now: 1 }
{ timeout: 1500, time: 1678809412446, before: 0, now: 1 }
1
await Promise.all([
    myFunction(1500),
    myFunction(1000),
    myFunction(500),
    myFunction(100),
]);

console.log(count);

合計で4回関数が呼び出されますが、非同期に処理され、また排他制御もないために変数countは1になります。 関数myFunctionが呼び出され、変数countを取得したときの値は4回とも0なのでこのような結果になります。

排他制御をするとき

最終的に4が出力されるように、関数myFunctionでの処理が同時実行されないように排他制御をおこないます。 参照: lock.js

const myFunction = async (timeout) => {
    console.log({
        timeout,
        time: Date.now(),
    });
    return await lock.acquire('my-lock', async () => {
        const myCount = count;
        count = await new Promise((resolve) => {
            setTimeout(() => resolve(myCount + 1), timeout);
        });
    
        const result = {
            timeout,
            time: Date.now(),
            before: myCount,
            now: count,
        };
        console.log(result);
    
        return result;    
    });
};

このように、変数countの取得→待機→加算→再セット、の処理をasync-lockによって排他制御させます。 結果は以下のとおり、きちんと4が出力されました。 排他制御によって変数countの値が再セットされてから次の処理が実施されていることがわかります。

{ timeout: 1500, time: 1678809307116 }
{ timeout: 1000, time: 1678809307130 }
{ timeout: 500,  time: 1678809307131 }
{ timeout: 100,  time: 1678809307131 }
{ timeout: 1500, time: 1678809308641, before: 0, now: 1 }
{ timeout: 1000, time: 1678809309653, before: 1, now: 2 }
{ timeout: 500,  time: 1678809310170, before: 2, now: 3 }
{ timeout: 100,  time: 1678809310277, before: 3, now: 4 }
4

(おまけ)同期実行するとき

逐次で同期実行させることでも一応解決します。 参照: serialize-await-async.js

{ timeout: 1500, time: 1679383601031 }
{ timeout: 1500, time: 1679383602552, before: 0, now: 1 }
{ timeout: 1000, time: 1679383602553 }
{ timeout: 1000, time: 1679383603554, before: 1, now: 2 }
{ timeout: 500,  time: 1679383603555 }
{ timeout: 500,  time: 1679383604070, before: 2, now: 3 }
{ timeout: 100,  time: 1679383604071 }
{ timeout: 100,  time: 1679383604179, before: 3, now: 4 }
4

ちゃんと最後に4が出力されています。

(async () => {
    await myFunction(1500);
    await myFunction(1000);
    await myFunction(500);
    await myFunction(100);

    console.log(count);
})();

ソースコード上の様々な場所(ファイル)から呼び出される場合やそもそもexpress.jsを使ったWebサーバの場合(実装例: Node.js で排他制御。async-lock を使ってみた - Neo's World)、逐次で同期処理を書くことができないので前述のような排他制御を利用することになるでしょう。

async-lockのオプション

async-lockモジュールによって排他制御ができることは前の段落までで確認できています、 ここからはasync-lockモジュールのオプションを見ていきます。

これらのオプションは、例えば排他制御した処理が時間がかかりすぎてシステム全体に影響が発生するような状況を避けるために使えそうなので確認したいと思います。

timeout

ロックを取得する前にアイテムがキューに留まることができる最大時間(ミリ秒)。 デフォルト値はゼロであり、この場合はタイムアウトによるエラー発生しない設定となる。

参照: lock-options-timeout.js

const lock = new AsyncLock({
    timeout: 1000,
    // maxOccupationTime: 1000,
    // maxExecutionTime: 1000,
});

<...中略...>

(async () => {
    await Promise.all([
        myFunction(1002), // 1001だとエラーが発生しなかった
        myFunction(1),
    ]);

    console.log(count);
})();

実装ではオプションtimeoutを1000に設定しており、myFunction(1)は1000 ms以上待機することになります。 そのため、以下のようなエラーが発生します。

{ timeout: 1002, time: 1679390007754 }
{ timeout: 1,    time: 1679390007762 }
C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:211
                   done(false, new Error('async-lock timed out in queue ' + key));
                                            ^

Error: async-lock timed out in queue my-lock
    at Timeout._onTimeout (C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:211:17)
    at listOnTimeout (node:internal/timers:559:17)
    at processTimers (node:internal/timers:502:7)

このオプションを使えば、キューに長時間待機しているタスクを検知してエラーを出すことができます。

maxOccupationTime

キューに入ってから処理が完了するまでの最大許容時間(ミリ秒)。 デフォルト値はゼロであり、この場合は最大許容時間の制限がなくなる。

参照: lock-options-occupation.js

const lock = new AsyncLock({
    // timeout: 1000,
    maxOccupationTime: 1000,
    // maxExecutionTime: 1000,
});

<...中略...>

(async () => {
    await Promise.all([
        myFunction(900),
        myFunction(101),
    ]);

    console.log(count);
})();

実装ではオプションmaxOccupationTimeを1000に設定しており、myFunction(101)はキューに入ってから処理が完了するまでに1000 ms経過してしまうことになります。 そのため、以下のようなエラーが発生します。

{ timeout: 900, time: 1679400007500 }
{ timeout: 101, time: 1679400007507 }
{ timeout: 900, time: 1679400008416, before: 0, now: 1 }
C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:220
    done(false, new Error('Maximum occupation time is exceeded in queue ' + key));
                                                    ^

Error: Maximum occupation time is exceeded in queue my-lock
    at Timeout.<anonymous> (C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:220:18)
    at listOnTimeout (node:internal/timers:559:17)
    at processTimers (node:internal/timers:502:7)

こちらもオプションtimeoutと同じく、キューに長時間待機したか処理完了までに時間がかかっているタスクを検知してエラーを出すことができます。

maxExecutionTime

ロックを取得してから処理が完了するまでの最大許容時間(ミリ秒)。 デフォルト値はゼロであり、この場合、最大許容時間の制限がなくなる。

参照: lock-options-execution.js

const lock = new AsyncLock({
    // timeout: 1000,
    // maxOccupationTime: 1000,
    maxExecutionTime: 1000,
});

<...中略...>

(async () => {
    await Promise.all([
        myFunction(1001),
    ]);

    console.log(count);
})();

実装ではオプションmaxExecutionTimeを1000に設定しており、myFunction(1001)はキューに入ってから処理が完了するまでに1000 ms経過してしまうことになります。 そのため、以下のようなエラーが発生します。

{ timeout: 1001, time: 1679392008446 }
C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:145
             done(locked, new Error('Maximum execution time is exceeded ' + key));
                                                     ^

Error: Maximum execution time is exceeded my-lock
    at Timeout.<anonymous> (C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:145:19)
    at listOnTimeout (node:internal/timers:559:17)
    at processTimers (node:internal/timers:502:7)

このオプションによって処理自体に時間がかかっているタスクを検知し、エラーを出すことができます。

タスクのタイムアウトに関する3つのオプションtimeout、maxOccupationTime、maxExecutionTimeの関係はちょっと混乱したので後述してあります。

maxPending

一度にキューで許可されるタスクの最大数。 デフォルト値は1000となっている。制限をなくすにはInfinityを設定する。

参照: lock-options-pending.js

const lock = new AsyncLock({
    maxPending: 2,
});

<...中略...>

(async () => {
    await Promise.all([
        myFunction(1500),
        myFunction(1000),
        myFunction(500),
        myFunction(100),
    ]);

    console.log(count);
})();

実装ではオプションmaxPendingを2に設定しており、myFunction(100)はキューに入るとキューで待機中のタスクは3となります。 そのため、以下のようなエラーが発生します。

{ timeout: 1500, time: 1679391640300 }
{ timeout: 1000, time: 1679391640307 }
{ timeout: 500,  time: 1679391640307 }
{ timeout: 100,  time: 1679391640308 }
C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:195
                done(false, new Error('Too many pending tasks in queue ' + key));
                            ^

Error: Too many pending tasks in queue my-lock
    at AsyncLock.acquire (C:\fakepath\nodejs-module-labo\async-lock\node_modules\async-lock\lib\index.js:195:15)
    at myFunction (C:\fakepath\nodejs-module-labo\async-lock\lock-options-pending.js:12:23)
    at C:\fakepath\nodejs-module-labo\async-lock\lock-options-pending.js:35:9
    at Object.<anonymous> (C:\fakepath\nodejs-module-labo\async-lock\lock-options-pending.js:39:3)
    at Module._compile (node:internal/modules/cjs/loader:1191:14)
    at Object.Module._extensions..js (node:internal/modules/cjs/loader:1245:10)
    at Module.load (node:internal/modules/cjs/loader:1069:32)
    at Function.Module._load (node:internal/modules/cjs/loader:904:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
    at node:internal/main/run_main_module:22:47

タイムアウトのオプション3つの関係性

タスクのタイムアウトに関する3つのオプションtimeout、maxOccupationTime、maxExecutionTimeの関係はちょっと混乱したので図を作成しました。

タスクは「キューに入る」→「ロックを取得する/処理を開始する」→「ロックを取得する/処理を終了する」という流れになります。

  • オプションtimeoutは、このうちの「キューに入る」→「ロックを取得する/処理を開始する」の部分の時間を制限します。
  • オプションmaxExecutionTimeは、「ロックを取得する/処理を開始する」→「ロックを取得する/処理を終了する」の部分の時間を制限します。
  • オプションmaxOccupationTimeは、「キューに入る」→「ロックを取得する/処理を開始する」→「ロックを取得する/処理を終了する」という全体の時間を制限しています。

各オプションの使い分けとしては、

  • 処理に時間がかかるタスクであれば、maxExecutionTimeを制限する
  • 処理に時間はあまりかからないがタスク数が積み上がりやすいならばtimeoutを制限する

などだろうか。

おわり

Node.jsで排他制御が必要になるケース、その実現をしてくれるasync-lockモジュールを実装とともに紹介しました。

今回の実装ではシングルトンパターンは示していませんが、 シングルトンにしたいインスタンスの生成をasync-lockモジュールを使った排他制御をおこなえばよいでしょう(Double-Checked Lockingなども考慮)。

参考文献