Y's note

Web技術・プロダクトマネジメント・そして経営について

本ブログの更新を停止しており、今後は下記Noteに記載していきます。
https://note.com/yutakikuchi/

C++でマルチスレッドプログラミングを試してみたことのまとめ

C++プログラミング入門

C++プログラミング入門

Index

  • Word
  • pthread API
  • fork
  • semaphore
  • mutex
  • Attention
  • links

Word

初歩的な単語ですが、マルチスレッドプログラミングに関するものを簡単にまとめます。

排他制御

複数のタスクが処理を並行して同一の資源にアクセス可能な場合、データの整合性が合わない事を避けるために他のプロセスの処理を排除すること。相互排除(mutual exclusion)ともいう。最大k個のタスクが資源にアクセス可能な事をk-相互排除と呼ばれる。

アトミック性

複数の操作が不可分であり、切り離せない事。システム上の他の操作から見てアトミック性を持つ操作は全てが完了したか/全てが失敗したかの状態として観測できない事。全てが失敗したと観測された場合は処理前の状態にロールバックしないとならない。

クリティカルセクション

複数のタスクの同時アクセスを可能とした時に、データの不整合が発生すると破綻する部分の事。クリティカルセクションでは排他制御を行い、データのアトミック性を保証する。

fork

UNIX系のシステムコールで親プロセスから子プロセスを生成すること。forkはプロセスを新たに生成するのでマルチスレッドとは異なりマルチプロセスである。

マルチプロセス/マルチスレッド

マルチプロセスはそれぞれのプロセスが独立したメモリ空間を使用してプロセス毎の平行した処理が可能。マルチプロセスはプロセス間通信や共有メモリを使ってデータのやり取りができる。それに対してマルチスレッドは同一のメモリ空間に対して複数のスレッドを作成して平行処理が可能。メモリを共有しているのでマルチスレッドはデータのやり取りがスレッド間でできる。

mutex

Mutual Exclusion(相互排他)の略で排他制御を行う場面でアトミック性を保証するための同期仕様。広義にはsemaphoreの一種と言える。semaphoreは同一クリティカルセクションに対して複数のタスクが操作可能であり、mutexでは一つしか操作可能とならないので完全なる排他制御。P操作(ロック)、V操作(アンロック)を持ち排他制御を行う。

semaphore

複数のタスクが同一資源にアクセスする上限を規定する時に用いる。同時アクセス1とした場合はmutexとほぼ同じことになり、これをバイナリセマフォと呼ぶ。処理としては共通領域にセマフォ領域を確保し、任意の最大k個のアクセスを許可する。セマフォ領域でタスクがP操作、V操作により処理実行中/完了フラグのON/OFFを行う。

Pthread

POSIX標準のスレッドのこと。スレッド生成や操作のAPIが用意されている。C言語のデータ型や関数をpthread.hというヘッダファイルに組み込まれている。semaphore、mutexともにpthread_createという関数を利用してスレッドを生成する。

スレッドセーフ

マルチスレッドプログラミングで複数のスレッドが同時にアクセスしてもデータの不整合等の問題が発生しないこと。スレッドセーフを保つためには排他制御やリエントラントである事が望まれる。

スピンロック

スレッドがスピン(ループ)しながらロックが取得できるのを待つ方法。

ACID

原子性(英: atomicity)、一貫性(英: consistency)、独立性(英: isolation)、および永続性(英: durability)といったトランザクションシステムが持つべき性質概念。データベース系の話でよく利用される言葉。原子性(atomic性):全てが実行されるか/全くされないかを保証する事。一貫性(整合性):トランザクション開始と終了時で整合性が合っている事を保証する。独立性:トランザクション中の処理が他の操作から隔離される事。永続性:トランザクションログを記録し、データを失われなくする。

pthread API

pthreadAPIの概要をまとめます。以下のAPIはsemaphoreなどの実装で利用します。

API 説明
pthread_create 新規スレッド作成
pthread_exit スレッド終了
pthread_join スレッドが終了するまでブロック
pthread_detach スレッドのメモリを解放可能とする
pthread_equal スレッドのIDを比較する
pthread_once 関数が一度だけ呼び出し可能とする
pthread_self スレッドIDを取得
pthread_cleanup_push キャンセルスタックに挿入
pthread_cleanup_pop キャンセルスタックからハンドラを取り出して実行
pthread_key_create スレッドのKeyを作成
pthread_key_delete スレッドのkeyを削除
pthread_setspecific スレッド固有データをKeyに紐付け
pthread_getspecific スレッド固有のデータを取得
pthread_cancel スレッドをキャンセル
pthread_testcancel キャンセルポイント作成
pthread_setcancelstate キャンセル状態の設定
pthread_setcanceltype キャンセルタイプの設定

fork

Simple fork

プロセスから子プロセスを派生させます。プロセスを派生させるので単純なforkではデータの共有を行いません。例としてParentプロセスの時にChildプロセスのpid、Childプロセスの時にParentプロセスのpidをそれぞれ表示するプログラムを書いてみます。

#include <iostream>

int main(){

    int count = 0;
    count++;
    std::cout << "Count = ";
    std::cout << count << std::endl;

    pid_t pid;
    // 子プロセス作成
    pid = fork();

    // pidが0の場合は子プロセス
    // pidが-1の場合はforkに失敗
    // pidが0,-1以外の場合は親プロセス
    if( pid < 0 ) {
        std::cout << "Fork Failed." << std::endl;
        return 0;
    } else if( pid == 0 ) {
        putchar( '\n' );
        std::cout << "Child Process. pid = ";
        std::cout << getpid() << std::endl;
        std::cout << "My Parent pid = ";
        std::cout << getppid() << std::endl;
        count++;
    } else { 
        putchar( '\n' );
        std::cout << "Parent Process. pid = ";
        std::cout << getpid() << std::endl;
        std::cout << "My Child pid = ";
        std::cout << pid << std::endl;
        count++;
    }
    
    std::cout << "Count = ";
    std::cout << count << std::endl;

    putchar( '\n' );
    return 0;
}
Count = 1

Child Process. pid = 4073
My Parent pid = 4072
Count = 2

Parent Process. pid = 4072
My Child pid = 4073
Count = 2
fork wait

Parentプロセスで派生したChildプロセスが終わるのを待ちたい(Childプロセスを管理したい)プログラムを書く事があると思います。そんな時はwaitpid関数を利用します。

#include <iostream>
#include <sys/wait.h>

int main(){

    int count = 0;
    int status;
    count++;
    std::cout << "Count = ";
    std::cout << count << std::endl;

    pid_t pid;
    // 子プロセス作成
    pid = fork();

    // pidが0の場合は子プロセス
    // pidが-1の場合はforkに失敗
    // pidが0,-1以外の場合は親プロセス
    if( pid < 0 ) {
        std::cout << "Fork Failed." << std::endl;
        return -1;
    } else if( pid == 0 ) {
        putchar( '\n' );
        std::cout << "Child Process. pid = ";
        std::cout << getpid() << std::endl;
        std::cout << "My Parent pid = ";
        std::cout << getppid() << std::endl;
        count++;
    } else { 
        putchar( '\n' );
        std::cout << "Parent Process. pid = ";
        std::cout << getpid() << std::endl;
        std::cout << "My Child pid = ";
        std::cout << pid << std::endl;

        //Child Processが終わるのを待つ
        waitpid( pid, &status, 0 );
        std::cout << "Finishied. pid = ";
        std::cout << pid << std::endl;

        if (WIFEXITED(status)){
            printf("exit, status=%d\n", WEXITSTATUS(status));
        } else if (WIFSIGNALED(status)){
            printf("signal, sig=%d\n", WTERMSIG(status));
        } else {
            printf("abnormal exit\n");
        }
        count++;
    }
    
    std::cout << "Count = ";
    std::cout << count << std::endl;

    putchar( '\n' );
    return 0;
}
Count = 1

Child Process. pid = 4073
My Parent pid = 4072
Count = 2

Parent Process. pid = 4072
My Child pid = 4073
Finishied. pid = 4073
exit, status=0
Count = 2
fork 共有メモリ

単純なforkではParent/Childプロセス間でデータの共有が出来ないので、共有メモリを介してデータを共有するやり方を記述します。共有メモリを扱うには#include を読み込む必要があります。以下にshm関数の一覧を表にまとめます。

関数 説明
shmget 共有メモリのセグメント新規作成、取得を行う
shmat 共有メモリのセグメントをattachする
shmdt 不要になった共有メモリセグメントをdetachする
shmctl 共有メモリセグメントを消去
#include <iostream>
#include <sys/wait.h>
#include <sys/shm.h>

int main(){

    int count = 0;
    int status;
    int shmid; 
    int *shmaddr;
    std::cout << "Count = ";
    std::cout << count << std::endl;

    pid_t pid;
    // 子プロセス作成
    pid = fork();

    // 共有メモリセグメント作成
    if ((shmid = shmget(IPC_PRIVATE, 100, 0600)) == -1){
        perror( "shmget error." );
        exit( EXIT_FAILURE );
    }

    // pidが0の場合は子プロセス
    // pidが-1の場合はforkに失敗
    // pidが0,-1以外の場合は親プロセス
    if( pid < 0 ) {
        std::cout << "Fork Failed." << std::endl;
        return -1;
    } else if( pid == 0 ) {
        putchar( '\n' );
        std::cout << "Child Process. pid = ";
        std::cout << getpid() << std::endl;
        std::cout << "My Parent pid = ";
        std::cout << getppid() << std::endl;
        count++;

        //共有メモリattach
        if ((shmaddr = (int *)shmat(shmid, NULL, 0)) == (void *)-1) {
            perror( "ChildProcess shmat error." );
            exit(EXIT_FAILURE);
        }
        
        *shmaddr = count;

        //共有メモリのdetach
        if (shmdt(shmaddr) == -1) {
            perror( "ChildProcess shmdt error." );
            exit(EXIT_FAILURE);
        }

        exit(EXIT_SUCCESS);

    } else { 
        putchar( '\n' );
        std::cout << "Parent Process. pid = ";
        std::cout << getpid() << std::endl;
        std::cout << "My Child pid = ";
        std::cout << pid << std::endl;

        //Child Processが終わるのを待つ
        waitpid( pid, &status, 0 );
        std::cout << "Finishied. pid = ";
        std::cout << pid << std::endl;


        /* 共有メモリ・セグメントをプロセスのアドレス空間に付加 */
        /*
        if ((shmaddr = (int *)shmat(shmid, NULL, 0)) == (void *)-1) {
            perror( "ParentProcess shmat error." );
            exit(EXIT_FAILURE);
        }
        */

        std::cout << "Attache Count = ";
        std::cout << *shmaddr << std::endl;
         

        if (WIFEXITED(status)){
            printf("exit, status=%d\n", WEXITSTATUS(status));
        } else if (WIFSIGNALED(status)){
            printf("signal, sig=%d\n", WTERMSIG(status));
        } else {
            printf("abnormal exit\n");
        }
        count++;
    }
    
    std::cout << "Count = ";
    std::cout << count << std::endl;

    putchar( '\n' );
    return 0;
}
Count = 0

Child Process. pid = 9965
My Parent pid = 9964

Parent Process. pid = 9964
My Child pid = 9965
Finishied. pid = 9965
Attache Count = 1
exit, status=0
Count = 1

semaphore

semaphoreのPOSIXAPIについて表にまとめます。

API 説明
sem_init semaphoreの初期化。メモリ領域の確保
sem_post semaphoreの値を1加算
sem_wait semが0より大きければsemaphoreの値を1減算。semが0なら呼び出しスレッドはブロック
sem_getvalue semaphoreの値を抽出
sem_destroy semaphoreの破棄

ptreadはdetachかjoinをしないとメモリが解放されません。joinを忘れるとメモリリークを発生する可能性があります。

#include <iostream>
#include <pthread.h>
#include <semaphore.h>

// sem_t型の定義
sem_t sem;
int count = 0;

void* threadFunction1( void* args )
{
    while(1){
        std::cout << "called threadFunction1" << std::endl;
        std::cout << "count before wait = ";
        std::cout << count << std::endl;
        // 待機状態
        sem_wait( &sem );
        std::cout << "count = ";
        std::cout << count << std::endl;
        count = 0;
    }
    return NULL;
}

/*
void* threadFunction2( void* args )
{
    while(1){
        std::cout << "called threadFunction2" << std::endl;
        std::cout << "count before wait = ";
        std::cout << count << std::endl;
        // 待機状態
        sem_wait( &sem );
        std::cout << "count = ";
        std::cout << count << std::endl;
        count = 0;
    }
    return NULL;
}
*/

int main()
{
    int status;
    pthread_t thread_1, thread_2;
    int n;

    // sempahoreの初期化
    sem_init( &sem, 0, 0 );
    
    // スレッド1個目作成
    status = pthread_create( &thread_1, NULL, threadFunction1, NULL );
    if( status != 0 ) {
        std::cout << "failed to create thread 1" << std::endl;
        exit(1);
    }

    // スレッド2個目作成
    /*
    status = pthread_create( &thread_2, NULL, threadFunction2, NULL );
    if( status != 0 ) {
        std::cout << "failed to create thread 2" << std::endl;
        exit(1);
    }
    */

    // detach 
    pthread_detach( thread_1 );
    //pthread_detach( thread_2 );
    
    for ( n = 0; n < 5; ){
        count++;
        if ( count == 20 ) {
            // semaphoreの値を1加算 シグナル送信
            std::cout << "main count = ";
            std::cout << count << std::endl;
            sem_post( &sem );
            n++;
        }
    }
    
    // semaphore解放
    sem_destroy( &sem );
}

コンパイルを通すには-lpthreadを指定する必要があります。

$ g++ pthread.cpp -lpthread
$ ./a.out        
main count = 20
called threadFunction1
count before wait = 142119050
count = 142119050
called threadFunction1
count before wait = 0
main count = 20
count = 188569130
called threadFunction1
count before wait = 0
main count = 20
count = 20
called threadFunction1
count before wait = 0
main count = 20
count = 20
called threadFunction1
count before wait = 0
main count = 20
count = 20
called threadFunction1
count before wait = 0

mutex

mutexのAPIについて表にまとめます。

API 説明
pthread_mutex_init mutexオブジェクトを初期化します。
pthread_mutex_lock 与えられたmutexをlockします。これはmutexがunlockされるまでスレッドの実行を停止させます。
pthread_mutex_trylock mutexが既に他のスレッドによってlockされている場合はスレッドをlockしません。
pthread_mutex_unlock lockされた状態からunlock状態に戻します。
pthread_mutex_destroy mutexオブジェクトを破棄して、資源を解放します。

以下はサンプルプログラムです。pthread_mutex_lock中の変数は変更できませんが、pthred_mutex_unlockされると変更が可能になります。

#include <iostream>
#include <pthread.h>

using namespace std;
int count_1=0;
int count_2=0;
pthread_mutex_t mutex; //mutexの宣言

void* threadFunction1( void* args )
{
  int i;
  //mutexのlock
  pthread_mutex_lock(&mutex);
  
  // sleepしてその間に変数の書き換えが無いかを確認する
  system( "sleep 3" );
  cout << "count_1 = ";
  for(i=0;i<=10;i++){
    cout << count_1;
    cout << ",";
    count_1++;
  }
  printf("\n");

  //mutexのunlock
  pthread_mutex_unlock(&mutex);
  // sleepしてその間に変数の書き換えが無いかを確認する
  system( "sleep 3" );
  cout << "count_2 = ";
  for(i=0;i<=10;i++){
    cout << count_2;
    cout << ",";
    count_2++;
  }
  printf("\n");
}

void* threadFunction2( void* args )
{
  cout << "thread 2 call1 " << endl;
  pthread_mutex_lock(&mutex);
  // lock中なのでcount_1は変更されない
  count_1=10;
  cout << "thread 2 call2 " << endl;
  pthread_mutex_unlock(&mutex);
  // unlock中なのでcount_2は変更される
  count_2=10;
}

int main() {
  pthread_t thread_1, thread_2;

  // mutex初期化
  pthread_mutex_init(&mutex, NULL);

  // スレッドの作成
  pthread_create(&thread_1, NULL, threadFunction1, NULL);
  pthread_create(&thread_2, NULL, threadFunction2, NULL);
  
  // スレッドのjoin
  pthread_join(thread_1,NULL);
  pthread_join(thread_2,NULL);
  
  // mutexの破棄
  pthread_mutex_destroy(&mutex); 
  return 0;
}
$ g++ mutex.cpp -lpthread
$ ./a.out      
thread 2 call1 
count_1 = 0,1,2,3,4,5,6,7,8,9,10,
thread 2 call2 
count_2 = 10,11,12,13,14,15,16,17,18,19,20,

Attention

マルチスレッドプログラミングを行う際には以下の事に気をつけなければなりません。

  • パフォーマンスの検証。マルチスレッドを導入する事によりどれだけの処理コストやメモリが抑えられるか。
  • ロック時間を短くする。また可能であればロックを使わないようにすること。
  • 複数のリソースにロックを同時にかける場合はデッドロックに注意する。
  • 生成したスレッドから更なるスレッドを生成しないように気をつける。
  • スレッドセーフ排他制御を行い、データの整合性を保証する事。