IT戦記

プログラミング、起業などについて書いているプログラマーのブログです😚

pthread でキューを作ってみる(再々挑戦、最終版)

先日の pthread のキューですが

pthread でキューを作る(再挑戦)。ついでに dtrace でスレッドの解析 - IT戦記
このエントリのコメント欄でさらに id:n-soda さんにアドバイスを頂いたので、修正したいと思います。

まず、 memory.h は古いらしい

string.h を使いましょう。ということらしい
試しに、 /usr/include/memory.h を cat してみたら

/*
 * Copyright (c) 1988, 1993
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *	This product includes software developed by the University of
 *	California, Berkeley and its contributors.
 * 4. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 *	@(#)memory.h	8.1 (Berkeley) 6/2/93
 */

#include <string.h>

なんと! string.h をインクルードするのとまったく同じなんですね。

pthread_mutex_init と PTHREAD_MUTEX_INITIALIZER

以下の二つのステートメントはどちらか一方で良いそうです。

p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_init(&p_queue->mutex, NULL);

ちょっと気になったので Mac の pthread_mutex_init の実装を覗いてみます。
ソースは以下にあります。
http://www.opensource.apple.com/darwinsource/10.5.3/Libc-498.1.1/pthreads/pthread_mutex.c

int
pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
{
#if 0
	/* conformance tests depend on not having this behavior */
	/* The test for this behavior is optional */
	if (mutex->sig == _PTHREAD_MUTEX_SIG)
		return EBUSY;
#endif
	LOCK_INIT(mutex->lock);
	return (_pthread_mutex_init(mutex, attr));
}

mutex->lock は mutex を使ったロックをかけるための spin_lock 用の変数で、 LOCK_INIT(mutex->lock) は mutex->lock = 0; と同じみたいです。
で、 _pthread_mutex_init を見てみます。

static int
_pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
{
	if (attr)
	{
		if (attr->sig != _PTHREAD_MUTEX_ATTR_SIG)
			return (EINVAL);
		mutex->prioceiling = attr->prioceiling;
		mutex->protocol = attr->protocol;
		mutex->type = attr->type;
		mutex->pshared = attr->pshared;
		if (attr->pshared == PTHREAD_PROCESS_SHARED) {
			mutex->lock_count = 0;
			mutex->owner = (pthread_t)NULL;
			mutex->next = (pthread_mutex_t *)NULL;
			mutex->prev = (pthread_mutex_t *)NULL;
			mutex->busy = (pthread_cond_t *)NULL;
			mutex->waiters = 0;
			mutex->sem = SEMAPHORE_NULL;
			mutex->order = SEMAPHORE_NULL;
			mutex->sig = 0;
			if( __pthread_mutex_init(mutex, attr) == -1)
				return(errno);
			mutex->sig = _PTHREAD_KERN_MUTEX_SIG;
			return(0);
		}
	} else {
		mutex->prioceiling = _PTHREAD_DEFAULT_PRIOCEILING;
		mutex->protocol = _PTHREAD_DEFAULT_PROTOCOL;
		mutex->type = PTHREAD_MUTEX_DEFAULT;
		mutex->pshared = _PTHREAD_DEFAULT_PSHARED;
	}
	mutex->lock_count = 0;
	mutex->owner = (pthread_t)NULL;
	mutex->next = (pthread_mutex_t *)NULL;
	mutex->prev = (pthread_mutex_t *)NULL;
	mutex->busy = (pthread_cond_t *)NULL;
	mutex->waiters = 0;
	mutex->sem = SEMAPHORE_NULL;
	mutex->order = SEMAPHORE_NULL;
	mutex->sig = _PTHREAD_MUTEX_SIG;
	return (0);
}

値を設定してますね。
こんどは、 PTHREAD_MUTEX_INITIALIZER を見てみます。
ソースは以下
http://www.opensource.apple.com/darwinsource/10.5.3/Libc-498.1.1/pthreads/pthread.h

#define PTHREAD_MUTEX_INITIALIZER {_PTHREAD_MUTEX_SIG_init, {0}}

_PTHREAD_MUTEX_SIG_inig と 0 で初期化しています。
もう一度 _pthread_mutex_init と define の値を見てみると、 PTHREAD_MUTEX_INITIALIZER で設定される値と違うことが分かります。
今度は、pthread_mutex_lock の実装を見てみましょう。
ソースは以下
http://www.opensource.apple.com/darwinsource/10.5.3/Libc-498.1.1/pthreads/pthread_mutex.c

int
pthread_mutex_trylock(pthread_mutex_t *mutex)
{
	if (mutex->sig != _PTHREAD_MUTEX_SIG)
	{
                /* 略 */
		_pthread_mutex_init(mutex, NULL);
	}
        /* 略 */
}

おおお!なるほど!なるほど!
第一引数の pthread_mutex_t が初期化されていない場合は、 lock の前に _pthread_mutex_init が呼ばれています。
ということは、

  • PTHREAD_MUTEX_INITIALIZER は最初のロック時に初期化
  • pthread_mutex_init はその場で初期化

ということですね!
また、pthread_cond_init と PTHREAD_COND_INITIALIZER も同じだと思います。

cond の意味

cond の意味は、 cond を通過するための条件という意味つまり、 cond_empty という名前を付けると empty の場合だけ実行される箇所という意味にとらえられてしまう。
というわけで、空じゃなくなるまで待つような場合は not_empty というような名前が望ましい。
ということらしいです。

という訳で、最後の修正版

今回修正したところは、コメントに ** と書いてある箇所です。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// ** #include <memory.h> は古いヘッダなので使わない
#include <pthread.h>
#include <assert.h>

// 構造体: queue_t
typedef struct queue {
    unsigned int    in;         // 次に入れるインデックス
    unsigned int    out;        // 次に出すインデックス
    unsigned int    size;       // キューのサイズ
    unsigned int    length;     // ** キューに格納されている要素数
    pthread_mutex_t mutex;
    pthread_cond_t  not_full;   // ** キューが満タンじゃないという条件(cond)
    pthread_cond_t  not_empty;  // ** キューが空じゃないという条件(cond)
    void*           buffer[1];  // バッファ
    // ここ以下のメモリは p_queue->buffer[3] などとして参照される
} queue_t;

// 関数名: create_queue
//  size で指定されたサイズのバッファを持つ queue_t を生成&初期化し、そのポインタを返す
//  destroy_queue で解放する必要がある
//  メモリの確保に失敗した場合は NULL を返す
queue_t* create_queue (size_t size) {
    queue_t* p_queue;
    int memsize = sizeof(queue_t) + (size - 1) * sizeof(void*);

    // メモリの確保
    // struct queue_t のサイズは buffer サイズとして 1 を含んでいるので 1 を引く
    p_queue = (queue_t*)malloc(memsize);

    if (p_queue != NULL) {

        // ** index の初期化
        p_queue->in = 0;
        p_queue->out = 0;

        // バッファのサイズ
        p_queue->size = size;

        // 要素数の初期化
        p_queue->length = 0;

        // pthread_mutex_t の初期化
        pthread_mutex_init(&p_queue->mutex, NULL);
        // ** 以下のように、定数を使った初期化をすることも出来る。
        // p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;

        // pthread_cond_t の初期化
        pthread_cond_init(&p_queue->not_full, NULL);
        pthread_cond_init(&p_queue->not_empty, NULL);
        // ** 以下のように、定数を使った初期化をすることも出来る。
        // p_queue->not_full  = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
        // p_queue->not_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER;
    }

    return p_queue;
}

// 関数名: destroy_queue
//  create_queue で確保されたメモリを解放する
//  初期化されている mutex や cond も破棄する
void destroy_queue (queue_t* p_queue) {
    int rv;

    assert(p_queue != NULL);

    // pthread_mutex_t の破棄
    rv = pthread_mutex_destroy(&p_queue->mutex);
    assert(rv == 0);

    // pthread_mutex_t の破棄
    rv = pthread_cond_destroy(&p_queue->not_full);
    assert(rv == 0);
    rv = pthread_cond_destroy(&p_queue->not_empty);
    assert(rv == 0);

    // メモリの解放
    free(p_queue);
}

// 関数名: enqueue
//  キューにデータを入れる
//  データが満タンな場合は、ブロックします
void enqueue (queue_t* p_queue, void* data) {
    int rv;
    rv = pthread_mutex_lock(&p_queue->mutex);
    assert(rv == 0);
    // -- ここから、クリティカルセクション --

    // 満タンじゃなくなるまで待つ
    while (p_queue->length == p_queue->size) {
        rv = pthread_cond_wait(&p_queue->not_full, &p_queue->mutex);
        assert(rv == 0);
    }

    // データを入れる
    p_queue->buffer[p_queue->in] = data;

    // 次にデータを入れる場所をインクリメント
    p_queue->in++;
    p_queue->in %= p_queue->size;

    // 要素数の更新
    p_queue->length++;

    // -- ここまで、クリティカルセクション --
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->not_empty);
}

// 関数名: dequeue
//  キューにデータを入れる
//  データが満タンな場合は、ブロックします
void* dequeue (queue_t* p_queue) {
    void* result;

    int rv;
    rv = pthread_mutex_lock(&p_queue->mutex);
    assert(rv == 0);
    // -- ここから、クリティカルセクション --

    // 空っぽじゃなくなるまで待つ
    while (!p_queue->length) {
        rv = pthread_cond_wait(&p_queue->not_empty, &p_queue->mutex);
        assert(rv == 0);
    }

    // データを取り出す
    result = p_queue->buffer[p_queue->out];

    // 次にデータを取り出す場所をインクリメント
    p_queue->out++;
    p_queue->out %= p_queue->size;

    // 要素数の更新
    p_queue->length--;

    // -- ここまで、クリティカルセクション --
    pthread_mutex_unlock(&p_queue->mutex);
    pthread_cond_signal(&p_queue->not_full);

    return result;
}

void inspect_queue(queue_t* p_queue) {
    int rv;
    rv = pthread_mutex_lock(&p_queue->mutex);
    assert(rv == 0);
    printf( "queue(%p) {\n"
            "   size:   %d;\n"
            "   length: %d;\n"
            "}\n\n", p_queue, p_queue->size, p_queue->length);
    pthread_mutex_unlock(&p_queue->mutex);
}

void* enqueue_thread_func(void *p) {
    queue_t *p_queue = (queue_t*) p;
    char* p_data;
    int i = 100;

    while (i--) {
        p_data = malloc(5 * sizeof(char));
        strcpy(p_data, "hoge");
        enqueue(p_queue, p_data);
    }

    return NULL;
}

void* dequeue_thread_func(void *p) {
    queue_t *p_queue = (queue_t*) p;
    char* p_data;
    int i = 100;

    while (i--) {
        p_data = dequeue(p_queue);
        free(p_data);
    }

    return NULL;
}

#define N_ENQUEUE_THREADS 10
#define N_DEQUEUE_THREADS 10
int main (int argc, char** argv) {
    queue_t* p_queue = create_queue(10);
    int rv;
    int i;

    pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0,};
    pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0,};

    for (i = 0; i < N_ENQUEUE_THREADS; i ++) {
        rv = pthread_create(enqueue_threads + i, NULL, enqueue_thread_func, p_queue);
        assert(rv == 0);
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i ++) {
        rv = pthread_create(dequeue_threads + i, NULL, dequeue_thread_func, p_queue);
        assert(rv == 0);
    }

    for (i = 0; i < N_ENQUEUE_THREADS; i ++) {
        rv = pthread_join(enqueue_threads[i], NULL);
        assert(rv == 0);
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i ++) {
        rv = pthread_join(dequeue_threads[i], NULL);
        assert(rv == 0);
    }

    destroy_queue(p_queue);
    return 0;
}