WebShpere MQのトリガリング機能の動作確認。前回までの記事はこちら。参考サイト・ドキュメントは記事の末尾をご参照。
トリガリングとは
下記のドキュメントが簡単に纏まっていたので引用させて頂く。
引用元:WebSphere MQ System Administration Guide Version 7.0
WebSphere MQ enables you to start an application automatically when certain conditions on a queue are met. For example, you might want to start an application when the number of messages on a queue reaches a specified number. This facility is called triggering and is described in detail in the WebSphere MQ Application Programming Guide.
簡単に言えばイベント(メッセージ)ドリブンな処理を実現できる機能。
トリガリングの構成コンポーネント
下記画像はクリックすると拡大します。
上図の()括弧は補足説明。[]括弧は今記事でサンプルとして定義する名称。6つのコンポーネントに追加で一時動的Reply-toキューをモデル・キューとして定義しトリガリングの動作を確認する。
検証プログラム
使用サンプルプログラムは下記の2つ。
amqsreq – /opt/mqm/samp/amqsreq0.c
ソースの冒頭のコメントを参照。簡単に言うと、標準入力から入力されたメッセージ行を対象キューにMQPUTし、空行が入力されると、Reply-toキューからメッセージをMQGETし標準出力する。上図のユーザー・プログラムAの役割。Reply-toキューを図に記載しなかったのはトリガー構成から趣旨がずれてしまう為。
書式:amqsreq <対象キュー> <キュー・マネージャー> <Reply-toキュー>
/* Program logic: */ /* MQOPEN server queue for output */ /* MQOPEN the reply queue for exclusive input */ /* for each line in the input file, */ /* . MQPUT request message containing text to server queue */ /* while no MQI failures, */ /* . MQGET message from reply queue */ /* . display its content */ /* MQCLOSE both queues */ /* */ /********************************************************************/ /* */ /* AMQSREQ0 has 3 parameters */ /* - the name of the target queue (required) */ /* - queue manager name (optional) */ /* - the name of the ReplyToQ (optional) */
amqsech – /opt/mqm/samp/amqsecha.c
プロンプトから起動するのではなく、トリガー・モニターよりトリガー・メッセージ(構造体)をパラメーターとして起動。構造体内で指定されているキューからメッセージをMQGETし、そのメッセージ記述子に指定されたReply-toキューにそのメッセージのコピーをMQPUTする。これを対象のキューに残っているメッセージそれぞれに対して処理し、キューが空になったら、MQCLOSEする。上図のユーザー・プログラムBの役割。
詳細はソースに記載のコメントを参照。
/* Program logic: */ /* MQCONNect to message queue manager */ /* MQOPEN message queue for shared input */ /* while no MQI failures, */ /* . MQGET next message from input queue */ /* . Prepare reply message if MQGET was successful */ /* . Prepare a report message if MQGET failed */ /* . MQPUT1, send reply or report to named reply queue */ /* MQCLOSE queue A */ /* MQDISConnect from queue manager */ /* */ /* */ /********************************************************************/ /* */ /* AMQSECHA has 1 parameter - a string (MQTMC2) based on the */ /* initiation trigger message; only the QName and queue */ /* manager name fields are used in this example */
トリガリング用のキュー・マネージャー構成
検証用のオブジェクトの作成スクリプトは下記のようになる。
$ vi exe2.txt def ql(QL.INITQ) replace def ql(QL.A) replace + trigger trigtype(first) + process(PR.ECHO) + initq(QL.INITQ) def process(PR.ECHO) replace + applicid('/opt/mqm/samp/bin/amqsech') def qmodel(QM.REPLY) replace $ runmqsc qmgr1 report2.txt
簡単に上図との対応を記載すると、
- イニシエーション・キュー:QL.INITQ (下記QL.AのINITQパラメーターで指定、参照される。)
- アプリケーション・キュー:QL.A (TRIGGERを指定する事でトリガリング機能を有効にする。またTRIGTYPE(FIRST)はキューが空の状態からメッセージが1つPUTされた時にトリガー・イベントが発生する。)
- プロセス・オブジェクト:トリガー起動対象プログラムはamqsech。QL.Aに指定、参照される。
- Reply-toキュー:図には無いがamqsreq, amqsechが使用するキュー。モデル・キューとして定義。アプリケーションよりMQOPENされるとキュー・マネージャーによって動的にキューが作成される。
runmqscの実行結果
Starting MQSC for queue manager qmgr1. 1 : def ql(QL.INITQ) replace AMQ8006: WebSphere MQ queue created. 2 : def ql(QL.A) replace + : trigger trigtype(first) + : process(PR.ECHO) + : initq(QL.INITQ) AMQ8006: WebSphere MQ queue created. 3 : def process(PR.ECHO) replace + : applicid('/opt/mqm/samp/bin/amqsech') AMQ8010: WebSphere MQ process created. 4 : def qmodel(QM.REPLY) replace AMQ8006: WebSphere MQ queue created. : 4 MQSC commands read. No commands have a syntax error. All valid MQSC commands were processed.
トリガー・モニターの起動 – runmqtrm
WebSphere MQ (Windows, UNIX)に提供されているトリガー・モニターの内、runmqtrmを用いる。
書式:runmqtrm -m QMgrName -q InitiationQName
仮にイニシエーション・キューを指定しなかった場合はSYSTEM.DEFAULT.INITIATION.QUEUEが使用される。
$ runmqtrm -q QL.INITQ -m qmgr1 01/06/13 16:25:08 : WebSphere MQ trigger monitor started. __________________________________________________ 01/06/13 16:25:08 : Waiting for a trigger message
トリガリングのテスト
メッセージのPUT
$ amqsreq QL.A qmgr1 QM.REPLY Sample AMQSREQ0 start server queue is QL.A replies to AMQ.50E9AFED20001E02 Hello MQ world! Hello Hello response response response no more replies Sample AMQSREQ0 end $
“Hello MQ world!”、”Hello”、”Hello”メッセージがQL.AにMQPUTされ、空行の後にQM.REPLYキューよりamqsechがMQPUTしたメッセージをMQGETできている。その時のトリガー・モニターの状況は下記の通り。
トリガー・モニター
スペースが多いのはメッセージ構造体に指定されていないパラメータが多い為。
__________________________________________________ 01/06/13 16:25:08 : Waiting for a trigger message /opt/mqm/samp/bin/amqsech 'TMC 2QL.A PR.ECHO /opt/mqm/samp/bin/amqsech qmgr1 ' Sample AMQSECHA start Hello MQ world! MQGET ended with reason code 2033 Sample AMQSECHA end 01/06/13 16:27:21 : End of application trigger. __________________________________________________ 01/06/13 16:27:21 : Waiting for a trigger message /opt/mqm/samp/bin/amqsech 'TMC 2QL.A PR.ECHO /opt/mqm/samp/bin/amqsech qmgr1 ' Sample AMQSECHA start Hello Hello MQGET ended with reason code 2033 Sample AMQSECHA end 01/06/13 16:27:40 : End of application trigger. __________________________________________________ 01/06/13 16:27:40 : Waiting for a trigger message
トリガー・メッセージが2つに区切られているのは、1件目のMQGET後、5秒間(WaitIntervalで指定された時間)後、何もメッセーがPUT為れなかった為、MQGETがrc 2033(MQRC_NO_MSG_AVAILABLE)を返して終了したが、2, 3件目の間は5秒以内だった為、続けて”Hello”, “Hello”とMQGETした。このWaitIntervalの設定はソース中のGetMsgOpts構造体で設定している。
3件目のputの際は、直前にamqsechがgetしているので再度トリガーイベントが発生しamqsechが2重起動しそうな気がしたが、まだ、amqsechがMQOPEN中でWaitしている状態の為、イベントが発生しなかったのかな。
<前略> /******************************************************************/ /* */ /* Get messages from the message queue */ /* Loop until there is a warning or failure */ /* */ /******************************************************************/ buflen = sizeof(buffer) - 1; gmo.Version = MQGMO_VERSION_2; /* Avoid need to reset Message */ gmo.MatchOptions = MQMO_NONE; /* ID and Correlation ID after */ /* every MQGET */ gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG | MQGMO_CONVERT /* receive converted messages */ | MQGMO_WAIT; /* wait for new messages */ gmo.WaitInterval = 5000; /* 5 second limit for waiting */ while (CompCode == MQCC_OK) { /****************************************************************/ /* */ /* MQGET sets Encoding and CodedCharSetId to the values in */ /* the message returned, so these fields should be reset to */ /* the default values before every call, as MQGMO_CONVERT is */ /* specified. */ /* */ /****************************************************************/ md.Encoding = MQENC_NATIVE; md.CodedCharSetId = MQCCSI_Q_MGR; MQGET(Hcon, /* connection handle */ Hobj, /* object handle */ &md, /* message descriptor */ &gmo, /* GET options */ buflen, /* buffer length */ buffer, /* message buffer */ &messlen, /* message length */ &CompCode, /* completion code */ &Reason); /* reason code */ /* report reason if any (loop ends if it failed) */ if (Reason != MQRC_NONE) { printf("MQGET ended with reason code %d\n", Reason); } <後略>
尚、トリガー機能を停止したい場合はキューに対してNOTRIGGERパラメーターを設定する。
alter ql(QL.A) notrigger 1 : alter ql(QL.A) notrigger AMQ8008: WebSphere MQ queue changed.
参考ドキュメント
- WebSphere MQ Application Programming Guide Version 7.0
- WebSphere MQ System Administration Guide Version 7.0
ちなみに、runmqtrmはiSeries(IBM i)版には無いらしい。。。