非同期イベントを扱うプログラムの書き方

非同期イベントの処理の仕方について勉強した事をまとめます。

バイスへの入出力等の非同期イベントはUNIX系システム*1ではほとんどがファイルディスクリプタの状態変化、シグナルによる割り込みによって実現されます。ファイルディスクリプタの状態変化は何が起こるか,いつ起こるか,どのような順番で起こるか,などが予測出来ません。
またシグナルはプロセスの実行中に任意のタイミングで割り込んでくのでさらに問題になります。

select システムコール

監視すべきファイルディスクリプタが複数ある場合はループを回して一つ一つ順番にチェックしていくというやり方があります。このやり方の場合、どこかのチェックでブロックしてしまうと当然他のチェックが行われない(または遅れる)ことになってしまうので通常はNon Blockingでチェックを行いますが、そうすると無駄にループを空回しすることになるのでCPUに不必要な負荷をかけてしまいます。
こういうやり方をbusy waitと言います。

busy waitをしないで良くするためには

全てのファイルディスクリプタを同時に監視する

ことが出来れば良いですが,そのためにselectシステムコールが用意されています。

int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);

readfds,writefds,exceptfdsに監視したいファイルディスクリプタの集合を渡します。
readfds,writefdsについては「ブロックせずに」読み込み・書き込みが出来るかどうかを監視し,exceptfdsについては例外の発生を監視します。
何らかのディスクリプタの状態が変化するまではselectシステムコールの呼び出しによってブロックし,CPU資源を消費せずにプロセスを停止することが出来ます。

selectとシグナル

シグナルの扱いは慎重におこなわなければいけません。
まずシグナルは実行の最中に飛んでくるのでシグナルハンドラ内の処理によって矛盾が発生しないように非常に注意してハンドラを記述しなければいけません。

またman 2 selectにも書いてありますがシグナルに依存した処理をselectと同時に行う場合は以下の問題もあります。

static bool got_sigfoobar = false;

int
main()
{
  ...
  for(;;)
  {
    if(got_sigfoobar)
    {
      /* SIGFOOBARを検出したときの処理 */
      ...
    }
    ...
    /* !!!  ここでSIGFOOBARが飛んで来るとselectでブロックし上の処理に進まない !!! */
    ...
    select(...);
    ...
  }
}

SIGFOOBAR_handler(int sig)
{
  got_sigfoobar = true; /* よくあるシグナルハンドラの実装 */
}

これらを同時に克服するには一つの方法としてselectで待機している最中にのみシグナルが飛んで来るようにするという方法があります。実現するためにはsigprocmaskを使用してシグナルを操作しselectの実行中のみシグナルを許可するようにすれば良いです。

selectはシグナルを受信するとブロックから抜けるような仕様になっていますし,sigprocmaskで禁止していたシグナルも禁止を解除するときまで保留されるようになっています。

とはいってもsigprocmaskの実行とselectの実行の間には一瞬すき間があるので問題になる場合もあります。代わりにpselectを使用するようにすると良いらしいです。


このようにするとselectをファイルディスクリプタの状態変化とシグナルの両方を検出する為に使用することが出来るわけです。便利ですね〜。

具体例

PostgreSQLのpostmasterプロセスでの例です。

/*
 * メインループ 
 * シグナルを禁止した状態でメインループに入る
 */
static int
ServerLoop(void)
{
        ...

	for (;;)
	{
                ...

                /* select中だけシグナルを許可 */
		PG_SETMASK(&UnBlockSig);

		selres = select(nSockets, &rmask, NULL, NULL, &timeout);

                /* 再び禁止 */
		PG_SETMASK(&BlockSig);

		if (selres < 0)
                        /* エラー処理 */
		if (selres > 0)
		{
                        ...
			for (i = 0; i < MAXLISTEN; i++)
			{
                                ...
				if (FD_ISSET(ListenSocket[i], &rmask))
				{
                                        /* 
                                         * ソケットに変化があったら接続を確立し,
                                         * postgresプロセスを起動して処理を任せる
                                         */
					port = ConnCreate(ListenSocket[i]);
					if (port)
					{
						BackendStartup(port);
						StreamClose(port->sock);
						ConnFree(port);
					}
				}
			}
		}
                ...
	}
}

...

/* postgresプロセスなどの子プロセスが終了した際の後始末をするシグナルハンドラ 
 * selectのブロック中に呼ばれることが分かっているので自由に書いて良い
 */
static void
reaper(SIGNAL_ARGS)
{
  ...
}

*1:他のシステムは全然知らない