IT戦記

プログラミング、起業などについて書いているプログラマーのブログです😚

pthread でキューを作る(再挑戦)。ついでに dtrace でスレッドの解析

一日、一回 pthread 勉強。

はじめに

pthread でキューを書いてみる - IT戦記に関して、id:n-soda さんからとても貴重なアドバイスを沢山頂いたのですべて直しておきたいと思います。
本当にありがとうございます。

指摘が会った点まとめ

再帰呼び出しの問題

キューが一杯の場合に enQ() の再帰呼び出ししているのが変です。ここは while で書けるはずです。Cの場合、末尾再帰の削除をしない処理系がほとんどですし、このケースでは、deQ() 側のスレッドと enQ() 側のスレッドで速度差があるだけで再帰してしまいますから、スタックを相当無駄に消費します。スタックオーバーフローで死んでしまう可能性もあります。

http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
sleep(0) の問題

キューの空き待ちに pthread_cond_wait() を使わず、sleep(0) してるのは変です。当然 deQ() 側では、キューが一杯の状態から空きありに変わったら、pthread_cond_signal() します。こうすると、sleep(0) とか sched_yield() の呼び出しは、そもそも不要になります。

http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
-1 の問題

空きの意味で -1 のような特別な値を使うのは一般性の低いプログラミングスタイルです。今回の例では、たとえばキュー中の有効なデータ数を示す変数を導入し、「その変数 >= s」ならキューが一杯だと判断すれば済むはずです。

http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
変数名の問題

変数名が分かりにくいと思います。特に条件変数は、その変数が示す条件の名前をつけるべきです。今回なら c → non_empty、(2)で書いた条件変数→non_full と名付けるべきでしょう。あと、ふつう n→in、l→out、s→size じゃないですか?

http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
static 変数の問題

いくら C だからといって、static 変数にするんじゃなくて、キュー構造体にして、各関数には明示的に構造体へのポインタを渡すぐらいはした方が、いまどき良いと思います。

http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983

これらを踏まえてもう一度書き直してみました。

#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <pthread.h>
#include <assert.h>

// 構造体: queue_t
typedef struct queue {
    unsigned int    in;         // 次に入れるインデックス
    unsigned int    out;        // 次に出すインデックス
    unsigned int    size;       // キューのサイズ
    unsigned int    no_full;    // 満タンになると 0 になる
    unsigned int    no_empty;   // 空っぽになると 0 になる
    pthread_mutex_t mutex;
    pthread_cond_t  cond_full;  // データが満タンのときに待つための cond
    pthread_cond_t  cond_empty; // データが空のときに待つための cond
    void*           buffer[1];  // バッファ
    // ここ以下のメモリは p_queue->buffer[3] などとして参照される
} queue_t;

// 関数名: create_queue
//  size で指定されたサイズのバッファを持つ queue_t を生成&初期化し、そのポインタを返す
//  destroy_queue で解放する必要がある
//  メモリの確保に失敗した場合は NULL を返す
queue_t* create_queue (size_t size) {
    queue_t* p_queue;
    int memsize = sizeof(queue_t) + (size - 1) * sizeof(void*);

    // メモリの確保
    // struct queue_t のサイズは buffer サイズとして 1 を含んでいるので 1 を引く
    p_queue = (queue_t*)malloc(memsize);

    if (p_queue != NULL) {

        // メモリを 0 で初期化
        memset(p_queue, 0x00, memsize);

        // バッファのサイズ
        p_queue->size = size;

        // フラグの初期化
        p_queue->no_full = size;
        p_queue->no_empty = 0;

        // pthread_mutex_t の初期化
        p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; // エラーチェック有り mutex
        pthread_mutex_init(&p_queue->mutex, NULL);

        // pthread_cond_t の初期化
        p_queue->cond_full  = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        p_queue->cond_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        pthread_cond_init(&p_queue->cond_full, NULL);
        pthread_cond_init(&p_queue->cond_empty, NULL);
    }

    return p_queue;
}

// 関数名: destroy_queue
//  create_queue で確保されたメモリを解放する
//  初期化されている mutex や cond も破棄する
void destroy_queue (queue_t* p_queue) {
    int rv;

    if (p_queue != NULL) {
        // pthread_mutex_t の破棄
        rv = pthread_mutex_destroy(&p_queue->mutex);
        assert(rv == 0);

        // pthread_mutex_t の破棄
        rv = pthread_cond_destroy(&p_queue->cond_full);
        assert(rv == 0);
        rv = pthread_cond_destroy(&p_queue->cond_empty);
        assert(rv == 0);

        // メモリの解放
        free(p_queue);
    }
}

// 関数名: enqueue
//  キューにデータを入れる
//  データが満タンな場合は、ブロックします
void enqueue (queue_t* p_queue, void* data) {
    pthread_mutex_lock(&p_queue->mutex);
    // -- ここから、クリティカルセクション --

    // 満タンじゃなくなるまで待つ
    while (!p_queue->no_full) {
        pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex);
    }

    // データを入れる
    p_queue->buffer[p_queue->in] = data;

    // 次にデータを入れる場所をインクリメント
    p_queue->in++;
    p_queue->in %= p_queue->size;

    // フラグの更新
    p_queue->no_full--;
    p_queue->no_empty++;

    // -- ここまで、クリティカルセクション --
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_empty);
}

// 関数名: dequeue
//  キューにデータを入れる
//  データが満タンな場合は、ブロックします
void* dequeue (queue_t* p_queue) {
    void* result;

    pthread_mutex_lock(&p_queue->mutex);
    // -- ここから、クリティカルセクション --

    // 空っぽじゃなくなるまで待つ
    while (!p_queue->no_empty) {
        pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex);
    }

    // データを取り出す
    result = p_queue->buffer[p_queue->out];

    // 次にデータを取り出す場所をインクリメント
    p_queue->out++;
    p_queue->out %= p_queue->size;

    // フラグの更新
    p_queue->no_full++;
    p_queue->no_empty--;

    // -- ここまで、クリティカルセクション --
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_full);

    return result;
}

void inspect_queue(queue_t* p_queue) {
    pthread_mutex_lock(&p_queue->mutex);
    printf( "queue(%p) {\n"
            "   size:   %d;\n"
            "   length: %d;\n"
            "}\n\n", p_queue, p_queue->size, p_queue->no_empty);
    pthread_mutex_unlock(&p_queue->mutex);
}

void* enqueue_thread_func(void *p) {
    queue_t *p_queue = (queue_t*) p;
    char* p_data;
    int i = 100;

    while (i--) {
        p_data = malloc(5 * sizeof(char));
        strcpy(p_data, "hoge");
        enqueue(p_queue, p_data);
    }

    return NULL;
}

void* dequeue_thread_func(void *p) {
    queue_t *p_queue = (queue_t*) p;
    char* p_data;
    int i = 100;

    while (i--) {
        p_data = dequeue(p_queue);
        free(p_data);
    }

    return NULL;
}

#define N_ENQUEUE_THREADS 10
#define N_DEQUEUE_THREADS 10
int main (int argc, char** argv) {
    queue_t* p_queue = create_queue(10);
    int rv;
    int i;

    pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0,};
    pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0,};

    for (i = 0; i < N_ENQUEUE_THREADS; i ++) {
        rv = pthread_create(enqueue_threads + i, NULL, enqueue_thread_func, p_queue);
        assert(rv == 0);
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i ++) {
        rv = pthread_create(dequeue_threads + i, NULL, dequeue_thread_func, p_queue);
        assert(rv == 0);
    }

    for (i = 0; i < N_ENQUEUE_THREADS; i ++) {
        rv = pthread_join(enqueue_threads[i], NULL);
        assert(rv == 0);
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i ++) {
        rv = pthread_join(dequeue_threads[i], NULL);
        assert(rv == 0);
    }

    destroy_queue(p_queue);
    return 0;
}

今度はどうでしょうか

コンパイル

$ gcc -Wall queue.c
$

コンパイルは通りました

今度は dtrace で lock, unlock, block の回数を数える

$ sudo dtrace -n "
>  plockstat\$target::pthread_mutex_unlock:mutex-release,
>  plockstat\$target::pthread_mutex_lock:mutex-acquire,
>  plockstat\$target::pthread_mutex_lock:mutex-block,
>  plockstat\$target::_pthread_cond_wait:mutex-release
>  { @[probefunc,probename,arg0] = count() }
>" -c ./a.out
dtrace: description 'plockstat$target::pthread_mutex_unlock:mutex-release, plockstat$target::pthread_mutex_lock:mutex-acquire, plockstat$target::pthread_mutex_lock:mutex-block, plockstat$target::_pthread_cond_wait:mutex-release ' matched 4 probes
dtrace: pid 21145 has exited

  pthread_mutex_lock                                  mutex-acquire                                            2693110208                3
  pthread_mutex_unlock                                mutex-release                                            2693110208                3
  _pthread_cond_wait                                  mutex-release                                               1048996               59
  pthread_mutex_lock                                  mutex-block                                                 1048996             1966
  pthread_mutex_unlock                                mutex-release                                               1048996             2000
  pthread_mutex_lock                                  mutex-acquire                                               1048996             2059

$

2000 回 unlock されている 1048996 というアドレスが件の mutex のアドレスなので

  • 2059 回 lock (wait からの復帰も含まれる) され
  • 59 回 wait し
  • 2000 回 unlock され
  • 1966 回 block したことがわかります

ちゃんと動いてそうですね。

まとめ

皆様のアドバイスのおかげで、とても勉強になりました。
本当にありがとうございます。

(追記)さらにアドバイスを頂いたので

もう一回直してみました!やった!
pthread でキューを作ってみる(再々挑戦、最終版) - IT戦記