HPC/並列プログラミングポータルでは、HPC(High Performance Computing)プログラミングや並列プログラミングに関する情報を集積・発信しています。 |
[記事一覧を見る]
それでは、ソースコードの詳細を見ていこう。パイプライン処理を実装するには、pipeline型のオブジェクトと、filterクラスの派生クラスとして処理を実装したクラスのオブジェクトを生成し、処理を行う順にfilter派生オブジェクトをpipelineオブジェクトに登録、最後にpipelineオブジェクトのrunメンバー関数を呼べばよい。
まず、パイプラインを通るデータをクラスとして定義する。今回の例では、アクセスログデータを表現する「AccessLogItem」クラスがこれに相当する。
// アクセスログ1件を表すクラス class AccessLogItem { // 元データ string buffer; boost::regex regex; // パース後のデータ string ipAddr; string hostname; string date; string request; string resultCode; string bites; public: string parsed; AccessLogItem(); // 入力ストリームから1行読みこむ // 成功すればtrue、失敗すればfalseを返す bool ReadFromIStream(ifstream ifs); // 元データをパースする // 成功すればtrue、失敗すればfalseを返す bool Parse(); // IPアドレスをホスト名に変換 // 成功すればtrue、失敗すればfalseを返す bool IPaddrToHostname(); // 出力用に整形したstringを生成 string Format(); };
次に、各段階の処理を行うfilter派生クラスを定義する。先に述べたとおり、この例では先に宣言したAccessLogItemクラスに対し行う4つの操作をそれぞれフィルタとして定義していく。
まず、ファイルから1行のデータを読み出すフィルタを用意する。フィルタは「filter」クラスの派生クラスとして定義し、フィルタが実際に行う処理はoperator()内に記述する。
// 入力ストリームから1行分のアクセスログを読み込む class InputFilter: public tbb::filter { private: ifstream ifs; AccessLogItem* buffer; int bufSize; int nextBuffer; public: InputFilter( ifstream inputStream, int size_of_buffer ); ~InputFilter(); void* operator()(void*); };
フィルタのコンストラクタでは、まず基底クラスの初期化を行う。このとき、フィルタが並列処理可能であればfalseを、並列処理が行えない場合であればtrueを引数として与える。今回のInputFilterクラスはシーケンシャルにファイルからデータを読み出すフィルタであり、並列処理は行わない。そのため、ここではtrueを与えている。
また、InuputFilterはパイプラインの初段であるため、パイプラインに流すデータの生成もここで行う必要がある。そこで、コンストラクタの第2引数をバッファサイズとし、指定された数のAccessLogItemオブジェクトを確保するようにしている。
InputFilter::InputFilter( ifstream inputStream, int size_of_buffer ): filter(true), ifs(inputStream), bufSize(size_of_buffer), buffer(NULL), nextBuffer(0) { buffer = new AccessLogItem[bufSize]; } InputFilter::~InputFilter() { delete[] buffer; }
実際にフィルタが行う処理は、operator()内に記述する。InputFilterはパイプラインの初段であるため、引数は渡されない。また、戻り値は次のパイプラインに渡すデータへのポインタをvoid*型にキャストしたものとなる。
void* InputFilter::operator()(void*) { if( nextBuffer == bufSize - 1 ) { nextBuffer = 0; } else { nextBuffer++; } if( buffer[nextBuffer].ReadFromIStream(ifs) ) { return (buffer[nextBuffer]); } else { return NULL; } }
続いて、パイプライン2段目および3段目の処理となる、データのパースおよびIPアドレスの変換を行うフィルタを用意する。どちらのフィルタも並列実行が可能なので、基底クラスのコンストラクタにはfalseを与えて初期化している。またoperator()では、パイプラインの前の段のoperator()の戻り値が引数として与えられる。
// アクセスログをパースする class ParseFilter: public tbb::filter { public: // 派生元クラスのコンストラクタを呼ぶ // このステージは並列実行が可能なので、引数にfalseを指定 ParseFilter() : filter(false) {} void* operator()( void* item ) { AccessLogItem a = *static_castAccessLogItem*(item); a.Parse(); return a; } };
// IPアドレスをホスト名に変換 class IPaddrToHostnameFilter: public tbb::filter { public: // 派生元クラスのコンストラクタを呼ぶ // このステージは並列実行が可能なので、引数にfalseを指定 IPaddrToHostnameFilter() : filter(false) {} void* operator()( void* item ) { AccessLogItem a = *static_castAccessLogItem*(item); a.IPaddrToHostname(); return a; } };
最後に、パイプラインの最終段となる整形および出力を行うフィルタを用意する。出力についてもシーケンシャルに行うため、基底クラスのコンストラクタにはtrueを与えている。また、このフィルタは最終段であるので、フィルタの出力、つまりoperator()の戻り値はNULLとなる。
// 整形したログを出力 class OutputFilter: public tbb::filter { private: ostream os; public: // 派生元クラスのコンストラクタを呼ぶ // このステージは並列実行が不可なので、引数にtrueを指定 OutputFilter( ostream outputStream ) : filter(true), os(outputStream) {} void* operator()( void* item ) { AccessLogItem a = *static_castAccessLogItem*(item); os a.Format() endl; return NULL; } };
パイプラインを実行するにはまずpipelineオブジェクトと各フィルタを用意し、pipelineオブジェクトの「add_filter」メンバー関数でフィルタを処理順に登録していく。
// パイプラインを作成 tbb::pipeline p; int n_buffer = 10; // 入力ストリームから読み込むパイプラインを作成、登録 ifstream ifs; : : InputFilter inputFilter(ifs, n_buffer); p.add_filter(inputFilter); // アクセスログをパースするパイプラインを作成、登録 ParseFilter parseFilter; p.add_filter(parseFilter); // IPアドレスをホスト名に変換するパイプラインを作成、登録 IPaddrToHostnameFilter ip2hostFilter; p.add_filter(ip2hostFilter); // 整形したログを出力するパイプラインを作成、登録 ofstream ofs; : : OutputFilter outputFilter(ofs); p.add_filter(outputFilter);
最後に、pipelineオブジェクトの「run」メンバー関数を実行すると、パイプライン処理が開始される。なお、run関数の引数にはパイプラインを流れるバッファの数を指定する。
p.run( n_buffer );
なお、500件のアクセスログに対し、上記のように並列化を行ったものと、同様の処理を非並列に実行した場合とで処理時間を比較した結果が次の表3である。今回の例では、IPアドレスからホスト名を取得する個所が大きなボトルネックとなっており、これを並列化することで大幅に処理時間を向上できた。
プログラム | 実行時間 |
---|---|
並列版 | 13635ミリ秒 |
非並列版 | 82025ミリ秒 |
[PageInfo]
LastUpdate: 2009-11-18 20:48:26, ModifiedBy: hiromichi-m
[Permissions]
view:all, edit:login users, delete/config:members