From 889751e16f2bb562017c26c8714cd55d936b209b Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 14 May 2024 11:04:42 +0800 Subject: [PATCH] Semaphore on linux --- include/os/osSemaphore.h | 19 ++++--- include/os/osThread.h | 1 + source/os/src/osSemaphore.c | 84 ++++++++++++++++++++++++----- source/os/src/osThread.c | 8 +++ source/os/test/osSemaphoreTests.cpp | 14 ++--- 5 files changed, 99 insertions(+), 27 deletions(-) diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index e26a9d16d1..17dde6a396 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -46,12 +46,19 @@ int tsem_destroy(tsem_t *sem); #else -#define tsem_t sem_t -#define tsem_init sem_init -int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t milis); -#define tsem_post sem_post -#define tsem_destroy sem_destroy +typedef struct tsem_t { + TdThreadMutex mutex; + TdThreadCond cond; + TdThreadCondAttr attr; + int count; +} tsem_t; + +// #define tsem_t sem_t +int tsem_init(tsem_t* sem, int pshared, unsigned int value); +int tsem_wait(tsem_t* sem); +int tsem_timewait(tsem_t* sem, int64_t milis); +int tsem_post(tsem_t* sem); +int tsem_destroy(tsem_t* sem); #endif diff --git a/include/os/osThread.h b/include/os/osThread.h index 4ef4550419..38c1b366f0 100644 --- a/include/os/osThread.h +++ b/include/os/osThread.h @@ -222,6 +222,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr); int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared); int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr); +int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId); int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared); int32_t taosThreadDetach(TdThread thread); int32_t taosThreadEqual(TdThread t1, TdThread t2); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 7d1cc746ff..21b255ca32 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -215,29 +215,85 @@ int32_t taosGetAppName(char* name, int32_t* len) { return 0; } -int32_t tsem_wait(tsem_t* sem) { - int ret = 0; - do { - ret = sem_wait(sem); - } while (ret != 0 && errno == EINTR); +int tsem_init(tsem_t* sem, int pshared, unsigned int value) { + int ret = taosThreadMutexInit(&sem->mutex, NULL); + if (ret != 0) return ret; + ret = taosThreadCondAttrInit(&sem->attr); + if (ret != 0) return ret; + ret = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC); + if (ret != 0) return ret; + ret = taosThreadCondInit(&sem->cond, &sem->attr); + if (ret != 0) return ret; + + sem->count = value; + return 0; +} + +int tsem_post(tsem_t *sem) { + taosThreadMutexLock(&sem->mutex); + sem->count++; + taosThreadCondSignal(&sem->cond); + taosThreadMutexUnlock(&sem->mutex); + return 0; +} + +int tsem_destroy(tsem_t* sem) { + int ret = taosThreadMutexDestroy(&sem->mutex); + if (ret != 0) return ret; + ret = taosThreadCondDestroy(&sem->cond); return ret; + ret = taosThreadCondAttrDestroy(&sem->attr); + return ret; +} + +int32_t tsem_wait(tsem_t* sem) { + taosThreadMutexLock(&sem->mutex); + while (sem->count <= 0) { + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += 9; + + while (sem->count <= 0) { + int ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &timeout); + if (ret == ETIMEDOUT) { + continue; + } else { + return ret; + } + } + } + sem->count--; + taosThreadMutexUnlock(&sem->mutex); + return 0; } int32_t tsem_timewait(tsem_t* sem, int64_t ms) { int ret = 0; - struct timespec ts = {0}; + taosThreadMutexLock(&sem->mutex); + if (sem->count <= 0) { + struct timespec ts = {0}; + if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { + taosThreadMutexUnlock(&sem->mutex); + return -1; + } - if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { - return -1; + ts.tv_sec += ms / 1000; + ts.tv_nsec += (ms % 1000) * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while (sem->count <= 0) { + ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts); + if (ret != 0) { + taosThreadMutexUnlock(&sem->mutex); + return ret; + } + } } - ts.tv_nsec += ms * 1000000; - ts.tv_sec += ts.tv_nsec / 1000000000; - ts.tv_nsec %= 1000000000; - - while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; - + sem->count--; + taosThreadMutexUnlock(&sem->mutex); return ret; } diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 0dd5374cf0..40795c320f 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -170,6 +170,14 @@ int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) { #endif } +int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_condattr_setclock(attr, clockId); +#endif +} + int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) { #ifdef __USE_WIN_THREAD return 0; diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index 67cf4bb517..c9368d6567 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -49,12 +49,12 @@ TEST(osSemaphoreTests, Destroy) { } // skip, tsem_wait can not stopped, will block test. -// TEST(osSemaphoreTests, Wait) { -// tsem_t sem; -// tsem_init(&sem, 0, 0); -// ASSERT_EQ(tsem_wait(&sem), -1); -// tsem_destroy(&sem); -// } +TEST(osSemaphoreTests, Wait) { + tsem_t sem; + tsem_init(&sem, 0, 0); + ASSERT_EQ(tsem_wait(&sem), -1); + tsem_destroy(&sem); +} TEST(osSemaphoreTests, WaitTime0) { tsem_t sem; @@ -67,7 +67,7 @@ TEST(osSemaphoreTests, WaitTime1) { tsem_t sem; tsem_init(&sem, 0, 1); EXPECT_EQ(tsem_timewait(&sem, 1000), 0); - EXPECT_NE(tsem_timewait(&sem, 1000), 0); + EXPECT_NE(tsem_timewait(&sem, 10000), 0); tsem_destroy(&sem); }