「PostgreSQL で安くて早くてうまい MQ 作ってね」的ミッションがどこからともなく降ってきたので、色々調べて試してみたところ、案外よい方法が見つかったので、まとめてみた。
- [4/24] 複数キューを扱う場合の方法を tableoid を利用するように更新。
- [4/27] フィルタ条件を付ける場合の記法について追記(WHERE 句内の評価順の明示的指定)
- [4/30] 優先順位に ORDER BY は利用できない旨追記
鍵は advisory lock。PostgreSQL 8.2 以降の機能だ。
メッセージキューと言っても、安くて早いわけだからもちろんフルフルの高機能なわけなく、いろいろ制限がある。
- receiver は複数可。メッセージは receiver の一つが受信する。ブロードキャスト/マルチキャストは対象外。
- ack あり。ack せずに receiver が落ちた場合、メッセージは別の receiver が受信可能になる。
- receiver は PULL でメッセージを受信*1
- メッセージ処理に長時間かからない(メッセージ受信から ack 送信まで、DBセッションを握ったままでよい)
つまり、排他性と receiver の1つに障害が発生しても処理が行われるという要件を満たすことを優先している。
メッセージを格納するテーブル
create table mq ( id serial, message text, primary key(id) );
必須フィールドは id。serial 。
キューを複数を持ちたい場合は、それぞれ対応するテーブルを作成。
必要なデータ・メタデータがあれば適宜フィールドを追加。
キューにメッセージを送信(send)
insert into mq (text) values ('abc');
そのまま insert でOK。
キューからメッセージを1件取得(receive)
下記 SQL によりポーリング(PULL)。
select * from mq where pg_try_advisory_lock(tableoid::int, id) limit 1;
PostgreSQL の advisory lock はアプリが独自に利用可能なロック管理機能。
pg_try_advisory_lock(tableoid::int, id) は (tableoid, id) をキーにして advisory lock の取得に成功したら true、失敗したら false を返すので、この1行で
「対象キューテーブルの、ロックされていないメッセージを最大1件取得し、id をキーにロックする」
という動作になる。
受け取り可能なメッセージがなければ、0件。
テーブルに適当なメタデータ追加& receive の SQL に ORDER BY や WHERE 句追加でメッセージの優先順位やフィルタも実現できる。宛先指定もなんとかなるかな。
ただし、WHERE 句内に AND で並べた式の評価順は保証されないので、副作用を持つ pg_try_advisory_lock() を含めてそのまま並べただけでは、意図したとおりに動作するかどうかは断言できない。
例えばフィルタ条件より先に pg_try_advisory_lock() を評価されてしまったら、取り出すことができたメッセージ以外にもロックかけまくってる、なんて現象が発生してしまう*2。
そういう場合には CASE 文を使って明示的に評価順を記述する方法がある、と PostgreSQL のマニュアルに書いてあった。
ので、「where case when [フィルタ条件] then pg_try_advisory_lock() else false end」のように記述すればよいだろう。ちょっと BK くさいけど。
select * from mq where case when priority < 10 then pg_try_advisory_lock(tableoid::int, id) else false end limit 1;
また、優先順位の処理に ORDER BY が利用できるか、と考えていたのだが、id:sfujiwara さんのご指摘により、ORDER BY 句をつけると並べ替えのために全てのレコードに対して WHERE 条件を評価してしまい、全レコード分のロックを取得してしまうことがわかった。
サブクエリなどを駆使したり、volatile で関数宣言したりして回避できないか色々試してみたがダメ。
というわけで、優先順位などで並べ替えをやりたければ、カーソルで1レコードずつ取って pg_try_advisory_lock() するしかなさそう。残念。
更新ロックをかけたければ for update をつけられるけど、advisory lock によりアプリレイヤーでは排他性は確保できているので、トランザクション不要でいいんじゃあないかと*3。
なお、advisory lock は transaction と無関係。rollback でも解放されない。
が、セッションが切れれば解放される。
メッセージ処理完了(ack)
delete from mq where id=12345 returning pg_advisory_unlock(tableoid::int, id);
メッセージを削除しつつ、advisory lock を解放。成功すれば true が返る。
削除したくなければ完了フラグ的なものを用意して、DELETE のかわりに UPDATE を使えばOK。
ack せずにセッションが切れた場合は advisory lock が勝手に解放されて、別の receiver が受信可能に(←ここが嬉しい)。
メッセージのキャンセル(cancel)
メッセージをキャンセルしたいなんて場合もあるかもしれない。
delete from mq where id=12345 and pg_try_advisory_lock(tableoid:int, id) returning pg_advisory_unlock(tableoid:int, id);
キャンセルに成功すれば true、失敗すれば false 、メッセージがすでに処理されてしまっていたら0件が返る。
制限
難点は、advisory lock のキーは 32bit integer×2(または 64bit) なので、利用するアプリ間であらかじめ調整していないと衝突の可能性があること。
とはいえ、advisory lock のキーは database ごとに与えられる(PostgreSQL の lock 機構は、キーに database oid を含む)から、同じデータベースを利用するアプリ間でさえ調整できていればOKなので、なんとか現実的な範囲だろう。*4
また、advisory lock はセッション単位なので、 ack するまではセッションを握っておかないといけない。
メッセージを処理している間、コネクション閉じたりプールに返したりしたらダメ。
余談2
最初、 "UPDATE 〜 SET lock=1 WHERE lock=0 RETURNING *;" でいけんじゃあないかと思っていたら、PostgreSQL の UPDATE 文に LIMIT がなくて……