e-learning、オラクル研修、LMS(学習管理システム)のiStudy

e-learning、オラクル研修、LMS(学習管理システム)のiStudy

第89回 「DBMS_PIPEを使った非同期メッセージ通知(1)」

2014.03.13

こんにちは。インストラクターの蓑島です。

今回はタイトルにあるように、DBMS_PIPEパッケージを使って、非同期にメッセージ送受信する方法を簡単にご紹介します。今回は基本ですが、次回はこの機能を使った応用例をご紹介したいと思います。

まず、パイプ(PIPE)とはなにかというと、簡単にいえばメッセージを格納する場所です。
権限があればそれを使うすべてのセッションから共有できます。
したがって、あるセッションからメッセージをパイプに送信すると、別のセッションがそのパイプからメッセージを受信することができます。文字通り、パイプのようなイメージであり、入った順番に取り出されていきます。
このように、パイプを介して、メッセージを非同期に(つまり別々のタイミングで)送受信できるわけです。パイプの場所はデータベースサーバーのメモリです。
したがって、データベースをシャットダウンすると、パイプの情報は失われますので注意してください。

ではパイプを使った一般的な操作の手順を簡単に説明します。

<<送信側>>
1. いったん、ローカルメッセージバッファにメッセージを格納する(複数可)
2. そのバッファをパイプに送信する
---------------------------------------------------------------------
<<受信側>>
3. パイプからローカルメッセージバッファにメッセージを受信する(コピー)
4. そのバッファからメッセージを1件づつ取り出す。

ここで意識していただきたいのは、送信も、受信も2段階の処理となっていることです。送信側では直接パイプにメッセージを送信しません。
いったんローカルメッセージバッファというセッション固有の領域にメッセージ(複数可)を格納しておいて、そのローカルメッセージバッファをパイプに送信します。
受信側も同様に、パイプからローカルメッセージバッファにいったん受信して、そのローカルメッセージバッファからメッセージを1件ずつとりだします。
このように送信も受信も、ローカルメッセージバッファを介した2段階の処理となっています。

では実際に操作をしてきたいと思います。なるべく簡単におこなってみます。

まず、最初にDBMS_PIPEのテスト用のユーザ(TEST01, TEST02)を作成し、必要最低限の機能だけを付与しておきます。
SYSユーザなどの管理者ユーザで操作してください。

SQL> SHOW USER
ユーザーは"SYS"です。
SQL> CREATE USER TEST01 IDENTIFIED BY TEST01;

ユーザーが作成されました。

SQL> CREATE USER TEST02 IDENTIFIED BY TEST02;

ユーザーが作成されました。

SQL> GRANT CREATE SESSION TO TEST01, TEST02;

権限付与が成功しました。

SQL> GRANT EXECUTE ON DBMS_PIPE TO TEST01, TEST02;

権限付与が成功しました。

これで、TEST01,TEST02ユーザが作成されて、必要な権限が付与されました。
権限で「CREATE SESSION」はデータベースにログインできる権限、「EXECUTE ON DBMS_PIPE」は、DBMS_PIPEパッケージに対する実行権限です。

では、TEST01ユーザで、PIPE01という名前のパイプにメッセージを送信し、TEST02ユーザがそのパイプからメッセージを受信する、というストーリーで実際に処理を行います。

ではまず、TEST01ユーザでログインします。
SQL> CONNECT TEST01/TEST01
接続されました。

DBMS_OUTPUTパッケージの画面出力を有効にしておきます。
SQL> SET SERVEROUTPUT ON

次にローカルメッセージバッファにメッセージを格納します。DBMS_PIPEパッケージのPACK_MESSAGEプロシージャを使います。
以下をご覧ください。

SQL> show user
ユーザーは"TEST01"です。
SQL> EXEC DBMS_PIPE.PACK_MESSAGE('TEST01ユーザのメッセージ1')

PL/SQLプロシージャが正常に完了しました。

これで、「TEST01ユーザのメッセージ1」というメッセージをローカルメッセージバッファに格納しました。
続けてもう1件、メッセージをローカルバッファに格納します。

SQL> EXEC DBMS_PIPE.PACK_MESSAGE('TEST01ユーザのメッセージ2')

PL/SQLプロシージャが正常に完了しました。

これでローカルメッセージバッファにメッセージが2件格納されました。

では、続けてローカルメッセージバッファをパイプに送信します。パイプ名を「PIPE01」とします。
DBMS_PIPEパッケージのSEND_MESSAGEファンクションを使います。以下の例をご覧ください。

1
2
3
4
5
6
7
8
9
10
DECLARE
    V_RETURN NUMBER;
BEGIN
    V_RETURN := DBMS_PIPE.SEND_MESSAGE( 'PIPE01' );
    DBMS_OUTPUT.PUT_LINE( '戻り値=' || V_RETURN);
END ;
/
戻り値=0
 
PL/SQLプロシージャが正常に完了しました。

解説します。
DBMS_PIPE.SEND_MESSAGEは指定されたパイプに、ローカルメッセージバッファの内容全体を送信するファンクションです。ここで指定したパイプ名は、「PIPE01」です。
もしも「PIPE01」というパイプが事前に存在していなければ、自動的に新規で「PIPE01」という名前でパイプを作成して送信します。すでに存在していれば、そのパイプに対してそのまま送信します。
なお、このファンクションの戻り値が「0」であれば、正常に処理されたことを示します。
上記の例では確かに戻りは「0」で、正常に処理されたことがわかります。

これでPIPE01パイプが確実に存在し、そこにTEST01ユーザのメッセージ(2件)のセットが格納されています。
このように1回の処理で複数のメッセージがセットになってパイプに送信されます。パイプとの送受信の単位はセットであり、一回の送受信処理で複数のメッセージが送受信されます。

では、次はTEST02ユーザのセッションでこのメッセージのセット受信してみましょう。

TEST02ユーザで接続します。
SQL> CONNECT TEST02/TEST02
接続されました。

画面表示を有効にしておきます。
SQL> SET SERVEROUTPUT ON

まず、PIPE01パイプには現在、TEST01ユーザのメッセージ(2件)がセットになって格納されています。
このセットを取り出し、そのセッションのローカルメッセージバッファにコピーするためにDBMS_PIPEパッケージのRECEIVE_MESSAGEファンクションを使います。 このファンクションが正常に処理された場合は戻り値は「0」となります。
以下の例をご覧ください。

1
2
3
4
5
6
7
8
9
10
DECLARE
    V_RETURN NUMBER;
BEGIN
    V_RETURN := DBMS_PIPE.RECEIVE_MESSAGE( 'PIPE01' );
    DBMS_OUTPUT.PUT_LINE( '戻り値=' || V_RETURN);
END ;
/
戻り値=0
 
PL/SQLプロシージャが正常に完了しました。

ご覧のように、8行目で「戻り値=0」と表示されているので、このセッションのローカルメッセージバッファにメッセージのセットがコピーされたわけです。

もしも、仮にこのとき「PIPE01」パイプが存在していなければ、この処理は待機状態になります。つまり、「PIPE01」パイプが存在しメッセージが格納される状態になるまで待機します。
したがって、別のセッションで「PIPE01」パイプにメッセージを送信(SEND_MESSAGE)すれば待機状態は解消します。
なお、待機のタイムアウトはデフォルトでは1000日間となっています。

では、今回は「PIPE01」パイプは存在しますので、待機状態とならずに処理できたわけです。つまり、ローカルメッセージバッファにメッセージのセットが格納されました。そのセットにはメッセージが2件ありますが、順番に取り出してみましょう。
メッセージを取り出すには、DBMS_PIPEパッケージのUNPACK_MESSAGEプロシージャを使います。これによりローカルメッセージバッファから指定した変数にメッセージが取得できます。

以下の例をご覧ください。
まず忘れないで画面表示を有効にします。
SQL> SET SERVEROUTPUT ON

そして以下のように実行します。

1
2
3
4
5
6
7
8
DECLARE
    V_ITEM   VARCHAR2(32767);
BEGIN
    DBMS_PIPE.UNPACK_MESSAGE(V_ITEM);
    DBMS_OUTPUT.PUT_LINE(V_ITEM);
END ;
/
TEST01ユーザのメッセージ1      ←1件目のメッセージが表示されている

ここでは、V_ITEMという変数を宣言しておいて(2行目)、4行目でその変数に最初のメッセージが格納され、5行目でその変数を画面表示したわけです。ご覧のように、TEST01ユーザの最初のメッセージが表示できましたので、きちんと取り出せたことがわかります。

では、まったく同じ処理をもう一度行うと、2件目のメッセージが取得できます。

1
2
3
4
5
6
7
8
DECLARE
    V_ITEM   VARCHAR2(32767);
BEGIN
    DBMS_PIPE.UNPACK_MESSAGE(V_ITEM);
    DBMS_OUTPUT.PUT_LINE(V_ITEM);
END ;
/
TEST01ユーザのメッセージ2      ←2件目のメッセージが表示されている

ご覧のように、TEST01ユーザの2件目のメッセージが取得できました。

これで、ローカルバッファは空になったのですが、空のローカルバッファからさらにメッセージを取得するとどうなるでしょうか?
やってみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
DECLARE
   V_ITEM   VARCHAR2(32767);
BEGIN
   DBMS_PIPE.UNPACK_MESSAGE(V_ITEM);
   DBMS_OUTPUT.PUT_LINE(V_ITEM);
END ;
/
 
DECLARE
*
行1でエラーが発生しました。:
ORA-06556: パイプが空です。UNPACK_MESSAGEリクエストを実行できません。
ORA-06512: "SYS.DBMS_PIPE" , 行80
ORA-06512: 行4

ご覧のように、「ORA-06556」のエラーとなりました。
ローカルバッファが空のとき、そこからさらにメッセージをUNPACKしようとするとエラーになるわけです。このことを利用すれば、プログラムでローカルメッセージバッファからループ処理で1件ずつメッセージを取り出す時の終了判定の条件に使えますね。

では今回はここまでにいたします。
簡単な例ですが、パイプを通して二つのセッション間でメッセージの送受信ができることを解説しました。
次回はこれを利用した応用例をご紹介したいと思います。ご期待ください。

先頭へ戻る