一日一回スレッドの勉強
spin_lock とは
今のところの理解です。間違っているかもしれません。
スレッドが休止状態にならない。
ロックが取得できない場合は、 while (1) trylock() のようにスレッドが無限ループのような状態でロックが取得できるのを待ち続ける。
利点
スレッドが休止状態にならないので mutex を使うより CPU を占有している時間が長いので、コンテキストスイッチが発生する回数が少ない。
欠点
スレッドが休止状態にならないので、 lock 中は(割り込みが発生しないと)他のスレッドやプロセスがその CPU を使えなくなる。
昨日の mutex で作ったキューを spin_lock に変えてみる。
以下のエントリに元コードがあります。
http://d.hatena.ne.jp/amachang/20080617/1213694238
まず、構造体
pthread_mutex_t や pthread_cond_t を int spin に置き換える
// 構造体: queue_t typedef struct queue { unsigned int in; // 次に入れるインデックス unsigned int out; // 次に出すインデックス unsigned int size; // キューのサイズ unsigned int no_full; // 満タンになると 0 になる unsigned int no_empty; // 空っぽになると 0 になる #ifdef SPIN int spin; #else pthread_mutex_t mutex; pthread_cond_t cond_full; // データが満タンのときに待つための cond pthread_cond_t cond_empty; // データが空のときに待つための cond #endif void* buffer[1]; // バッファ // ここ以下のメモリは p_queue->buffer[3] などとして参照される } queue_t;
初期化、破棄
spin を初期化するだけ
#ifdef SPIN // spin_t の初期化 p_queue->spin = 0; #else // pthread_mutex_t の初期化 p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&p_queue->mutex, NULL); // pthread_cond_t の初期化 p_queue->cond_full = (pthread_cond_t) PTHREAD_COND_INITIALIZER; p_queue->cond_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER; pthread_cond_init(&p_queue->cond_full, NULL); pthread_cond_init(&p_queue->cond_empty, NULL); #endif
破棄はいらない
#ifdef SPIN #else int rv; // pthread_mutex_t の破棄 rv = pthread_mutex_destroy(&p_queue->mutex); assert(rv == 0); // pthread_mutex_t の破棄 rv = pthread_cond_destroy(&p_queue->cond_full); assert(rv == 0); rv = pthread_cond_destroy(&p_queue->cond_empty); assert(rv == 0); #endif
ロック
#ifdef SPIN spin_lock(&p_queue->spin); #else pthread_mutex_lock(&p_queue->mutex); #endif
アンロックとシグナル
シグナルはいらない
#ifdef SPIN spin_unlock(&p_queue->spin); #else pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_empty); #endif
wait
これは悩んだ。以下のような感じでいいのかな><
#ifdef SPIN // ロックを解放 spin_unlock(&p_queue->spin); // ちょっとだけ待つ __asm__ __volatile__("nop"); // ロックする spin_lock(&p_queue->spin); #else pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex); #endif
nop を入れてる。
nop の数で空回り回数を決める。
コード全体
#include <stdio.h> #include <stdlib.h> #include <memory.h> #include <pthread.h> #include <assert.h> extern void spin_unlock(int *p); extern void spin_lock(int *p); // 構造体: queue_t typedef struct queue { unsigned int in; // 次に入れるインデックス unsigned int out; // 次に出すインデックス unsigned int size; // キューのサイズ unsigned int no_full; // 満タンになると 0 になる unsigned int no_empty; // 空っぽになると 0 になる #ifdef SPIN int spin; #else pthread_mutex_t mutex; pthread_cond_t cond_full; // データが満タンのときに待つための cond pthread_cond_t cond_empty; // データが空のときに待つための cond #endif void* buffer[1]; // バッファ // ここ以下のメモリは p_queue->buffer[3] などとして参照される } queue_t; // 関数名: create_queue // size で指定されたサイズのバッファを持つ queue_t を生成&初期化し、そのポインタを返す // destroy_queue で解放する必要がある // メモリの確保に失敗した場合は NULL を返す queue_t* create_queue (size_t size) { queue_t* p_queue; int memsize = sizeof(queue_t) + (size - 1) * sizeof(void*); // メモリの確保 // struct queue_t のサイズは buffer サイズとして 1 を含んでいるので 1 を引く p_queue = (queue_t*)malloc(memsize); if (p_queue != NULL) { // メモリを 0 で初期化 memset(p_queue, 0x00, memsize); // バッファのサイズ p_queue->size = size; // フラグの初期化 p_queue->no_full = size; p_queue->no_empty = 0; #ifdef SPIN // spin_t の初期化 p_queue->spin = 0; #else // pthread_mutex_t の初期化 p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; pthread_mutex_init(&p_queue->mutex, NULL); // pthread_cond_t の初期化 p_queue->cond_full = (pthread_cond_t) PTHREAD_COND_INITIALIZER; p_queue->cond_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER; pthread_cond_init(&p_queue->cond_full, NULL); pthread_cond_init(&p_queue->cond_empty, NULL); #endif } return p_queue; } // 関数名: destroy_queue // create_queue で確保されたメモリを解放する // 初期化されている mutex や cond も破棄する void destroy_queue (queue_t* p_queue) { if (p_queue != NULL) { #ifdef SPIN #else int rv; // pthread_mutex_t の破棄 rv = pthread_mutex_destroy(&p_queue->mutex); assert(rv == 0); // pthread_mutex_t の破棄 rv = pthread_cond_destroy(&p_queue->cond_full); assert(rv == 0); rv = pthread_cond_destroy(&p_queue->cond_empty); assert(rv == 0); #endif // メモリの解放 free(p_queue); } } // 関数名: enqueue // キューにデータを入れる // データが満タンな場合は、ブロックします void enqueue (queue_t* p_queue, void* data) { #ifdef SPIN spin_lock(&p_queue->spin); #else pthread_mutex_lock(&p_queue->mutex); #endif // -- ここから、クリティカルセクション -- // 満タンじゃなくなるまで待つ while (!p_queue->no_full) { #ifdef SPIN // ロックを解放 spin_unlock(&p_queue->spin); // ちょっとだけ待つ __asm__ __volatile__("nop"); // ロックする spin_lock(&p_queue->spin); #else pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex); #endif } // データを入れる p_queue->buffer[p_queue->in] = data; // 次にデータを入れる場所をインクリメント p_queue->in++; p_queue->in %= p_queue->size; // フラグの更新 p_queue->no_full--; p_queue->no_empty++; // -- ここまで、クリティカルセクション -- #ifdef SPIN spin_unlock(&p_queue->spin); #else pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_empty); #endif } // 関数名: dequeue // キューにデータを入れる // データが満タンな場合は、ブロックします void* dequeue (queue_t* p_queue) { void* result; #ifdef SPIN spin_lock(&p_queue->spin); #else pthread_mutex_lock(&p_queue->mutex); #endif // -- ここから、クリティカルセクション -- // 空っぽじゃなくなるまで待つ while (!p_queue->no_empty) { #ifdef SPIN // ロックを解放 spin_unlock(&p_queue->spin); // ちょっとだけ待つ __asm__ __volatile__("nop"); // ロックする spin_lock(&p_queue->spin); #else pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex); #endif } // データを取り出す result = p_queue->buffer[p_queue->out]; // 次にデータを取り出す場所をインクリメント p_queue->out++; p_queue->out %= p_queue->size; // フラグの更新 p_queue->no_full++; p_queue->no_empty--; // -- ここまで、クリティカルセクション -- #ifdef SPIN spin_unlock(&p_queue->spin); #else pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->cond_full); #endif return result; } void inspect_queue(queue_t* p_queue) { #ifdef SPIN spin_lock(&p_queue->spin); #else pthread_mutex_lock(&p_queue->mutex); #endif printf( "queue(%p) {\n" " size: %d;\n" " length: %d;\n" "}\n\n", p_queue, p_queue->size, p_queue->no_empty); #ifdef SPIN spin_unlock(&p_queue->spin); #else pthread_mutex_unlock(&p_queue->mutex); #endif } void* enqueue_thread_func(void *p) { int id = (int) p; queue_t *p_queue = (queue_t*) p; char* p_data; int i; for (i = 0; i < 10; i++) { p_data = malloc(30 * sizeof(char)); sprintf(p_data, "tid[%p]: %d", pthread_self(), i); enqueue(p_queue, p_data); } return NULL; } void* dequeue_thread_func(void *p) { queue_t *p_queue = (queue_t*) p; char* p_data; int i; for (i = 0; i < 10; i++) { p_data = dequeue(p_queue); puts(p_data); free(p_data); } return NULL; } #define N_ENQUEUE_THREADS 10 #define N_DEQUEUE_THREADS 10 int main (int argc, char** argv) { queue_t* p_queue = create_queue(1); int rv; int i; pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0,}; pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0,}; for (i = 0; i < N_ENQUEUE_THREADS; i ++) { rv = pthread_create(enqueue_threads + i, NULL, enqueue_thread_func, p_queue); assert(rv == 0); } for (i = 0; i < N_DEQUEUE_THREADS; i ++) { rv = pthread_create(dequeue_threads + i, NULL, dequeue_thread_func, p_queue); assert(rv == 0); } for (i = 0; i < N_ENQUEUE_THREADS; i ++) { rv = pthread_join(enqueue_threads[i], NULL); assert(rv == 0); } for (i = 0; i < N_DEQUEUE_THREADS; i ++) { rv = pthread_join(dequeue_threads[i], NULL); assert(rv == 0); } destroy_queue(p_queue); return 0; }
実行
$ gcc -DSPIN queue.c && ./a.out tid[0xb0081000]: 0 tid[0xb0207000]: 0 tid[0xb0491000]: 0 tid[0xb0289000]: 0 tid[0xb0289000]: 1 tid[0xb0289000]: 2 tid[0xb0289000]: 3 tid[0xb0289000]: 4 tid[0xb0289000]: 5 tid[0xb0289000]: 6 tid[0xb0289000]: 7 tid[0xb0289000]: 8 tid[0xb0289000]: 9 tid[0xb0207000]: 1 tid[0xb0207000]: 2 tid[0xb0207000]: 3 tid[0xb038d000]: 0 tid[0xb038d000]: 1 tid[0xb038d000]: 2 tid[0xb038d000]: 3 tid[0xb038d000]: 4 tid[0xb038d000]: 5 tid[0xb038d000]: 6 tid[0xb038d000]: 7 tid[0xb038d000]: 8 : :
ちゃんと動いてるっぽい