先日の pthread のキューですが
pthread でキューを作る(再挑戦)。ついでに dtrace でスレッドの解析 - IT戦記
このエントリのコメント欄でさらに id:n-soda さんにアドバイスを頂いたので、修正したいと思います。
まず、 memory.h は古いらしい
string.h を使いましょう。ということらしい
試しに、 /usr/include/memory.h を cat してみたら
/* * Copyright (c) 1988, 1993 * The Regents of the University of California. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: * This product includes software developed by the University of * California, Berkeley and its contributors. * 4. Neither the name of the University nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * @(#)memory.h 8.1 (Berkeley) 6/2/93 */ #include <string.h>
なんと! string.h をインクルードするのとまったく同じなんですね。
pthread_mutex_init と PTHREAD_MUTEX_INITIALIZER
p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_init(&p_queue->mutex, NULL);
ちょっと気になったので Mac の pthread_mutex_init の実装を覗いてみます。
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) { #if 0 /* conformance tests depend on not having this behavior */ /* The test for this behavior is optional */ if (mutex->sig == _PTHREAD_MUTEX_SIG) return EBUSY; #endif LOCK_INIT(mutex->lock); return (_pthread_mutex_init(mutex, attr)); }
mutex->lock は mutex を使ったロックをかけるための spin_lock 用の変数で、 LOCK_INIT(mutex->lock) は mutex->lock = 0; と同じみたいです。
で、 _pthread_mutex_init を見てみます。
static int _pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) { if (attr) { if (attr->sig != _PTHREAD_MUTEX_ATTR_SIG) return (EINVAL); mutex->prioceiling = attr->prioceiling; mutex->protocol = attr->protocol; mutex->type = attr->type; mutex->pshared = attr->pshared; if (attr->pshared == PTHREAD_PROCESS_SHARED) { mutex->lock_count = 0; mutex->owner = (pthread_t)NULL; mutex->next = (pthread_mutex_t *)NULL; mutex->prev = (pthread_mutex_t *)NULL; mutex->busy = (pthread_cond_t *)NULL; mutex->waiters = 0; mutex->sem = SEMAPHORE_NULL; mutex->order = SEMAPHORE_NULL; mutex->sig = 0; if( __pthread_mutex_init(mutex, attr) == -1) return(errno); mutex->sig = _PTHREAD_KERN_MUTEX_SIG; return(0); } } else { mutex->prioceiling = _PTHREAD_DEFAULT_PRIOCEILING; mutex->protocol = _PTHREAD_DEFAULT_PROTOCOL; mutex->type = PTHREAD_MUTEX_DEFAULT; mutex->pshared = _PTHREAD_DEFAULT_PSHARED; } mutex->lock_count = 0; mutex->owner = (pthread_t)NULL; mutex->next = (pthread_mutex_t *)NULL; mutex->prev = (pthread_mutex_t *)NULL; mutex->busy = (pthread_cond_t *)NULL; mutex->waiters = 0; mutex->sem = SEMAPHORE_NULL; mutex->order = SEMAPHORE_NULL; mutex->sig = _PTHREAD_MUTEX_SIG; return (0); }
_PTHREAD_MUTEX_SIG_inig と 0 で初期化しています。
もう一度 _pthread_mutex_init と define の値を見てみると、 PTHREAD_MUTEX_INITIALIZER で設定される値と違うことが分かります。
今度は、pthread_mutex_lock の実装を見てみましょう。
int pthread_mutex_trylock(pthread_mutex_t *mutex) { if (mutex->sig != _PTHREAD_MUTEX_SIG) { /* 略 */ _pthread_mutex_init(mutex, NULL); } /* 略 */ }
第一引数の pthread_mutex_t が初期化されていない場合は、 lock の前に _pthread_mutex_init が呼ばれています。
- pthread_mutex_init はその場で初期化
また、pthread_cond_init と PTHREAD_COND_INITIALIZER も同じだと思います。
cond の意味
cond の意味は、 cond を通過するための条件という意味つまり、 cond_empty という名前を付けると empty の場合だけ実行される箇所という意味にとらえられてしまう。
というわけで、空じゃなくなるまで待つような場合は not_empty というような名前が望ましい。
今回修正したところは、コメントに ** と書いてある箇所です。
#include <stdio.h> #include <stdlib.h> #include <string.h> // ** #include <memory.h> は古いヘッダなので使わない #include <pthread.h> #include <assert.h> // 構造体: queue_t typedef struct queue { unsigned int in; // 次に入れるインデックス unsigned int out; // 次に出すインデックス unsigned int size; // キューのサイズ unsigned int length; // ** キューに格納されている要素数 pthread_mutex_t mutex; pthread_cond_t not_full; // ** キューが満タンじゃないという条件(cond) pthread_cond_t not_empty; // ** キューが空じゃないという条件(cond) void* buffer[1]; // バッファ // ここ以下のメモリは p_queue->buffer[3] などとして参照される } queue_t; // 関数名: create_queue // size で指定されたサイズのバッファを持つ queue_t を生成&初期化し、そのポインタを返す // destroy_queue で解放する必要がある // メモリの確保に失敗した場合は NULL を返す queue_t* create_queue (size_t size) { queue_t* p_queue; int memsize = sizeof(queue_t) + (size - 1) * sizeof(void*); // メモリの確保 // struct queue_t のサイズは buffer サイズとして 1 を含んでいるので 1 を引く p_queue = (queue_t*)malloc(memsize); if (p_queue != NULL) { // ** index の初期化 p_queue->in = 0; p_queue->out = 0; // バッファのサイズ p_queue->size = size; // 要素数の初期化 p_queue->length = 0; // pthread_mutex_t の初期化 pthread_mutex_init(&p_queue->mutex, NULL); // ** 以下のように、定数を使った初期化をすることも出来る。 // p_queue->mutex = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER; // pthread_cond_t の初期化 pthread_cond_init(&p_queue->not_full, NULL); pthread_cond_init(&p_queue->not_empty, NULL); // ** 以下のように、定数を使った初期化をすることも出来る。 // p_queue->not_full = (pthread_cond_t) PTHREAD_COND_INITIALIZER; // p_queue->not_empty = (pthread_cond_t) PTHREAD_COND_INITIALIZER; } return p_queue; } // 関数名: destroy_queue // create_queue で確保されたメモリを解放する // 初期化されている mutex や cond も破棄する void destroy_queue (queue_t* p_queue) { int rv; assert(p_queue != NULL); // pthread_mutex_t の破棄 rv = pthread_mutex_destroy(&p_queue->mutex); assert(rv == 0); // pthread_mutex_t の破棄 rv = pthread_cond_destroy(&p_queue->not_full); assert(rv == 0); rv = pthread_cond_destroy(&p_queue->not_empty); assert(rv == 0); // メモリの解放 free(p_queue); } // 関数名: enqueue // キューにデータを入れる // データが満タンな場合は、ブロックします void enqueue (queue_t* p_queue, void* data) { int rv; rv = pthread_mutex_lock(&p_queue->mutex); assert(rv == 0); // -- ここから、クリティカルセクション -- // 満タンじゃなくなるまで待つ while (p_queue->length == p_queue->size) { rv = pthread_cond_wait(&p_queue->not_full, &p_queue->mutex); assert(rv == 0); } // データを入れる p_queue->buffer[p_queue->in] = data; // 次にデータを入れる場所をインクリメント p_queue->in++; p_queue->in %= p_queue->size; // 要素数の更新 p_queue->length++; // -- ここまで、クリティカルセクション -- pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->not_empty); } // 関数名: dequeue // キューにデータを入れる // データが満タンな場合は、ブロックします void* dequeue (queue_t* p_queue) { void* result; int rv; rv = pthread_mutex_lock(&p_queue->mutex); assert(rv == 0); // -- ここから、クリティカルセクション -- // 空っぽじゃなくなるまで待つ while (!p_queue->length) { rv = pthread_cond_wait(&p_queue->not_empty, &p_queue->mutex); assert(rv == 0); } // データを取り出す result = p_queue->buffer[p_queue->out]; // 次にデータを取り出す場所をインクリメント p_queue->out++; p_queue->out %= p_queue->size; // 要素数の更新 p_queue->length--; // -- ここまで、クリティカルセクション -- pthread_mutex_unlock(&p_queue->mutex); pthread_cond_signal(&p_queue->not_full); return result; } void inspect_queue(queue_t* p_queue) { int rv; rv = pthread_mutex_lock(&p_queue->mutex); assert(rv == 0); printf( "queue(%p) {\n" " size: %d;\n" " length: %d;\n" "}\n\n", p_queue, p_queue->size, p_queue->length); pthread_mutex_unlock(&p_queue->mutex); } void* enqueue_thread_func(void *p) { queue_t *p_queue = (queue_t*) p; char* p_data; int i = 100; while (i--) { p_data = malloc(5 * sizeof(char)); strcpy(p_data, "hoge"); enqueue(p_queue, p_data); } return NULL; } void* dequeue_thread_func(void *p) { queue_t *p_queue = (queue_t*) p; char* p_data; int i = 100; while (i--) { p_data = dequeue(p_queue); free(p_data); } return NULL; } #define N_ENQUEUE_THREADS 10 #define N_DEQUEUE_THREADS 10 int main (int argc, char** argv) { queue_t* p_queue = create_queue(10); int rv; int i; pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0,}; pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0,}; for (i = 0; i < N_ENQUEUE_THREADS; i ++) { rv = pthread_create(enqueue_threads + i, NULL, enqueue_thread_func, p_queue); assert(rv == 0); } for (i = 0; i < N_DEQUEUE_THREADS; i ++) { rv = pthread_create(dequeue_threads + i, NULL, dequeue_thread_func, p_queue); assert(rv == 0); } for (i = 0; i < N_ENQUEUE_THREADS; i ++) { rv = pthread_join(enqueue_threads[i], NULL); assert(rv == 0); } for (i = 0; i < N_DEQUEUE_THREADS; i ++) { rv = pthread_join(dequeue_threads[i], NULL); assert(rv == 0); } destroy_queue(p_queue); return 0; }