HPC/並列プログラミングポータルでは、HPC(High Performance Computing)プログラミングや並列プログラミングに関する情報を集積・発信しています。

新着トピックス

オープンソース化された並列化テンプレートクラスライブラリ「Intel Threading Building Blocks」入門

 C++で並列アプリケーションを実装するためのテンプレートライブラリとして、インテルがリリースしている「Intel Threading Building Blocks」(TBB)がある。TBBはインテル コンパイラーやインテル Parallel Studioといったインテルのコンパイラ製品に付属しているほか、オープンソース版も公開されている。本記事ではこのTBBの概要と、基本的な使い方を解説する。

 C/C++で並列アプリケーションを実装する手法として、並列化したい処理をOSのAPIを用いてマルチスレッド化する、もしくは並列プログラミングの規格である「OpenMP」を利用する、といったものが知られている。これらについては以前の記事でも紹介しているが、マルチスレッドを利用した実装は柔軟性がある一方で手間が掛かり、OpenMPは比較的手軽だが柔軟性に欠けるなど、それぞれに長所と短所がある。

 また、C++でアプリケーションを実装する場合、スレッドやOpenMPを利用した実装には別の困難さも生じてくる。1つはC++でのスレッドの扱いにくさ、もう1つは利用する関数やクラスが「スレッドセーフ」かどうか分かりにくい、という点だ。

 これらの問題を解決し、C++での見通しの良い並列処理実装を可能にするのが本記事で紹介する「Intel Threading Building Blocks」(以下、TBB)である。

Intel Threading Building Blocksが持つ機能

 TBBはC++で並列処理を記述するためのテンプレートクラスライブラリだ。インテルが開発し、かつては商用のライブラリとして販売されていたが、2007年にオープンソース化され、現在はGPL 2の下で公開されている。また、非オープンソースなプロダクトに利用できる商用版も引き続き提供されており、インテル コンパイラー製品やインテル Parallel Studio製品に同梱されているほか、単体でも購入が可能だ。現在の最新バージョンは2.2で、オープンソース版についてはthreadingbuildingblocks.orgからWindows/Linux/Mac OS X/Solaris向けのヘッダーファイルおよびバイナリを入手できる。

 TBBが提供する機能を簡単にまとめると、次のようになる。

  • データ群に対する反復処理を並列実装するためのテンプレートクラスやテンプレート関数
  • ストリーム処理/パイプライン処理を並列実装するためのテンプレートクラスやテンプレート関数
  • タスクベースの並列処理を実装するためのタスクスケジューラクラス
  • スレッドセーフなqueue/vector/mapクラス
  • スケーラブルなメモリアロケータ
  • mutexなどの排他処理機構

 並列処理を実装する場合、データや処理をどのように分割し、どのように実行するかがポイントとなる(図1)。たとえば多数のデータに対し同一の処理を行うような場合、データを分割して複数のスレッドに割り当て、同一の処理を並列実行させる方式が考えられる。また、ストリーム状の入力データを次々と処理していくような場合は、処理をパイプラインのように分割し、それぞれの行程を別スレッドで動作させることが考えられる。複数のコンポーネントが独立かつ非同期に処理を行うようなパターンでは、それぞれのコンポーネントごとにスレッドを割り当てるのが自然だ。TBBではこのような並列処理パターンそれぞれに対応するアルゴリズムやクラスが実装されており、アルゴリズムを選択し、ロジックとデータを用意するだけで簡単に並列処理を実装できる。

 また、並列化を行う場合には関数やクラスが「スレッドセーフ」かどうか、常に気を払わなければならない。並列化を行った個所でスレッドセーフでない関数を実行してしまうと、タイミングによってはデータの破壊や意図しない変更といった問題が発生するからだ。現在多くのC標準関数についてはスレッドセーフとなっているものの、C++でよく利用されるSTL(Standard Tempkate Library)についてはスレッドセーフであることは保証されていない。そのためTBBには、STLのqueueおよびvector、mapに相当するスレッドセーフなコンテナが用意されている。ただし、これらのスレッドセーフなコンテナはSTLのコンテナと比べると処理が遅い傾向があるため、必要な個所のみに利用するべきである。

 そのほか、アトミックな操作を行えるオブジェクトを作成するためのテンプレートクラスや、排他的な処理を行うためのmutex、スレッド化されスケーラブルなメモリアロケータなど、並列処理を実装する上で有用なコンポーネントなども含まれている。

TBBを利用するメリット

 さて、並列処理を実装する方法として、TBBのほかにも前述のようにマルチスレッドやOpenMPといった方法がある。これらと比較してTBBのメリットはどこにあるのだろうか。

 まずマルチスレッドによる実装との比較であるが、TBBでは「複数のスレッドを作成し、適切なデータに対して適切な処理を割り当てる」という処理が、テンプレート関数という形であらかじめ用意されている。そのため、データとロジックのみを用意するだけで、面倒なスレッドの生成やデータの分割といった部分を気にせずに並列処理を実装できる。

 一方OpenMPでは、for/whileループの並列処理やブロックごとの並列処理といった、スレッドの生成に関わる部分については面倒を見なくてもよいものの、データの分割については基本的には自前で処理しなければならない。また、OpenMPは並列処理すべき個所にプラグマを記述していく実装スタイルであるため、ソースコードの視認性や保守性が悪くなりやすいという問題もある。TBBはテンプレートクラスライブラリという形で実装されているため、並列処理をカプセル化しやすく、保守性/再利用性の高いコードを記述できる。

TBBを用いた並列アプリケーションの設計と実装

 先に述べたように、TBBでは実行する処理と処理すべきデータを用意し、使用するアルゴリズムに対応するテンプレート関数にそれらを渡すというスタイルで並列処理を実装していく。以下では、典型的な並列処理を例に、TBBでの実装方法を紹介していこう。

 なお、本記事ではテスト環境として、CPUにCore i7 920(2.66GHz)、3GBのメモリを搭載したPCを使用した。OSはWindows Vista Home Premiumで、開発環境にはVisual Studio 2009 SP2およびParallel Studioを使用している。また、コンパイルオプションについてはデフォルトの設定(最適化オプションは「/O2」のみ)を使用している。

ヘッダーファイルのインクルードとタスクスケジューラの初期化

 TBBを利用する際には、使用するコンポーネントに応じたヘッダーファイルをincludeしておく必要がある。また、アルゴリズムテンプレート関数や後述するタスクスケジューラを使用する前には、「task_scheduler_init.h」内で定義されている「tbb::task_scheduler_init」クラスのオブジェクトを生成・初期化しておく必要がある。通常はリスト1のようにプログラムのスタートアップ時にこのオブジェクトを生成し、終了時にオブジェクトが破棄されるようにすれば良いだろう。

リスト1 TBBの利用前に必要なタスクスケジューラ初期化
#include "tbb/task_scheduler_init.h"

int main() {
    tbb::task_scheduler_init init;

    // 以下、アプリケーションの初期化とメイン処理を記述
 :
 :
 :
    return 0; // main()関数から抜けるとともに、initオブジェクトは破棄される
}

 また、TBBのヘッダーファイルおよびライブラリが適切に読み込まれるよう、ヘッダーファイルおよびライブラリの検索ディレクトリを適切に設定しておく必要がある。インテル Parallel Studioに付属するTBBを利用する場合、Parallel Composerの「Select Build Components」画面の「Use TBB」にチェックを入れておくだけでこれらの設定が完了する(図3、4)。

要素数が分かっているデータ群に対する反復処理の並列実装(forループの並列化)

 並列処理が有効な典型的な例として、配列や各種コンテナに格納されたデータに対し、始点と終点を指定して次々と反復処理を行う、というパターンがある。リスト2はその典型的なパターンで、要素数がそれぞれdimである配列aと配列bに対し、おのおのの要素を足し合わせる処理をするものだ。

リスト2 典型的な反復処理の例:配列どうしの加算
void VectorAdd( double* a, double* b, int dim ) {
    for( int i = 0; i  dim; i++ ) {
        a[i] = a[i] + b[i];
    }
}

 このような反復処理を実装したアルゴリズムが、TBBの「parallel_for」テンプレート関数だ。parallel_forテンプレート関数は、次の図3のような処理を行う。

 parallel_forテンプレート関数の引数は、反復処理の範囲を表すオブジェクト(「Range」クラスの派生オブジェクト)と、反復して行う処理を定義した関数オブジェクトの2つだ。parallel_forテンプレート関数は、与えられたRangeオブジェクトを適切に分割し、分割したオブジェクトを引数として関数オブジェクトを実行する、という処理を行う。TBBにはRangeクラスの派生クラスとして、閉区間(ループの開始点と終了点)を表す「blocked_range」や、2次元の範囲を表す「blocked_range2d」といった汎用的なオブジェクトがあらかじめ用意されているので、これらから適切なものを選択して使用すればよい。もちろん、独自のRange派生クラスを定義して使用することも可能だ。

 たとえば、リスト2の処理をparallel_forテンプレート関数を用いて実装すると、次のリスト3のようになる。

リスト3 リスト2の処理をTBBで実装した例
#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"

class VectorAdder {
    public:
        double* vec1;
        double* vec2;

        VectorAdder( double* a, double* b ) :
        vec1(a),
        vec2(b)
        {}

    void operator() ( const tbb::blocked_rangeint r ) const {
        for( int i = r.begin(); i != r.end(); i++ ) {
            vec1[i] = vec1[i] + vec2[i];
        }
    }
};

void ParallelVectorAdd( double* a, double* b, int dim ) {
    tbb::blocked_rangeint range = tbb::blocked_rangeint(0, dim, 100);
    VectorAdder body = VectorAdder(a, b);
    tbb::parallel_for( range, body );
}

パイプライン処理の並列化・ソースコード詳細解説

 それでは、このコードについて詳しく解説していこう。まず、parallel_forテンプレート関数は「parallel_for.h」で定義されており、利用の際はこのヘッダーファイルをincludeしておく必要がある。また、後述する「blocked_range」クラスも利用しているので、こちらが宣言された「blocked_range.h」も同様にincludeする。

#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"

 次に、実行したい処理を行う関数オブジェクトを用意する。ここでは配列同士の加算処理を行う「VectorAdder」クラスを作成している。処理に必要な変数等はクラスのメンバー関数として用意しておき、オブジェクトのコンストラクタで初期化を行う。

class VectorAdder {
    public:
        double* vec1;
        double* vec2;

        VectorAdder( double* a, double* b ) :
        vec1(a),
        vec2(b)
        {}

 parallel_forテンプレート関数で並列処理を実装する際に、キモとなるのがoperator()関数である。operator()関数はデータに対する処理を実装する関数で、引数には処理すべきデータ範囲を表すRange派生クラス型のオブジェクトを取る。ここではint型の閉区間を示すblocked_rangeintクラスを用いている。Rangeオブジェクトに対し、begin()関数を呼ぶことで反復処理すべき最初のデータを、end()関数を呼ぶことで最後のデータを取得できるので、この範囲でループを実行すればよい。

    void operator() ( const tbb::blocked_rangeint r ) const {
        for( int i = r.begin(); i != r.end(); i++ ) {
            vec1[i] = vec1[i] + vec2[i];
        }
    }
};

 以上のように関数オブジェクトを定義したら、あとはRangeオブジェクトと関数オブジェクトを作成し、それを引数にparallel_forテンプレート関数を呼び出せばよい。

void ParallelVectorAdd( double* a, double* b, int dim ) {
    tbb::blocked_rangeint range = tbb::blocked_rangeint(0, dim, 100);
    VectorAdder body = VectorAdder(a, b);
    tbb::parallel_for( range, body );
}

 まず、Rangeオブジェクトを作成する。関数オブジェクトの引数にはblocked_rangeint型のオブジェクトが渡されるので、ここでも同じ型のオブジェクトを作成する。なお、blocked_rangeのコンストラクタの引数は次のようになっている。

template typename Value class blocked_range {
  :
  :
    blocked_range( Value begin, Value end, size_type grainsize=1 );
  :
  :

 ここで「Value」はテンプレートで指定した型、「begin」は反復を開始する値、「end」は反復を終了する値である。また、「grainsize」はループを分割する際の粒度を指定する。通常は100~1000程度を指定すると良いようだ。

 リスト3の例では、0から次元数(この例では変数「dim」に相当)-1までの整数に対して反復処理を行うため、引数にはそれぞれ「0」および「dim」を指定してblocked_rangeオブジェクトを作成し、VectorAdder型のオブジェクトとともにparallel_for関数に与えることで並列処理を実行している。

 なお、上記で紹介したVectorAdd関数およびParallelVectorAdd関数を用いて、要素数が52428800(50×1024×1024)の配列の加算を行ったところ、その処理時間は次の表1のようになった。並列化を行うことで、大幅な高速化が行えていることが分かる。

表1 VectorAddおよびその並列版であるParallelVectorAddの実行パフォーマンス
関数処理時間
VectorAdd(非並列版)78ミリ秒
ParallelVectorAdd(並列版)125ミリ秒

 また、TBBにはループの並列化を行うアルゴリズムとしてparallel_forテンプレート関数のほか、並列プレフィックス演算を行うparallel_scanや、リダクション演算を行うparallel_reduceといったテンプレート関数も用意されている。これらについても、基本的にparallel_forテンプレート関数と同じような方法で利用が可能だ。

要素数が分からないデータ群に対する反復処理の並列実装(whileループの並列化)

 先で解説したparallel_forを使用した並列処理は、あらかじめ処理するデータの個数が決まっている場合にのみ利用できる。一方、次に紹介するparallel_whileは入力ストリームに対する処理など、処理したいデータの個数が事前に分からない場合でも利用できる並列アルゴリズムだ。

 たとえば次のリスト4は、ファイルから1行ずつテキストを読み出し、正規表現で指定した特定のキーワードを含む行の行番号とその内容を出力するというプログラムだ。この場合、データの個数(ここでは行数)を調べるにはストリームを最後まで読まなければならず、データが非常に多い場合は現実的ではない。parallel_whileはこのような処理を並列化するのに有用である。

リスト4 ストリーム入力に対する処理の実装例
#include iostream
#include fstream
#include string
#inclue "boost/regex.hpp"

bool SearchText(ifstream is, string keyword) {
    boost::regex rex;
    boost::match_resultsconst char* results;
    string buf;
    int lineNumber = 1;

    rex = keyword;
    while( is  getline(is, buf) ) {
        if( boost::regex_search(buf.c_str(), results, rex) ) {
            cout  lineNumber  ": "  buf  endl;
        }
        lineNumber++;
    }
    cout  "read "  lineNumber - 1  " lines."  endl;
    return true;
}

 リスト4を、parallel_whileを使用してTBBで並列実装したものが次のリスト5だ。

リスト5 リスト4の処理をTBBで並列実装した例
#include iostream
#include fstream
#include string

class LinedString {
public:
    string contents;
    int lineNumber;
};

class LinedStream {
    istream is;
    int num;

public:
    bool pop_if_present( LinedString lstr ) {
        if( getline(is, lstr.contents) ) {
            num = num++;
            lstr.lineNumber = num;
        } else {
            return false;
        }
        return true;
    }

    LinedStream(istream inputStream) :
        is(inputStream),
        num(1)
    {}
};

class Searcher {
    string keyword;
public:
    void operator()( LinedString lstr ) const;
    Searcher(string key): keyword(key) {}
    typedef LinedString argument_type;
};

void Searcher::operator()( LinedString lstr ) const {
    boost::regex rex;
    boost::match_resultsconst char* results;

    rex = keyword;
    if( boost::regex_search(lstr.contents.c_str(), results, rex) ) {
        cout  lstr.lineNumber  ": "  lstr.contents  endl;
    }
}

bool ParallelSearchText(istream is, string keyword) {
    tbb::parallel_whileSearcher w;
    LinedStream stream(is);
    Searcher body(keyword);
    w.run( stream, body );
    return true;
}

 parallel_forなどと同様、parallel_whileでも関数オブジェクトとして実行すべき処理を渡すが、parallel_forと異なるのはparallel_whileは関数ではなくテンプレートクラスになっている点だ。テンプレート引数としては実行すべき処理を実装したクラスを取る。このparallel_while型のオブジェクトを生成し、引数に入力ストリームと関数オブジェクトを与えて「run」メンバー関数を実行することで、並列処理が行われる。

 リスト5では、並列実行すべき処理を実装したクラスとして「Searcher」クラスを、入力ストリームとして「LinedStream」クラスを用意している。

bool ParallelSearchText(istream is, string keyword) {
    tbb::parallel_whileSearcher w;
    LinedStream stream(is);
    Searcher body(keyword);
    w.run( stream, body );
    return true;
}

 parallel_whileに与える入力ストリームは、ストリームからデータを取り出す「pop_if_present」というメンバー関数を備えている必要がある。pop_if_present関数は引数に「argument_type」(後述)で指定した型のオブジェクト(の参照)を取り、次に処理すべきデータをこのオブジェクトに格納してtrueを返す。また、もし次に処理すべきデータがない場合はfalseを返せば良い。

 なお、リスト5の例では「行番号」と「その行に含まれる文字列」という2つのデータを格納するクラス「LinedString」を用意し、これをargument_typeとして使用している。

class LinedString {
public:
    string contents;
    int lineNumber;
};

class LinedStream {
    istream is;
    int num;

public:
    bool pop_if_present( LinedString lstr ) {
        if( getline(is, lstr.contents) ) {
            num = num++;
            lstr.lineNumber = num;
        } else {
            return false;
        }
        return true;
    }

    LinedStream(istream inputStream) :
        is(inputStream),
        num(1)
    {}
};

 並列実行すべき処理を実装するクラスには、operator()に加えてその引数の型を「argument_type」として定義しておく必要がある。

 リスト5の場合、入力ストリームからデータをLinedString型で取り出すので、argument_typeをLinedStreamとして定義している。また、operator()内では取り出したLinedStreamオブジェクトに対する処理を記述している。

class Searcher {
    string keyword;
public:
    void operator()( LinedString lstr ) const;
    Searcher(string key): keyword(key) {}
    typedef LinedString argument_type; // operator()の引数として利用する型を「argument_type」として定義しておく
};

void Searcher::operator()( LinedString lstr ) const {
    boost::regex rex;
    boost::match_resultsconst char* results;

    rex = keyword;
    if( boost::regex_search(lstr.contents.c_str(), results, rex) ) {
        cout  lstr.lineNumber  ": "  lstr.contents  endl;
    }
}

 なお、parallel_whileを使用した並列化の場合、ループ内で実行する処理の内容によってはあまりパフォーマンス向上が見られない場合がある。たとえば今回の例は1行分のテキストに対する正規表現検索という比較的軽めの処理であるため、次の表2のように並列化の効果は若干にとどまっている。

表2 10万行のテキスト(Webサーバーのアクセスログファイル)に対し、「MSIE [78]\..; Windows NT 5.1;」という正規表現にマッチする行を検索するのにかかった時間
関数処理時間
SearchText(非並列版)1389ミリ秒
ParallelSearchText(並列版)1294ミリ秒

パイプライン処理の並列化

 並列処理の実装の1つに、行うべき処理を複数のステージに分割し、それぞれのステージを並列に実行するというものがある。これはパイプライン処理などといわれている。

 パイプライン処理の例として、HTTPサーバーのアクセスログ整形を挙げてみよう。たとえばテキスト形式で1行に1件のログが記述されているログファイルに対し、「アクセス元IPアドレスからそのホスト名を取得、整形して出力する」というような処理を行う例を考えてみよう。この処理は、データに対して次のような操作を次々と適用するパイプライン処理で実装できる。

  • ファイルから1行読み出し
  • データのパース
  • IPアドレスの変換
  • 整形して出力

 このとき、データのパースおよびIPアドレスの変換処理については複数のスレッドで同時に実行できる。一方、行番号の付加と出力については読み出した順に行う必要がある。このように順序付きでデータをパイプライン処理するために、TBBには「pipeline」というアルゴリズムが用意されている。pipelineは「filter」クラスの派生クラスとして実装された処理を、入力データに対して次々と行っていくものだ。

 上記のアクセスログ整形処理をTBBを実装した例が、次のリスト6になる。

リスト6 TBBでアクセスログ整形処理を実装する例(抜粋)
#include string
#include iostream
#include fstream

#include "tbb/task_scheduler_init.h"
#include "tbb/pipeline.h"
#include "boost/regex.hpp"

using namespace std;
  :
  :
// アクセスログ1件を表すクラス
class AccessLogItem {
    // 元データ
    string buffer;
    boost::regex regex;

    // パース後のデータ
    string ipAddr;
    string hostname;
    string date;
    string request;
    string resultCode;
    string bites;

public:
    string parsed;
    AccessLogItem();

    // 入力ストリームから1行分のデータを読み込む
    // 成功すればtrue、失敗すればfalseを返す
    bool ReadFromIStream(ifstream ifs);

    // 元データをパースする
    // 成功すればtrue、失敗すればfalseを返す
    bool Parse();

    // IPアドレスをホスト名に変換
    // 成功すればtrue、失敗すればfalseを返す
    bool IPaddrToHostname();

    // 出力用に整形したstringを生成
    string Format();
};
  :
  :
// 各ステージでの処理を定義したフィルタを記述

// アクセスログをパースする
class ParseFilter: public tbb::filter {
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が可能なので、引数にfalseを指定
    ParseFilter() :
        filter(false)
    {}

    void* operator()( void* item ) {
        AccessLogItem a = *static_castAccessLogItem*(item);
        a.Parse();
        return a;
    }
};

// 入力ストリームから1行分のアクセスログを読み込む
class InputFilter: public tbb::filter {
private:
    ifstream ifs;
    AccessLogItem* buffer;
    int bufSize;
    int nextBuffer;
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が不可なので、引数にtrueを指定
    InputFilter( ifstream inputStream, int size_of_buffer );
    ~InputFilter();

    void* operator()(void*);
};


InputFilter::InputFilter( ifstream inputStream, int size_of_buffer ):
    filter(true),
    ifs(inputStream),
    bufSize(size_of_buffer),
    buffer(NULL),
    nextBuffer(0)
 {
    buffer = new AccessLogItem[bufSize];
}

InputFilter::~InputFilter() {
    delete[] buffer;
}

void* InputFilter::operator()(void*) {
    if( nextBuffer == bufSize - 1 ) {
        nextBuffer = 0;
    } else {
        nextBuffer++;
    }
    if( buffer[nextBuffer].ReadFromIStream(ifs) ) {
        return (buffer[nextBuffer]);
    } else {
        return NULL;
    }
}

// IPアドレスをホスト名に変換
class IPaddrToHostnameFilter: public tbb::filter {
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が可能なので、引数にfalseを指定
    IPaddrToHostnameFilter() :
        filter(false)
    {}

    void* operator()( void* item ) {
        AccessLogItem a = *static_castAccessLogItem*(item);
        a.IPaddrToHostname();
        return a;
    }
};

// 整形したログを出力
class OutputFilter: public tbb::filter {
private:
    ostream os;
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が不可なので、引数にtrueを指定
    OutputFilter( ostream outputStream ) :
        filter(true),
        os(outputStream)
    {}

    void* operator()( void* item ) {
        AccessLogItem a = *static_castAccessLogItem*(item);
        os  a.Format()  endl;
        return NULL;
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
    tbb::task_scheduler_init init;
  :
  :
    // パイプラインを作成
    tbb::pipeline p;
    int n_buffer = 10;

    // 入力ストリームから読み込むパイプラインを作成、登録
    ifstream ifs;
  :
  :
    InputFilter inputFilter(ifs, n_buffer);
    p.add_filter(inputFilter);


    // アクセスログをパースするパイプラインを作成、登録
    ParseFilter parseFilter;
    p.add_filter(parseFilter);

    // IPアドレスをホスト名に変換するパイプラインを作成、登録
    IPaddrToHostnameFilter ip2hostFilter;
    p.add_filter(ip2hostFilter);

    // 整形したログを出力するパイプラインを作成、登録
    ofstream ofs;
  :
  :
    OutputFilter outputFilter(ofs);
    p.add_filter(outputFilter);
  :
  :
    // パイプラインを実行
    // 引数には処理に使用するバッファ数を指定
  :
  :
    p.run( n_buffer );
  :
  :
    return 0;
}

 それでは、ソースコードの詳細を見ていこう。パイプライン処理を実装するには、pipeline型のオブジェクトと、filterクラスの派生クラスとして処理を実装したクラスのオブジェクトを生成し、処理を行う順にfilter派生オブジェクトをpipelineオブジェクトに登録、最後にpipelineオブジェクトのrunメンバー関数を呼べばよい。

 まず、パイプラインを通るデータをクラスとして定義する。今回の例では、アクセスログデータを表現する「AccessLogItem」クラスがこれに相当する。

// アクセスログ1件を表すクラス
class AccessLogItem {
    // 元データ
    string buffer;

    boost::regex regex;

    // パース後のデータ
    string ipAddr;
    string hostname;
    string date;
    string request;
    string resultCode;
    string bites;

public:
    string parsed;
    AccessLogItem();

    // 入力ストリームから1行読みこむ
    // 成功すればtrue、失敗すればfalseを返す
    bool ReadFromIStream(ifstream ifs);

    // 元データをパースする
    // 成功すればtrue、失敗すればfalseを返す
    bool Parse();

    // IPアドレスをホスト名に変換
    // 成功すればtrue、失敗すればfalseを返す
    bool IPaddrToHostname();

    // 出力用に整形したstringを生成
    string Format();
};

 次に、各段階の処理を行うfilter派生クラスを定義する。先に述べたとおり、この例では先に宣言したAccessLogItemクラスに対し行う4つの操作をそれぞれフィルタとして定義していく。

  • ファイルから1行読み出し
  • データのパース
  • IPアドレスの変換
  • 整形して出力

 まず、ファイルから1行のデータを読み出すフィルタを用意する。フィルタは「filter」クラスの派生クラスとして定義し、フィルタが実際に行う処理はoperator()内に記述する。

// 入力ストリームから1行分のアクセスログを読み込む
class InputFilter: public tbb::filter {
private:
    ifstream ifs;
    AccessLogItem* buffer;
    int bufSize;
    int nextBuffer;
public:
    InputFilter( ifstream inputStream, int size_of_buffer );
    ~InputFilter();

    void* operator()(void*);
};

 フィルタのコンストラクタでは、まず基底クラスの初期化を行う。このとき、フィルタが並列処理可能であればfalseを、並列処理が行えない場合であればtrueを引数として与える。今回のInputFilterクラスはシーケンシャルにファイルからデータを読み出すフィルタであり、並列処理は行わない。そのため、ここではtrueを与えている。

 また、InuputFilterはパイプラインの初段であるため、パイプラインに流すデータの生成もここで行う必要がある。そこで、コンストラクタの第2引数をバッファサイズとし、指定された数のAccessLogItemオブジェクトを確保するようにしている。

InputFilter::InputFilter( ifstream inputStream, int size_of_buffer ):
    filter(true),
    ifs(inputStream),
    bufSize(size_of_buffer),
    buffer(NULL),
    nextBuffer(0)
 {
    buffer = new AccessLogItem[bufSize];
}

InputFilter::~InputFilter() {
    delete[] buffer;
}

 実際にフィルタが行う処理は、operator()内に記述する。InputFilterはパイプラインの初段であるため、引数は渡されない。また、戻り値は次のパイプラインに渡すデータへのポインタをvoid*型にキャストしたものとなる。

void* InputFilter::operator()(void*) {
    if( nextBuffer == bufSize - 1 ) {
        nextBuffer = 0;
    } else {
        nextBuffer++;
    }
    if( buffer[nextBuffer].ReadFromIStream(ifs) ) {
        return (buffer[nextBuffer]);
    } else {
        return NULL;
    }
}

 続いて、パイプライン2段目および3段目の処理となる、データのパースおよびIPアドレスの変換を行うフィルタを用意する。どちらのフィルタも並列実行が可能なので、基底クラスのコンストラクタにはfalseを与えて初期化している。またoperator()では、パイプラインの前の段のoperator()の戻り値が引数として与えられる。

// アクセスログをパースする
class ParseFilter: public tbb::filter {
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が可能なので、引数にfalseを指定
    ParseFilter() :
        filter(false)
    {}

    void* operator()( void* item ) {
        AccessLogItem a = *static_castAccessLogItem*(item);
        a.Parse();
        return a;
    }
};
// IPアドレスをホスト名に変換
class IPaddrToHostnameFilter: public tbb::filter {
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が可能なので、引数にfalseを指定
    IPaddrToHostnameFilter() :
        filter(false)
    {}

    void* operator()( void* item ) {
        AccessLogItem a = *static_castAccessLogItem*(item);
        a.IPaddrToHostname();
        return a;
    }
};

 最後に、パイプラインの最終段となる整形および出力を行うフィルタを用意する。出力についてもシーケンシャルに行うため、基底クラスのコンストラクタにはtrueを与えている。また、このフィルタは最終段であるので、フィルタの出力、つまりoperator()の戻り値はNULLとなる。

// 整形したログを出力
class OutputFilter: public tbb::filter {
private:
    ostream os;
public:
    // 派生元クラスのコンストラクタを呼ぶ
    // このステージは並列実行が不可なので、引数にtrueを指定
    OutputFilter( ostream outputStream ) :
        filter(true),
        os(outputStream)
    {}

    void* operator()( void* item ) {
        AccessLogItem a = *static_castAccessLogItem*(item);
        os  a.Format()  endl;
        return NULL;
    }
};

 パイプラインを実行するにはまずpipelineオブジェクトと各フィルタを用意し、pipelineオブジェクトの「add_filter」メンバー関数でフィルタを処理順に登録していく。

    // パイプラインを作成
    tbb::pipeline p;
    int n_buffer = 10;

    // 入力ストリームから読み込むパイプラインを作成、登録
    ifstream ifs;
  :
  :
    InputFilter inputFilter(ifs, n_buffer);
    p.add_filter(inputFilter);


    // アクセスログをパースするパイプラインを作成、登録
    ParseFilter parseFilter;
    p.add_filter(parseFilter);

    // IPアドレスをホスト名に変換するパイプラインを作成、登録
    IPaddrToHostnameFilter ip2hostFilter;
    p.add_filter(ip2hostFilter);

    // 整形したログを出力するパイプラインを作成、登録
    ofstream ofs;
  :
  :
    OutputFilter outputFilter(ofs);
    p.add_filter(outputFilter);

 最後に、pipelineオブジェクトの「run」メンバー関数を実行すると、パイプライン処理が開始される。なお、run関数の引数にはパイプラインを流れるバッファの数を指定する。

    p.run( n_buffer );

 なお、500件のアクセスログに対し、上記のように並列化を行ったものと、同様の処理を非並列に実行した場合とで処理時間を比較した結果が次の表3である。今回の例では、IPアドレスからホスト名を取得する個所が大きなボトルネックとなっており、これを並列化することで大幅に処理時間を向上できた。

表3 500件のアクセスログを処理した際にかかった時間
プログラム実行時間
並列版13635ミリ秒
非並列版82025ミリ秒

高い抽象度でアルゴリズムを実装できるTBB

 本記事ではTBBを使用した典型的な並列処理の実装方法について解説した。概念は若干難しいものの、TBBでは並列処理が一般化されて実装されているため、基本的な考え方さえ覚えてしまえば、さまざまなアルゴリズムの実装に向けて応用が効きやすい。また、ここでは解説しきれなかったが、TBBにはほかにもスレッドセーフなコンテナや、柔軟なタスクスケジューラといった高度な機能も用意されている。C++での並列処理の実装を考えているならば、ぜひ一度試す価値があるだろう。

 なお、TBBの解説書籍としてはオライリー・ジャパンから発売されている、「インテル スレッディング・ビルディング・ブロック」が非常に参考になる。TBBについてより詳しく、より高度な情報を知りたいなら、そちらも参考にしてほしい。