【OktinyMP -- OpenMP の簡単な代替実装】
このページでは、 OpenMP の代替実装である OktinyMP の簡単な使い方を解説します。
まずは、OpenMP のサンプルと言ったらコレといういつものやつを例示して、 それの OktinyMP 版を示すことで説明します。
#include <iostream>
#include <chrono>
int main()
{
using namespace std;
using namespace std::chrono;
static const int N = 10000000;
static const double dx = 1.0/N;
double sum = 0;
const auto t1 = system_clock::now();
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < N; i++) {
const double x = (i + 0.5)/N;
sum += dx/(1 + x*x);
}
const auto t2 = system_clock::now();
cout << "pi = " << 4*sum << endl;
cout << "time = " << duration_cast<microseconds>(t2 - t1).count() << endl;
return 0;
}
π を計算する典型的な OpenMP の例題ですが、 これを OktinyMP で書き換えると以下のようになります。
#include <iostream>
#include "oktinymp.h" // OktinyMP を読み込むよ
int main()
{
using namespace std;
using namespace std::chrono;
using OktinyMP::Starter; // Starter クラスを使うよ
typedef Starter<1> okt, starter_t; // 名前を付けとこう
constexpr int N = 10000000;
constexpr double dx = 1.0/N;
starter_t s; // スレッドプールを起動するための starter_t オブジェクトを生成
double sum = 0;
const auto loop = [&](int) noexcept { // 並列実行するためのコードを定義するよ
const int i0 = okt::i0(N); // それぞれのループ変数の初期値
const int iN = okt::iN(N); // それぞれのループ変数の終了値
double psum = 0; // それぞれのスレッドでの sum
//for (int i = 0; i < N; i++)
for (int i = i0; i < iN; i++) {
const double x = (i + 0.5)/N;
psum += dx/(1 + x*x);
}
s.critical([&]{ sum += psum; }); // 全てのスレッドの sum を計算
};
s.fork(); // スレッドプールを起動するよ
auto t1 = system_clock::now();
s.go(loop); // さっき定義したコードを並列実行
auto t2 = system_clock::now();
s.join(); // スレッドプールを停止するよ
cout << "pi = " << 4*sum << endl;
cout << "time = " << duration_cast<microseconds>(t2 - t1).count() << endl;
return 0;
}
どうでしょう。Intel Threading Building Blocks (TBB) なんかに比べると、 考え方が簡単だと私自身は思ってます。それでは、各部分を説明していきましょう。
まず、
typedef Starter<1> okt, starter_t; // 名前を付けとこう
ですが、Starter<1> オブジェクトに名前を付けています。
OktinyMP はスレッドプールの待機中に、
ここで指定したマイクロ秒だけループ毎にスリープします。
Starter<1> ならループ毎に1マイクロ秒スリープします。
Starter<0> なら一切スリープしません。
つまりビジーループで待機するようになります。
何マイクロ秒かスリープさせた方が良いのか、
ビジーループが良いのかは問題によります。
実際に動かしてみて決定するのが良いでしょう。
次に、
const int i0 = okt::i0(N); // それぞれのループ変数の初期値 const int iN = okt::iN(N); // それぞれのループ変数の終了値
ですが、これは
for (int i = 0; i < N; i++) { /* ... */ }
の形のループを分割して並列に実行する場合の、 それぞれのループのループ変数の初期値と終了値を計算します。 OpenMP で言うところの static schedule で分割します。 もし、もっと凝った分割をしたいのなら自分で計算してやってください。 以下の並列コードを定義する部分を、
const auto loop = [&](int) noexcept { // 並列実行するためのコードを定義するよ
以下のように書き換えるとスレッド ID が取得できますので、 それを頼りにオリジナルの分割を計算してやってください。
const auto loop = [&](int id) noexcept {
スレッド ID は 0,1,2,... とスレッド数だけカウントされた数値です。
マスタースレッドの ID は 0 です。
スレッド ID は、okt::get_id() 関数でも取得できます。
スレッドプールのスレッド数は
okt::get_concurrency() 関数で取得できます。
次に、
s.critical([&]{ sum += psum; }); // 全てのスレッドの sum を計算
ですが、critical メンバ関数に渡されたラムダ式はシリアルに実行されます。 つまり、複数のスレッドが同時にこのラムダ式を実行することはありません。
次に、
s.fork(); // スレッドプールを起動するよ
ですが、ここでスレッドプールを起動して、 並列に実行したいコードが与えられるのを待機するようになります。
次に、
s.go(loop); // さっき定義したコードを並列実行
ですが、ここで並列に実行したいコードを渡して実行します。
次に、
s.join(); // スレッドプールを停止するよ
ですが、待機しているスレッドプールを停止します。
今回は、go() メンバ関数を1回しか実行しませんでしたが、
スレッドプールが待機している間、
何度も go() を実行することもできます。
最後に、上記のコードでは使われていませんが、
go() メンバ関数から呼ばれたコード内では、
s.wait() でバリア同期することができます。
【起動スレッド数を制御する環境変数】
デフォルトでは、 OktinyMP はそのコンピュータのスレッド数でスレッドプールを起動します。 例えば4コア8スレッドの Core i7 だと8スレッドで起動します。しかし、 一部の Linux などはスレッド数をフルで使うと劇的な性能の低下を起こしたりします。 そこで、起動スレッド数を制限したい場合、環境変数 OKT_NUM_THREADS を設定します。 例えば、OKT_NUM_THREADS を 7 に設定すると、起動スレッドが7つに制限されます。
【OktinyMP のソースコード】
OktinyMP ですが、上記のように非常に簡単なライブラリなので、 ソースコードも非常にコンパクトです。どのくらいコンパクトかと言うと、 このページに全体を掲示できるほとコンパクトです。 以下に OktinyMP のソースコードを示します。 ダウンロードは こちらのページ からどうぞ。
/******************************************************************************
* OktinyMP Version 2.1
*
* Copyright (c) 2019 OK Ojisan(Takuya OKAHISA)
* Released under the MIT license
* http://opensource.org/licenses/mit-license.php
******************************************************************************/
#ifndef oktinymp_h
#define oktinymp_h
#include <chrono>
#include <atomic>
#include <vector>
#include <thread>
#include <functional>
#include <mutex>
namespace OktinyMP {
template <int TSLEEP> struct usleep_for
{
inline void operator()() const noexcept
{
std::this_thread::sleep_for(std::chrono::microseconds(TSLEEP));
//std::this_thread::sleep_for(std::chrono::nanoseconds(TSLEEP));
}
};
template <> struct usleep_for<0>
{
//inline void operator()() const noexcept {}
inline void operator()() const noexcept { std::this_thread::yield(); }
};
template <int TSLEEP> class Trigger
{
const int n;
std::atomic<int> counter {0};
std::atomic<bool> x {false};
public:
Trigger(const int n) noexcept: n(n) {}
Trigger(const Trigger&) = delete;
Trigger& operator=(const Trigger&) = delete;
void wait() noexcept
{
const bool y = !x;
counter++;
while (x != y) usleep_for<TSLEEP>()();
}
void start() noexcept
{
while (counter < n-1) usleep_for<TSLEEP>()();
const bool y = !x;
counter = 0; x = y;
}
void end() noexcept
{
while (counter < n-1) usleep_for<TSLEEP>()();
}
};
template <int TSLEEP> class Barrier
{
const int n;
std::atomic<int> counter {0};
std::atomic<bool> x {false};
public:
Barrier(const int n) noexcept: n(n) {}
Barrier(const Barrier&) = delete;
Barrier& operator=(const Barrier&) = delete;
void wait() noexcept
{
const bool y = !x;
if (++counter < n)
while (x != y) usleep_for<TSLEEP>()();
else {
counter = 0;
x = y;
}
}
};
template <int TSLEEP> class Starter
{
const int n;
Trigger<TSLEEP> trigger;
Barrier<TSLEEP> barrier;
std::vector<std::thread> tg;
std::function<void(int)> f;
std::atomic<bool> done {true};
std::mutex mtx;
thread_local static int tid;
static int ncores;
public:
Starter(): n(get_concurrency()), trigger(n), barrier(n) { ncores = n; }
Starter(const Starter&) = delete;
Starter& operator=(const Starter&) = delete;
~Starter() { join(); }
void fork()
{
if (!done) return;
done = false;
tg.clear();
for (int id = 1; id < n; id++) {
const auto loop = [&](const int i) noexcept {
tid = i;
for (;;) {
trigger.wait();
if (done) break;
f(i);
}
};
tg.push_back(std::thread(loop, id));
}
}
inline void go(const std::function<void(int)>& code) noexcept
{
f = code;
fork();
trigger.start();
code(0);
trigger.end();
}
void join()
{
if (done) return;
done = true;
trigger.start();
for (auto& t : tg) t.join();
tg.clear();
}
inline void wait() noexcept { barrier.wait(); }
inline void critical(const std::function<void()>& code)
{
std::lock_guard<std::mutex> lck(mtx);
code();
}
#ifdef _MSC_VER
static int get_concurrency() noexcept
{
if (ncores > 0) return ncores;
char* value = 0; size_t sz = 0;
if (_dupenv_s(&value, &sz, "OKT_NUM_THREADS") == 0) {
if (value) {
const int n = atoi(value);
free(value);
return n;
}
}
return std::thread::hardware_concurrency();
}
#else
static int get_concurrency() noexcept
{
const char* value = getenv("OKT_NUM_THREADS");
return ncores > 0 ? ncores :
value ? atoi(value) : std::thread::hardware_concurrency();
}
#endif
static inline int get_id() noexcept { return tid; }
static inline int i0(const int N) noexcept
{
typedef long long ll;
return static_cast<int>(ll(N)*ll(tid)/ncores);
}
static inline int iN(const int N) noexcept
{
typedef long long ll;
return static_cast<int>(ll(N)*ll(tid+1)/ncores);
}
};
template <int TSLEEP> thread_local int Starter<TSLEEP>::tid = 0;
template <int TSLEEP> int Starter<TSLEEP>::ncores = 0;
} // namespace OktinyMP
#endif // oktinymp_h
Download: OktinyMP 版 OTFFT のダウンロード