WebSphere MQ: トリガリング (1) TRIGGER属性, runmqtrmコマンド

WebShpere MQのトリガリング機能の動作確認。前回までの記事はこちら。参考サイト・ドキュメントは記事の末尾をご参照。

トリガリングとは

下記のドキュメントが簡単に纏まっていたので引用させて頂く。
引用元:WebSphere MQ System Administration Guide Version 7.0

WebSphere MQ enables you to start an application automatically when certain conditions on a queue are met. For example, you might want to start an application when the number of messages on a queue reaches a specified number. This facility is called triggering and is described in detail in the WebSphere MQ Application Programming Guide.

簡単に言えばイベント(メッセージ)ドリブンな処理を実現できる機能。

トリガリングの構成コンポーネント

下記画像はクリックすると拡大します。
websphere_mq_trigger
上図の()括弧は補足説明。[]括弧は今記事でサンプルとして定義する名称。6つのコンポーネントに追加で一時動的Reply-toキューをモデル・キューとして定義しトリガリングの動作を確認する。

検証プログラム

使用サンプルプログラムは下記の2つ。

amqsreq – /opt/mqm/samp/amqsreq0.c

ソースの冒頭のコメントを参照。簡単に言うと、標準入力から入力されたメッセージ行を対象キューにMQPUTし、空行が入力されると、Reply-toキューからメッセージをMQGETし標準出力する。上図のユーザー・プログラムAの役割。Reply-toキューを図に記載しなかったのはトリガー構成から趣旨がずれてしまう為。
書式:amqsreq <対象キュー> <キュー・マネージャー> <Reply-toキュー>

 /*   Program logic:                                                 */
 /*      MQOPEN server queue for output                              */
 /*      MQOPEN the reply queue for exclusive input                  */
 /*      for each line in the input file,                            */
 /*      .  MQPUT request message containing text to server queue    */
 /*      while no MQI failures,                                      */
 /*      .  MQGET message from reply queue                           */
 /*      .  display its content                                      */
 /*      MQCLOSE both queues                                         */
 /*                                                                  */
 /********************************************************************/
 /*                                                                  */
 /*   AMQSREQ0 has 3 parameters                                      */
 /*                 - the name of the target queue (required)        */
 /*                 - queue manager name (optional)                  */
 /*                 - the name of the ReplyToQ (optional)            */

amqsech – /opt/mqm/samp/amqsecha.c

プロンプトから起動するのではなく、トリガー・モニターよりトリガー・メッセージ(構造体)をパラメーターとして起動。構造体内で指定されているキューからメッセージをMQGETし、そのメッセージ記述子に指定されたReply-toキューにそのメッセージのコピーをMQPUTする。これを対象のキューに残っているメッセージそれぞれに対して処理し、キューが空になったら、MQCLOSEする。上図のユーザー・プログラムBの役割。
詳細はソースに記載のコメントを参照。

 /*  Program logic:                                                  */
 /*     MQCONNect to message queue manager                           */
 /*     MQOPEN message queue for shared input                        */
 /*     while no MQI failures,                                       */
 /*     .  MQGET next message from input queue                       */
 /*     .  Prepare reply message if MQGET was successful             */
 /*     .  Prepare a report message if MQGET failed                  */
 /*     .  MQPUT1, send reply or report to named reply queue         */
 /*     MQCLOSE queue A                                              */
 /*     MQDISConnect from queue manager                              */
 /*                                                                  */
 /*                                                                  */
 /********************************************************************/
 /*                                                                  */
 /*   AMQSECHA has 1 parameter - a string (MQTMC2) based on the      */
 /*       initiation trigger message; only the QName and queue       */
 /*       manager name fields are used in this example               */

トリガリング用のキュー・マネージャー構成

検証用のオブジェクトの作成スクリプトは下記のようになる。

$ vi exe2.txt
def ql(QL.INITQ) replace
def ql(QL.A) replace +
trigger trigtype(first) +
process(PR.ECHO) +
initq(QL.INITQ)
def process(PR.ECHO) replace +
applicid('/opt/mqm/samp/bin/amqsech')
def qmodel(QM.REPLY) replace
$ runmqsc qmgr1  report2.txt

簡単に上図との対応を記載すると、

  • イニシエーション・キュー:QL.INITQ (下記QL.AのINITQパラメーターで指定、参照される。)
  • アプリケーション・キュー:QL.A (TRIGGERを指定する事でトリガリング機能を有効にする。またTRIGTYPE(FIRST)はキューが空の状態からメッセージが1つPUTされた時にトリガー・イベントが発生する。)
  • プロセス・オブジェクト:トリガー起動対象プログラムはamqsech。QL.Aに指定、参照される。
  • Reply-toキュー:図には無いがamqsreq, amqsechが使用するキュー。モデル・キューとして定義。アプリケーションよりMQOPENされるとキュー・マネージャーによって動的にキューが作成される。

runmqscの実行結果

Starting MQSC for queue manager qmgr1.
     1 : def ql(QL.INITQ) replace
AMQ8006: WebSphere MQ queue created.
     2 : def ql(QL.A) replace +
       : trigger trigtype(first) +
       : process(PR.ECHO) +
       : initq(QL.INITQ)
AMQ8006: WebSphere MQ queue created.
     3 : def process(PR.ECHO) replace +
       : applicid('/opt/mqm/samp/bin/amqsech')
AMQ8010: WebSphere MQ process created.
     4 : def qmodel(QM.REPLY) replace
AMQ8006: WebSphere MQ queue created.
       :
4 MQSC commands read.
No commands have a syntax error.
All valid MQSC commands were processed.

トリガー・モニターの起動 – runmqtrm

WebSphere MQ (Windows, UNIX)に提供されているトリガー・モニターの内、runmqtrmを用いる。
書式:runmqtrm -m QMgrName -q InitiationQName
仮にイニシエーション・キューを指定しなかった場合はSYSTEM.DEFAULT.INITIATION.QUEUEが使用される。

$ runmqtrm -q QL.INITQ -m qmgr1
01/06/13  16:25:08 : WebSphere MQ trigger monitor started.
__________________________________________________
01/06/13  16:25:08 : Waiting for a trigger message

トリガリングのテスト

メッセージのPUT

$ amqsreq QL.A qmgr1 QM.REPLY
Sample AMQSREQ0 start
server queue is QL.A
replies to AMQ.50E9AFED20001E02
Hello MQ world!
Hello
Hello
response
response
response
no more replies
Sample AMQSREQ0 end
$

“Hello MQ world!”、”Hello”、”Hello”メッセージがQL.AにMQPUTされ、空行の後にQM.REPLYキューよりamqsechがMQPUTしたメッセージをMQGETできている。その時のトリガー・モニターの状況は下記の通り。

トリガー・モニター

スペースが多いのはメッセージ構造体に指定されていないパラメータが多い為。

__________________________________________________
01/06/13  16:25:08 : Waiting for a trigger message
/opt/mqm/samp/bin/amqsech 'TMC    2QL.A                                            PR.ECHO                                                                                                             /opt/mqm/samp/bin/amqsech                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       qmgr1                                           '
Sample AMQSECHA start
Hello MQ world!
MQGET ended with reason code 2033
Sample AMQSECHA end
01/06/13  16:27:21 : End of application trigger.
__________________________________________________
01/06/13  16:27:21 : Waiting for a trigger message
/opt/mqm/samp/bin/amqsech 'TMC    2QL.A                                            PR.ECHO                                                                                                             /opt/mqm/samp/bin/amqsech                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       qmgr1                                           '
Sample AMQSECHA start
Hello
Hello
MQGET ended with reason code 2033
Sample AMQSECHA end
01/06/13  16:27:40 : End of application trigger.
__________________________________________________
01/06/13  16:27:40 : Waiting for a trigger message

トリガー・メッセージが2つに区切られているのは、1件目のMQGET後、5秒間(WaitIntervalで指定された時間)後、何もメッセーがPUT為れなかった為、MQGETがrc 2033(MQRC_NO_MSG_AVAILABLE)を返して終了したが、2, 3件目の間は5秒以内だった為、続けて”Hello”, “Hello”とMQGETした。このWaitIntervalの設定はソース中のGetMsgOpts構造体で設定している。
3件目のputの際は、直前にamqsechがgetしているので再度トリガーイベントが発生しamqsechが2重起動しそうな気がしたが、まだ、amqsechがMQOPEN中でWaitしている状態の為、イベントが発生しなかったのかな。

<前略>
   /******************************************************************/
   /*                                                                */
   /*   Get messages from the message queue                          */
   /*   Loop until there is a warning or failure                     */
   /*                                                                */
   /******************************************************************/
   buflen = sizeof(buffer) - 1;
   gmo.Version = MQGMO_VERSION_2;     /* Avoid need to reset Message */
   gmo.MatchOptions = MQMO_NONE;      /* ID and Correlation ID after */
                                      /* every MQGET                 */
   gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG
          | MQGMO_CONVERT         /* receive converted messages      */
          | MQGMO_WAIT;           /* wait for new messages           */
   gmo.WaitInterval = 5000;       /* 5 second limit for waiting      */
   while (CompCode == MQCC_OK)
   {
     /****************************************************************/
     /*                                                              */
     /*   MQGET sets Encoding and CodedCharSetId to the values in    */
     /*   the message returned, so these fields should be reset to   */
     /*   the default values before every call, as MQGMO_CONVERT is  */
     /*   specified.                                                 */
     /*                                                              */
     /****************************************************************/
     md.Encoding       = MQENC_NATIVE;
     md.CodedCharSetId = MQCCSI_Q_MGR;
     MQGET(Hcon,                /* connection handle                 */
           Hobj,                /* object handle                     */
           &md,                 /* message descriptor                */
           &gmo,                /* GET options                       */
           buflen,              /* buffer length                     */
           buffer,              /* message buffer                    */
           &messlen,            /* message length                    */
           &CompCode,           /* completion code                   */
           &Reason);            /* reason code                       */
     /* report reason if any  (loop ends if it failed)    */
     if (Reason != MQRC_NONE)
     {
       printf("MQGET ended with reason code %d\n", Reason);
     }
<後略>

尚、トリガー機能を停止したい場合はキューに対してNOTRIGGERパラメーターを設定する。

alter ql(QL.A) notrigger
     1 : alter ql(QL.A) notrigger
AMQ8008: WebSphere MQ queue changed.

参考ドキュメント

  • WebSphere MQ Application Programming Guide Version 7.0
  • WebSphere MQ System Administration Guide Version 7.0

ちなみに、runmqtrmはiSeries(IBM i)版には無いらしい。。。