C++でマルチスレッドプログラミングを試してみたことのまとめ
- 作者: グレゴリーサティア,ダウグブラウン,Gregory Satir,Doug Brown,望月康司,谷口功
- 出版社/メーカー: オライリー・ジャパン
- 発売日: 2001/11
- メディア: 単行本
- 購入: 9人 クリック: 147回
- この商品を含むブログ (29件) を見る
Index
- Word
- 排他制御
- アトミック性
- クリティカルセクション
- fork
- マルチプロセス/マルチスレッド
- mutex
- semaphore
- Pthreads
- スレッドセーフ
- スピンロック
- ACID
- pthread API
- fork
- semaphore
- mutex
- Attention
- links
Word
初歩的な単語ですが、マルチスレッドプログラミングに関するものを簡単にまとめます。
排他制御
複数のタスクが処理を並行して同一の資源にアクセス可能な場合、データの整合性が合わない事を避けるために他のプロセスの処理を排除すること。相互排除(mutual exclusion)ともいう。最大k個のタスクが資源にアクセス可能な事をk-相互排除と呼ばれる。
アトミック性
複数の操作が不可分であり、切り離せない事。システム上の他の操作から見てアトミック性を持つ操作は全てが完了したか/全てが失敗したかの状態として観測できない事。全てが失敗したと観測された場合は処理前の状態にロールバックしないとならない。
クリティカルセクション
複数のタスクの同時アクセスを可能とした時に、データの不整合が発生すると破綻する部分の事。クリティカルセクションでは排他制御を行い、データのアトミック性を保証する。
マルチプロセス/マルチスレッド
マルチプロセスはそれぞれのプロセスが独立したメモリ空間を使用してプロセス毎の平行した処理が可能。マルチプロセスはプロセス間通信や共有メモリを使ってデータのやり取りができる。それに対してマルチスレッドは同一のメモリ空間に対して複数のスレッドを作成して平行処理が可能。メモリを共有しているのでマルチスレッドはデータのやり取りがスレッド間でできる。
mutex
Mutual Exclusion(相互排他)の略で排他制御を行う場面でアトミック性を保証するための同期仕様。広義にはsemaphoreの一種と言える。semaphoreは同一クリティカルセクションに対して複数のタスクが操作可能であり、mutexでは一つしか操作可能とならないので完全なる排他制御。P操作(ロック)、V操作(アンロック)を持ち排他制御を行う。
semaphore
複数のタスクが同一資源にアクセスする上限を規定する時に用いる。同時アクセス1とした場合はmutexとほぼ同じことになり、これをバイナリセマフォと呼ぶ。処理としては共通領域にセマフォ領域を確保し、任意の最大k個のアクセスを許可する。セマフォ領域でタスクがP操作、V操作により処理実行中/完了フラグのON/OFFを行う。
Pthread
POSIX標準のスレッドのこと。スレッド生成や操作のAPIが用意されている。C言語のデータ型や関数をpthread.hというヘッダファイルに組み込まれている。semaphore、mutexともにpthread_createという関数を利用してスレッドを生成する。
スピンロック
スレッドがスピン(ループ)しながらロックが取得できるのを待つ方法。
pthread API
pthreadAPIの概要をまとめます。以下のAPIはsemaphoreなどの実装で利用します。
API 説明 pthread_create 新規スレッド作成 pthread_exit スレッド終了 pthread_join スレッドが終了するまでブロック pthread_detach スレッドのメモリを解放可能とする pthread_equal スレッドのIDを比較する pthread_once 関数が一度だけ呼び出し可能とする pthread_self スレッドIDを取得 pthread_cleanup_push キャンセルスタックに挿入 pthread_cleanup_pop キャンセルスタックからハンドラを取り出して実行 pthread_key_create スレッドのKeyを作成 pthread_key_delete スレッドのkeyを削除 pthread_setspecific スレッド固有データをKeyに紐付け pthread_getspecific スレッド固有のデータを取得 pthread_cancel スレッドをキャンセル pthread_testcancel キャンセルポイント作成 pthread_setcancelstate キャンセル状態の設定 pthread_setcanceltype キャンセルタイプの設定
fork
Simple fork
プロセスから子プロセスを派生させます。プロセスを派生させるので単純なforkではデータの共有を行いません。例としてParentプロセスの時にChildプロセスのpid、Childプロセスの時にParentプロセスのpidをそれぞれ表示するプログラムを書いてみます。
#include <iostream> int main(){ int count = 0; count++; std::cout << "Count = "; std::cout << count << std::endl; pid_t pid; // 子プロセス作成 pid = fork(); // pidが0の場合は子プロセス // pidが-1の場合はforkに失敗 // pidが0,-1以外の場合は親プロセス if( pid < 0 ) { std::cout << "Fork Failed." << std::endl; return 0; } else if( pid == 0 ) { putchar( '\n' ); std::cout << "Child Process. pid = "; std::cout << getpid() << std::endl; std::cout << "My Parent pid = "; std::cout << getppid() << std::endl; count++; } else { putchar( '\n' ); std::cout << "Parent Process. pid = "; std::cout << getpid() << std::endl; std::cout << "My Child pid = "; std::cout << pid << std::endl; count++; } std::cout << "Count = "; std::cout << count << std::endl; putchar( '\n' ); return 0; }Count = 1 Child Process. pid = 4073 My Parent pid = 4072 Count = 2 Parent Process. pid = 4072 My Child pid = 4073 Count = 2fork wait
Parentプロセスで派生したChildプロセスが終わるのを待ちたい(Childプロセスを管理したい)プログラムを書く事があると思います。そんな時はwaitpid関数を利用します。
#include <iostream> #include <sys/wait.h> int main(){ int count = 0; int status; count++; std::cout << "Count = "; std::cout << count << std::endl; pid_t pid; // 子プロセス作成 pid = fork(); // pidが0の場合は子プロセス // pidが-1の場合はforkに失敗 // pidが0,-1以外の場合は親プロセス if( pid < 0 ) { std::cout << "Fork Failed." << std::endl; return -1; } else if( pid == 0 ) { putchar( '\n' ); std::cout << "Child Process. pid = "; std::cout << getpid() << std::endl; std::cout << "My Parent pid = "; std::cout << getppid() << std::endl; count++; } else { putchar( '\n' ); std::cout << "Parent Process. pid = "; std::cout << getpid() << std::endl; std::cout << "My Child pid = "; std::cout << pid << std::endl; //Child Processが終わるのを待つ waitpid( pid, &status, 0 ); std::cout << "Finishied. pid = "; std::cout << pid << std::endl; if (WIFEXITED(status)){ printf("exit, status=%d\n", WEXITSTATUS(status)); } else if (WIFSIGNALED(status)){ printf("signal, sig=%d\n", WTERMSIG(status)); } else { printf("abnormal exit\n"); } count++; } std::cout << "Count = "; std::cout << count << std::endl; putchar( '\n' ); return 0; }Count = 1 Child Process. pid = 4073 My Parent pid = 4072 Count = 2 Parent Process. pid = 4072 My Child pid = 4073 Finishied. pid = 4073 exit, status=0 Count = 2fork 共有メモリ
単純なforkではParent/Childプロセス間でデータの共有が出来ないので、共有メモリを介してデータを共有するやり方を記述します。共有メモリを扱うには#include
を読み込む必要があります。以下にshm関数の一覧を表にまとめます。
関数 説明 shmget 共有メモリのセグメント新規作成、取得を行う shmat 共有メモリのセグメントをattachする shmdt 不要になった共有メモリセグメントをdetachする shmctl 共有メモリセグメントを消去 #include <iostream> #include <sys/wait.h> #include <sys/shm.h> int main(){ int count = 0; int status; int shmid; int *shmaddr; std::cout << "Count = "; std::cout << count << std::endl; pid_t pid; // 子プロセス作成 pid = fork(); // 共有メモリセグメント作成 if ((shmid = shmget(IPC_PRIVATE, 100, 0600)) == -1){ perror( "shmget error." ); exit( EXIT_FAILURE ); } // pidが0の場合は子プロセス // pidが-1の場合はforkに失敗 // pidが0,-1以外の場合は親プロセス if( pid < 0 ) { std::cout << "Fork Failed." << std::endl; return -1; } else if( pid == 0 ) { putchar( '\n' ); std::cout << "Child Process. pid = "; std::cout << getpid() << std::endl; std::cout << "My Parent pid = "; std::cout << getppid() << std::endl; count++; //共有メモリattach if ((shmaddr = (int *)shmat(shmid, NULL, 0)) == (void *)-1) { perror( "ChildProcess shmat error." ); exit(EXIT_FAILURE); } *shmaddr = count; //共有メモリのdetach if (shmdt(shmaddr) == -1) { perror( "ChildProcess shmdt error." ); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); } else { putchar( '\n' ); std::cout << "Parent Process. pid = "; std::cout << getpid() << std::endl; std::cout << "My Child pid = "; std::cout << pid << std::endl; //Child Processが終わるのを待つ waitpid( pid, &status, 0 ); std::cout << "Finishied. pid = "; std::cout << pid << std::endl; /* 共有メモリ・セグメントをプロセスのアドレス空間に付加 */ /* if ((shmaddr = (int *)shmat(shmid, NULL, 0)) == (void *)-1) { perror( "ParentProcess shmat error." ); exit(EXIT_FAILURE); } */ std::cout << "Attache Count = "; std::cout << *shmaddr << std::endl; if (WIFEXITED(status)){ printf("exit, status=%d\n", WEXITSTATUS(status)); } else if (WIFSIGNALED(status)){ printf("signal, sig=%d\n", WTERMSIG(status)); } else { printf("abnormal exit\n"); } count++; } std::cout << "Count = "; std::cout << count << std::endl; putchar( '\n' ); return 0; }Count = 0 Child Process. pid = 9965 My Parent pid = 9964 Parent Process. pid = 9964 My Child pid = 9965 Finishied. pid = 9965 Attache Count = 1 exit, status=0 Count = 1
semaphore
semaphoreのPOSIXAPIについて表にまとめます。
API 説明 sem_init semaphoreの初期化。メモリ領域の確保 sem_post semaphoreの値を1加算 sem_wait semが0より大きければsemaphoreの値を1減算。semが0なら呼び出しスレッドはブロック sem_getvalue semaphoreの値を抽出 sem_destroy semaphoreの破棄 ptreadはdetachかjoinをしないとメモリが解放されません。joinを忘れるとメモリリークを発生する可能性があります。
#include <iostream> #include <pthread.h> #include <semaphore.h> // sem_t型の定義 sem_t sem; int count = 0; void* threadFunction1( void* args ) { while(1){ std::cout << "called threadFunction1" << std::endl; std::cout << "count before wait = "; std::cout << count << std::endl; // 待機状態 sem_wait( &sem ); std::cout << "count = "; std::cout << count << std::endl; count = 0; } return NULL; } /* void* threadFunction2( void* args ) { while(1){ std::cout << "called threadFunction2" << std::endl; std::cout << "count before wait = "; std::cout << count << std::endl; // 待機状態 sem_wait( &sem ); std::cout << "count = "; std::cout << count << std::endl; count = 0; } return NULL; } */ int main() { int status; pthread_t thread_1, thread_2; int n; // sempahoreの初期化 sem_init( &sem, 0, 0 ); // スレッド1個目作成 status = pthread_create( &thread_1, NULL, threadFunction1, NULL ); if( status != 0 ) { std::cout << "failed to create thread 1" << std::endl; exit(1); } // スレッド2個目作成 /* status = pthread_create( &thread_2, NULL, threadFunction2, NULL ); if( status != 0 ) { std::cout << "failed to create thread 2" << std::endl; exit(1); } */ // detach pthread_detach( thread_1 ); //pthread_detach( thread_2 ); for ( n = 0; n < 5; ){ count++; if ( count == 20 ) { // semaphoreの値を1加算 シグナル送信 std::cout << "main count = "; std::cout << count << std::endl; sem_post( &sem ); n++; } } // semaphore解放 sem_destroy( &sem ); }コンパイルを通すには-lpthreadを指定する必要があります。
$ g++ pthread.cpp -lpthread $ ./a.out main count = 20 called threadFunction1 count before wait = 142119050 count = 142119050 called threadFunction1 count before wait = 0 main count = 20 count = 188569130 called threadFunction1 count before wait = 0 main count = 20 count = 20 called threadFunction1 count before wait = 0 main count = 20 count = 20 called threadFunction1 count before wait = 0 main count = 20 count = 20 called threadFunction1 count before wait = 0
mutex
mutexのAPIについて表にまとめます。
API 説明 pthread_mutex_init mutexオブジェクトを初期化します。 pthread_mutex_lock 与えられたmutexをlockします。これはmutexがunlockされるまでスレッドの実行を停止させます。 pthread_mutex_trylock mutexが既に他のスレッドによってlockされている場合はスレッドをlockしません。 pthread_mutex_unlock lockされた状態からunlock状態に戻します。 pthread_mutex_destroy mutexオブジェクトを破棄して、資源を解放します。 以下はサンプルプログラムです。pthread_mutex_lock中の変数は変更できませんが、pthred_mutex_unlockされると変更が可能になります。
#include <iostream> #include <pthread.h> using namespace std; int count_1=0; int count_2=0; pthread_mutex_t mutex; //mutexの宣言 void* threadFunction1( void* args ) { int i; //mutexのlock pthread_mutex_lock(&mutex); // sleepしてその間に変数の書き換えが無いかを確認する system( "sleep 3" ); cout << "count_1 = "; for(i=0;i<=10;i++){ cout << count_1; cout << ","; count_1++; } printf("\n"); //mutexのunlock pthread_mutex_unlock(&mutex); // sleepしてその間に変数の書き換えが無いかを確認する system( "sleep 3" ); cout << "count_2 = "; for(i=0;i<=10;i++){ cout << count_2; cout << ","; count_2++; } printf("\n"); } void* threadFunction2( void* args ) { cout << "thread 2 call1 " << endl; pthread_mutex_lock(&mutex); // lock中なのでcount_1は変更されない count_1=10; cout << "thread 2 call2 " << endl; pthread_mutex_unlock(&mutex); // unlock中なのでcount_2は変更される count_2=10; } int main() { pthread_t thread_1, thread_2; // mutex初期化 pthread_mutex_init(&mutex, NULL); // スレッドの作成 pthread_create(&thread_1, NULL, threadFunction1, NULL); pthread_create(&thread_2, NULL, threadFunction2, NULL); // スレッドのjoin pthread_join(thread_1,NULL); pthread_join(thread_2,NULL); // mutexの破棄 pthread_mutex_destroy(&mutex); return 0; }$ g++ mutex.cpp -lpthread $ ./a.out thread 2 call1 count_1 = 0,1,2,3,4,5,6,7,8,9,10, thread 2 call2 count_2 = 10,11,12,13,14,15,16,17,18,19,20,
Attention
マルチスレッドプログラミングを行う際には以下の事に気をつけなければなりません。