一日、一回 pthread 勉強。
はじめに
pthread でキューを書いてみる - IT戦記に関して、id:n-soda さんからとても貴重なアドバイスを沢山頂いたのですべて直しておきたいと思います。
本当にありがとうございます。
指摘が会った点まとめ
再帰呼び出しの問題
キューが一杯の場合に enQ() の再帰呼び出ししているのが変です。ここは while で書けるはずです。Cの場合、末尾再帰の削除をしない処理系がほとんどですし、このケースでは、deQ() 側のスレッドと enQ() 側のスレッドで速度差があるだけで再帰してしまいますから、スタックを相当無駄に消費します。スタックオーバーフローで死んでしまう可能性もあります。
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
sleep(0) の問題
キューの空き待ちに pthread_cond_wait() を使わず、sleep(0) してるのは変です。当然 deQ() 側では、キューが一杯の状態から空きありに変わったら、pthread_cond_signal() します。こうすると、sleep(0) とか sched_yield() の呼び出しは、そもそも不要になります。
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
-1 の問題
空きの意味で -1 のような特別な値を使うのは一般性の低いプログラミングスタイルです。今回の例では、たとえばキュー中の有効なデータ数を示す変数を導入し、「その変数 >= s」ならキューが一杯だと判断すれば済むはずです。
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
変数名の問題
変数名が分かりにくいと思います。特に条件変数は、その変数が示す条件の名前をつけるべきです。今回なら c → non_empty、(2)で書いた条件変数→non_full と名付けるべきでしょう。あと、ふつう n→in、l→out、s→size じゃないですか?
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
static 変数の問題
いくら C だからといって、static 変数にするんじゃなくて、キュー構造体にして、各関数には明示的に構造体へのポインタを渡すぐらいはした方が、いまどき良いと思います。
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
これらを踏まえてもう一度書き直してみました。
#include <stdio.h> #include <stdlib.h> #include <memory.h> #include <pthread.h> #include <assert.h> // 構造体: queue_t typedef struct queue { unsigned int in; // 次に入れるインデックス unsigned int out; // 次に出すインデックス unsigned int size; // キューのサイズ unsigned int no_full; // 満タンになると 0 になる unsigned int no_empty; // 空っぽになると 0 になる pthread_mutex_t mutex; pthread_cond_t cond_full; // データが満タンのときに待つための cond pthread_cond_t cond_empty; // データが空のときに待つための cond void* buffer[1]; // バッファ // ここ以下のメモリは p_queue->buffer[3] などとして参照される } queue_t; // 関数名: create_queue // size で指定されたサイズのバッファを持つ queue_t を生成&初期化し、そのポインタを返す // destroy_queue で解放する必要がある // メモリの確保に失敗した場合は NULL を返す queue_t* create_queue (size_t size) { queue_t* p_queue; int memsize = sizeof(queue_t) + (size - 1) * sizeof(void*); // メモリの確保 // struct queue_t のサイズは buffer サイズとして 1 を含んでいるので 1 を引く p_queue = (queue_t*)malloc(memsize); if (p_queue != NULL) { // メモリを 0 で初期化 memset(p_queue, 0x00, memsize); // バッファのサイズ p_queue->size = size; // フラグの初期化 p_queue->no_full = size; p_queue->no_empty = 0; // pthread_mutex_t の初期化 p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; // エラーチェック有り mutex pthread_mutex_init(&p_queue->mutex, NULL); // pthread_cond_t の初期化 p_queue->cond_full = (pthread_cond_t) PTHREAD_COND_INITIALIZER; p_queue->cond_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER; pthread_cond_init(&p_queue->cond_full, NULL); pthread_cond_init(&p_queue->cond_empty, NULL); } return p_queue; } // 関数名: destroy_queue // create_queue で確保されたメモリを解放する // 初期化されている mutex や cond も破棄する void destroy_queue (queue_t* p_queue) { int rv; if (p_queue != NULL) { // pthread_mutex_t の破棄 rv = pthread_mutex_destroy(&p_queue->mutex); assert(rv == 0); // pthread_mutex_t の破棄 rv = pthread_cond_destroy(&p_queue->cond_full); assert(rv == 0); rv = pthread_cond_destroy(&p_queue->cond_empty); assert(rv == 0); // メモリの解放 free(p_queue); } } // 関数名: enqueue // キューにデータを入れる // データが満タンな場合は、ブロックします void enqueue (queue_t* p_queue, void* data) { pthread_mutex_lock(&p_queue->mutex); // -- ここから、クリティカルセクション -- // 満タンじゃなくなるまで待つ while (!p_queue->no_full) { pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex); } // データを入れる p_queue->buffer[p_queue->in] = data; // 次にデータを入れる場所をインクリメント p_queue->in++; p_queue->in %= p_queue->size; // フラグの更新 p_queue->no_full--; p_queue->no_empty++; // -- ここまで、クリティカルセクション -- pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_empty); } // 関数名: dequeue // キューにデータを入れる // データが満タンな場合は、ブロックします void* dequeue (queue_t* p_queue) { void* result; pthread_mutex_lock(&p_queue->mutex); // -- ここから、クリティカルセクション -- // 空っぽじゃなくなるまで待つ while (!p_queue->no_empty) { pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex); } // データを取り出す result = p_queue->buffer[p_queue->out]; // 次にデータを取り出す場所をインクリメント p_queue->out++; p_queue->out %= p_queue->size; // フラグの更新 p_queue->no_full++; p_queue->no_empty--; // -- ここまで、クリティカルセクション -- pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_full); return result; } void inspect_queue(queue_t* p_queue) { pthread_mutex_lock(&p_queue->mutex); printf( "queue(%p) {\n" " size: %d;\n" " length: %d;\n" "}\n\n", p_queue, p_queue->size, p_queue->no_empty); pthread_mutex_unlock(&p_queue->mutex); } void* enqueue_thread_func(void *p) { queue_t *p_queue = (queue_t*) p; char* p_data; int i = 100; while (i--) { p_data = malloc(5 * sizeof(char)); strcpy(p_data, "hoge"); enqueue(p_queue, p_data); } return NULL; } void* dequeue_thread_func(void *p) { queue_t *p_queue = (queue_t*) p; char* p_data; int i = 100; while (i--) { p_data = dequeue(p_queue); free(p_data); } return NULL; } #define N_ENQUEUE_THREADS 10 #define N_DEQUEUE_THREADS 10 int main (int argc, char** argv) { queue_t* p_queue = create_queue(10); int rv; int i; pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0,}; pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0,}; for (i = 0; i < N_ENQUEUE_THREADS; i ++) { rv = pthread_create(enqueue_threads + i, NULL, enqueue_thread_func, p_queue); assert(rv == 0); } for (i = 0; i < N_DEQUEUE_THREADS; i ++) { rv = pthread_create(dequeue_threads + i, NULL, dequeue_thread_func, p_queue); assert(rv == 0); } for (i = 0; i < N_ENQUEUE_THREADS; i ++) { rv = pthread_join(enqueue_threads[i], NULL); assert(rv == 0); } for (i = 0; i < N_DEQUEUE_THREADS; i ++) { rv = pthread_join(dequeue_threads[i], NULL); assert(rv == 0); } destroy_queue(p_queue); return 0; }
今度はどうでしょうか
今度は dtrace で lock, unlock, block の回数を数える
$ sudo dtrace -n " > plockstat\$target::pthread_mutex_unlock:mutex-release, > plockstat\$target::pthread_mutex_lock:mutex-acquire, > plockstat\$target::pthread_mutex_lock:mutex-block, > plockstat\$target::_pthread_cond_wait:mutex-release > { @[probefunc,probename,arg0] = count() } >" -c ./a.out dtrace: description 'plockstat$target::pthread_mutex_unlock:mutex-release, plockstat$target::pthread_mutex_lock:mutex-acquire, plockstat$target::pthread_mutex_lock:mutex-block, plockstat$target::_pthread_cond_wait:mutex-release ' matched 4 probes dtrace: pid 21145 has exited pthread_mutex_lock mutex-acquire 2693110208 3 pthread_mutex_unlock mutex-release 2693110208 3 _pthread_cond_wait mutex-release 1048996 59 pthread_mutex_lock mutex-block 1048996 1966 pthread_mutex_unlock mutex-release 1048996 2000 pthread_mutex_lock mutex-acquire 1048996 2059 $
2000 回 unlock されている 1048996 というアドレスが件の mutex のアドレスなので
- 2059 回 lock (wait からの復帰も含まれる) され
- 59 回 wait し
- 2000 回 unlock され
- 1966 回 block したことがわかります
ちゃんと動いてそうですね。
まとめ
皆様のアドバイスのおかげで、とても勉強になりました。
本当にありがとうございます。
(追記)さらにアドバイスを頂いたので
もう一回直してみました!やった!
pthread でキューを作ってみる(再々挑戦、最終版) - IT戦記