IT戦記

プログラミング、起業などについて書いているプログラマーのブログです😚

spin_lock とは

一日一回スレッドの勉強

spin_lock とは

今のところの理解です。間違っているかもしれません。
スレッドが休止状態にならない。
ロックが取得できない場合は、 while (1) trylock() のようにスレッドが無限ループのような状態でロックが取得できるのを待ち続ける。

利点

スレッドが休止状態にならないので mutex を使うより CPU を占有している時間が長いので、コンテキストスイッチが発生する回数が少ない。

欠点

スレッドが休止状態にならないので、 lock 中は(割り込みが発生しないと)他のスレッドやプロセスがその CPU を使えなくなる。

昨日の mutex で作ったキューを spin_lock に変えてみる。

以下のエントリに元コードがあります。
http://d.hatena.ne.jp/amachang/20080617/1213694238

まず、構造体

pthread_mutex_t や pthread_cond_t を int spin に置き換える

// 構造体: queue_t
typedef struct queue {
    unsigned int    in;         // 次に入れるインデックス
    unsigned int    out;        // 次に出すインデックス
    unsigned int    size;       // キューのサイズ
    unsigned int    no_full;    // 満タンになると 0 になる
    unsigned int    no_empty;   // 空っぽになると 0 になる

#ifdef SPIN
    int             spin;
#else
    pthread_mutex_t mutex;
    pthread_cond_t  cond_full;  // データが満タンのときに待つための cond
    pthread_cond_t  cond_empty; // データが空のときに待つための cond
#endif
    void*           buffer[1];  // バッファ
    // ここ以下のメモリは p_queue->buffer[3] などとして参照される
} queue_t;
初期化、破棄

spin を初期化するだけ

#ifdef SPIN
        // spin_t の初期化
        p_queue->spin = 0;
#else
        // pthread_mutex_t の初期化
        p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
        pthread_mutex_init(&p_queue->mutex, NULL);

        // pthread_cond_t の初期化
        p_queue->cond_full  = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        p_queue->cond_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        pthread_cond_init(&p_queue->cond_full, NULL);
        pthread_cond_init(&p_queue->cond_empty, NULL);
#endif

破棄はいらない

#ifdef SPIN
#else
        int rv;

        // pthread_mutex_t の破棄
        rv = pthread_mutex_destroy(&p_queue->mutex);
        assert(rv == 0);

        // pthread_mutex_t の破棄
        rv = pthread_cond_destroy(&p_queue->cond_full);
        assert(rv == 0);
        rv = pthread_cond_destroy(&p_queue->cond_empty);
        assert(rv == 0);
#endif
ロック
#ifdef SPIN
    spin_lock(&p_queue->spin);
#else
    pthread_mutex_lock(&p_queue->mutex);
#endif
アンロックとシグナル

シグナルはいらない

#ifdef SPIN
    spin_unlock(&p_queue->spin);
#else
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_empty);
#endif
wait

これは悩んだ。以下のような感じでいいのかな><

#ifdef SPIN
        // ロックを解放
        spin_unlock(&p_queue->spin);
        // ちょっとだけ待つ
        __asm__ __volatile__("nop");
        // ロックする
        spin_lock(&p_queue->spin);
#else
        pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex);
#endif

nop を入れてる。
nop の数で空回り回数を決める。

コード全体
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <pthread.h>
#include <assert.h>

extern void spin_unlock(int *p);
extern void spin_lock(int *p);

// 構造体: queue_t
typedef struct queue {
    unsigned int    in;         // 次に入れるインデックス
    unsigned int    out;        // 次に出すインデックス
    unsigned int    size;       // キューのサイズ
    unsigned int    no_full;    // 満タンになると 0 になる
    unsigned int    no_empty;   // 空っぽになると 0 になる

#ifdef SPIN
    int             spin;
#else
    pthread_mutex_t mutex;
    pthread_cond_t  cond_full;  // データが満タンのときに待つための cond
    pthread_cond_t  cond_empty; // データが空のときに待つための cond
#endif
    void*           buffer[1];  // バッファ
    // ここ以下のメモリは p_queue->buffer[3] などとして参照される
} queue_t;

// 関数名: create_queue
//  size で指定されたサイズのバッファを持つ queue_t を生成&初期化し、そのポインタを返す
//  destroy_queue で解放する必要がある
//  メモリの確保に失敗した場合は NULL を返す
queue_t* create_queue (size_t size) {
    queue_t* p_queue;
    int memsize = sizeof(queue_t) + (size - 1) * sizeof(void*);

    // メモリの確保
    // struct queue_t のサイズは buffer サイズとして 1 を含んでいるので 1 を引く
    p_queue = (queue_t*)malloc(memsize);

    if (p_queue != NULL) {

        // メモリを 0 で初期化
        memset(p_queue, 0x00, memsize);

        // バッファのサイズ
        p_queue->size = size;

        // フラグの初期化
        p_queue->no_full = size;
        p_queue->no_empty = 0;

#ifdef SPIN
        // spin_t の初期化
        p_queue->spin = 0;
#else
        // pthread_mutex_t の初期化
        p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
        pthread_mutex_init(&p_queue->mutex, NULL);

        // pthread_cond_t の初期化
        p_queue->cond_full  = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        p_queue->cond_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        pthread_cond_init(&p_queue->cond_full, NULL);
        pthread_cond_init(&p_queue->cond_empty, NULL);
#endif
    }

    return p_queue;
}

// 関数名: destroy_queue
//  create_queue で確保されたメモリを解放する
//  初期化されている mutex や cond も破棄する
void destroy_queue (queue_t* p_queue) {

    if (p_queue != NULL) {

#ifdef SPIN
#else
        int rv;

        // pthread_mutex_t の破棄
        rv = pthread_mutex_destroy(&p_queue->mutex);
        assert(rv == 0);

        // pthread_mutex_t の破棄
        rv = pthread_cond_destroy(&p_queue->cond_full);
        assert(rv == 0);
        rv = pthread_cond_destroy(&p_queue->cond_empty);
        assert(rv == 0);
#endif

        // メモリの解放
        free(p_queue);
    }
}

// 関数名: enqueue
//  キューにデータを入れる
//  データが満タンな場合は、ブロックします
void enqueue (queue_t* p_queue, void* data) {
#ifdef SPIN
    spin_lock(&p_queue->spin);
#else
    pthread_mutex_lock(&p_queue->mutex);
#endif
    // -- ここから、クリティカルセクション --

    // 満タンじゃなくなるまで待つ
    while (!p_queue->no_full) {
#ifdef SPIN
        // ロックを解放
        spin_unlock(&p_queue->spin);
        // ちょっとだけ待つ
        __asm__ __volatile__("nop");
        // ロックする
        spin_lock(&p_queue->spin);
#else
        pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex);
#endif
    }

    // データを入れる
    p_queue->buffer[p_queue->in] = data;

    // 次にデータを入れる場所をインクリメント
    p_queue->in++;
    p_queue->in %= p_queue->size;

    // フラグの更新
    p_queue->no_full--;
    p_queue->no_empty++;

    // -- ここまで、クリティカルセクション --
#ifdef SPIN
    spin_unlock(&p_queue->spin);
#else
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_empty);
#endif
}

// 関数名: dequeue
//  キューにデータを入れる
//  データが満タンな場合は、ブロックします
void* dequeue (queue_t* p_queue) {
    void* result;

#ifdef SPIN
    spin_lock(&p_queue->spin);
#else
    pthread_mutex_lock(&p_queue->mutex);
#endif
    // -- ここから、クリティカルセクション --

    // 空っぽじゃなくなるまで待つ
    while (!p_queue->no_empty) {
#ifdef SPIN
        // ロックを解放
        spin_unlock(&p_queue->spin);
        // ちょっとだけ待つ
        __asm__ __volatile__("nop");
        // ロックする
        spin_lock(&p_queue->spin);
#else
        pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex);
#endif
    }

    // データを取り出す
    result = p_queue->buffer[p_queue->out];

    // 次にデータを取り出す場所をインクリメント
    p_queue->out++;
    p_queue->out %= p_queue->size;

    // フラグの更新
    p_queue->no_full++;
    p_queue->no_empty--;

    // -- ここまで、クリティカルセクション --
#ifdef SPIN
    spin_unlock(&p_queue->spin);
#else
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->cond_full);
#endif

    return result;
}

void inspect_queue(queue_t* p_queue) {
#ifdef SPIN
    spin_lock(&p_queue->spin);
#else
    pthread_mutex_lock(&p_queue->mutex);
#endif
    printf( "queue(%p) {\n"
            "   size:   %d;\n"
            "   length: %d;\n"
            "}\n\n", p_queue, p_queue->size, p_queue->no_empty);
#ifdef SPIN
    spin_unlock(&p_queue->spin);
#else
    pthread_mutex_unlock(&p_queue->mutex);
#endif
}

void* enqueue_thread_func(void *p) {
    int id = (int) p;
    queue_t *p_queue = (queue_t*) p;
    char* p_data;
    int i;

    for (i = 0; i < 10; i++) {
        p_data = malloc(30 * sizeof(char));
        sprintf(p_data, "tid[%p]: %d", pthread_self(), i);
        enqueue(p_queue, p_data);
    }

    return NULL;
}

void* dequeue_thread_func(void *p) {
    queue_t *p_queue = (queue_t*) p;
    char* p_data;
    int i;

    for (i = 0; i < 10; i++) {
        p_data = dequeue(p_queue);
        puts(p_data);
        free(p_data);
    }

    return NULL;
}

#define N_ENQUEUE_THREADS 10
#define N_DEQUEUE_THREADS 10
int main (int argc, char** argv) {
    queue_t* p_queue = create_queue(1);
    int rv;
    int i;

    pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0,};
    pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0,};

    for (i = 0; i < N_ENQUEUE_THREADS; i ++) {
        rv = pthread_create(enqueue_threads + i, NULL, enqueue_thread_func, p_queue);
        assert(rv == 0);
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i ++) {
        rv = pthread_create(dequeue_threads + i, NULL, dequeue_thread_func, p_queue);
        assert(rv == 0);
    }

    for (i = 0; i < N_ENQUEUE_THREADS; i ++) {
        rv = pthread_join(enqueue_threads[i], NULL);
        assert(rv == 0);
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i ++) {
        rv = pthread_join(dequeue_threads[i], NULL);
        assert(rv == 0);
    }

    destroy_queue(p_queue);
    return 0;
}
実行
$ gcc -DSPIN queue.c && ./a.out
tid[0xb0081000]: 0
tid[0xb0207000]: 0
tid[0xb0491000]: 0
tid[0xb0289000]: 0
tid[0xb0289000]: 1
tid[0xb0289000]: 2
tid[0xb0289000]: 3
tid[0xb0289000]: 4
tid[0xb0289000]: 5
tid[0xb0289000]: 6
tid[0xb0289000]: 7
tid[0xb0289000]: 8
tid[0xb0289000]: 9
tid[0xb0207000]: 1
tid[0xb0207000]: 2
tid[0xb0207000]: 3
tid[0xb038d000]: 0
tid[0xb038d000]: 1
tid[0xb038d000]: 2
tid[0xb038d000]: 3
tid[0xb038d000]: 4
tid[0xb038d000]: 5
tid[0xb038d000]: 6
tid[0xb038d000]: 7
tid[0xb038d000]: 8
:
: 

ちゃんと動いてるっぽい