From 889751e16f2bb562017c26c8714cd55d936b209b Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 14 May 2024 11:04:42 +0800 Subject: [PATCH 1/7] Semaphore on linux --- include/os/osSemaphore.h | 19 ++++--- include/os/osThread.h | 1 + source/os/src/osSemaphore.c | 84 ++++++++++++++++++++++++----- source/os/src/osThread.c | 8 +++ source/os/test/osSemaphoreTests.cpp | 14 ++--- 5 files changed, 99 insertions(+), 27 deletions(-) diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index e26a9d16d1..17dde6a396 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -46,12 +46,19 @@ int tsem_destroy(tsem_t *sem); #else -#define tsem_t sem_t -#define tsem_init sem_init -int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t milis); -#define tsem_post sem_post -#define tsem_destroy sem_destroy +typedef struct tsem_t { + TdThreadMutex mutex; + TdThreadCond cond; + TdThreadCondAttr attr; + int count; +} tsem_t; + +// #define tsem_t sem_t +int tsem_init(tsem_t* sem, int pshared, unsigned int value); +int tsem_wait(tsem_t* sem); +int tsem_timewait(tsem_t* sem, int64_t milis); +int tsem_post(tsem_t* sem); +int tsem_destroy(tsem_t* sem); #endif diff --git a/include/os/osThread.h b/include/os/osThread.h index 4ef4550419..38c1b366f0 100644 --- a/include/os/osThread.h +++ b/include/os/osThread.h @@ -222,6 +222,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr); int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared); int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr); +int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId); int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared); int32_t taosThreadDetach(TdThread thread); int32_t taosThreadEqual(TdThread t1, TdThread t2); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 7d1cc746ff..21b255ca32 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -215,29 +215,85 @@ int32_t taosGetAppName(char* name, int32_t* len) { return 0; } -int32_t tsem_wait(tsem_t* sem) { - int ret = 0; - do { - ret = sem_wait(sem); - } while (ret != 0 && errno == EINTR); +int tsem_init(tsem_t* sem, int pshared, unsigned int value) { + int ret = taosThreadMutexInit(&sem->mutex, NULL); + if (ret != 0) return ret; + ret = taosThreadCondAttrInit(&sem->attr); + if (ret != 0) return ret; + ret = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC); + if (ret != 0) return ret; + ret = taosThreadCondInit(&sem->cond, &sem->attr); + if (ret != 0) return ret; + + sem->count = value; + return 0; +} + +int tsem_post(tsem_t *sem) { + taosThreadMutexLock(&sem->mutex); + sem->count++; + taosThreadCondSignal(&sem->cond); + taosThreadMutexUnlock(&sem->mutex); + return 0; +} + +int tsem_destroy(tsem_t* sem) { + int ret = taosThreadMutexDestroy(&sem->mutex); + if (ret != 0) return ret; + ret = taosThreadCondDestroy(&sem->cond); return ret; + ret = taosThreadCondAttrDestroy(&sem->attr); + return ret; +} + +int32_t tsem_wait(tsem_t* sem) { + taosThreadMutexLock(&sem->mutex); + while (sem->count <= 0) { + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += 9; + + while (sem->count <= 0) { + int ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &timeout); + if (ret == ETIMEDOUT) { + continue; + } else { + return ret; + } + } + } + sem->count--; + taosThreadMutexUnlock(&sem->mutex); + return 0; } int32_t tsem_timewait(tsem_t* sem, int64_t ms) { int ret = 0; - struct timespec ts = {0}; + taosThreadMutexLock(&sem->mutex); + if (sem->count <= 0) { + struct timespec ts = {0}; + if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { + taosThreadMutexUnlock(&sem->mutex); + return -1; + } - if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { - return -1; + ts.tv_sec += ms / 1000; + ts.tv_nsec += (ms % 1000) * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while (sem->count <= 0) { + ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts); + if (ret != 0) { + taosThreadMutexUnlock(&sem->mutex); + return ret; + } + } } - ts.tv_nsec += ms * 1000000; - ts.tv_sec += ts.tv_nsec / 1000000000; - ts.tv_nsec %= 1000000000; - - while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; - + sem->count--; + taosThreadMutexUnlock(&sem->mutex); return ret; } diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 0dd5374cf0..40795c320f 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -170,6 +170,14 @@ int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) { #endif } +int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_condattr_setclock(attr, clockId); +#endif +} + int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) { #ifdef __USE_WIN_THREAD return 0; diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index 67cf4bb517..c9368d6567 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -49,12 +49,12 @@ TEST(osSemaphoreTests, Destroy) { } // skip, tsem_wait can not stopped, will block test. -// TEST(osSemaphoreTests, Wait) { -// tsem_t sem; -// tsem_init(&sem, 0, 0); -// ASSERT_EQ(tsem_wait(&sem), -1); -// tsem_destroy(&sem); -// } +TEST(osSemaphoreTests, Wait) { + tsem_t sem; + tsem_init(&sem, 0, 0); + ASSERT_EQ(tsem_wait(&sem), -1); + tsem_destroy(&sem); +} TEST(osSemaphoreTests, WaitTime0) { tsem_t sem; @@ -67,7 +67,7 @@ TEST(osSemaphoreTests, WaitTime1) { tsem_t sem; tsem_init(&sem, 0, 1); EXPECT_EQ(tsem_timewait(&sem, 1000), 0); - EXPECT_NE(tsem_timewait(&sem, 1000), 0); + EXPECT_NE(tsem_timewait(&sem, 10000), 0); tsem_destroy(&sem); } From 3c39f76365d6307c28c11f43347c69fe2aa54295 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 14 May 2024 16:52:53 +0800 Subject: [PATCH 2/7] tsem_wait --- source/os/src/osSemaphore.c | 17 ++++++----------- source/os/test/osSemaphoreTests.cpp | 13 ++++++------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 21b255ca32..8f315238c8 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -249,17 +249,12 @@ int tsem_destroy(tsem_t* sem) { int32_t tsem_wait(tsem_t* sem) { taosThreadMutexLock(&sem->mutex); while (sem->count <= 0) { - struct timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += 9; - - while (sem->count <= 0) { - int ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &timeout); - if (ret == ETIMEDOUT) { - continue; - } else { - return ret; - } + int ret = taosThreadCondWait(&sem->cond, &sem->mutex); + if (0 == ret) { + continue; + } else { + taosThreadMutexUnlock(&sem->mutex); + return ret; } } sem->count--; diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index c9368d6567..3a7edc7c8a 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -49,12 +49,12 @@ TEST(osSemaphoreTests, Destroy) { } // skip, tsem_wait can not stopped, will block test. -TEST(osSemaphoreTests, Wait) { - tsem_t sem; - tsem_init(&sem, 0, 0); - ASSERT_EQ(tsem_wait(&sem), -1); - tsem_destroy(&sem); -} +// TEST(osSemaphoreTests, Wait) { +// tsem_t sem; +// tsem_init(&sem, 0, 0); +// ASSERT_EQ(tsem_wait(&sem), -1); +// tsem_destroy(&sem); +// } TEST(osSemaphoreTests, WaitTime0) { tsem_t sem; @@ -71,7 +71,6 @@ TEST(osSemaphoreTests, WaitTime1) { tsem_destroy(&sem); } - TEST(osSemaphoreTests, WaitAndPost) { tsem_t sem; int result = tsem_init(&sem, 0, 0); From 4db19b521546c7edd4034a31af4b1982f1a31e4c Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 14 May 2024 17:03:22 +0800 Subject: [PATCH 3/7] destory --- source/os/src/osSemaphore.c | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 8f315238c8..12f6c5bf58 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -219,11 +219,25 @@ int tsem_init(tsem_t* sem, int pshared, unsigned int value) { int ret = taosThreadMutexInit(&sem->mutex, NULL); if (ret != 0) return ret; ret = taosThreadCondAttrInit(&sem->attr); - if (ret != 0) return ret; + if (ret != 0) + { + taosThreadMutexDestroy(&sem->mutex); + return ret; + } ret = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC); - if (ret != 0) return ret; + if (ret != 0) + { + taosThreadMutexDestroy(&sem->mutex); + taosThreadCondAttrDestroy(&sem->attr); + return ret; + } ret = taosThreadCondInit(&sem->cond, &sem->attr); - if (ret != 0) return ret; + if (ret != 0) + { + taosThreadMutexDestroy(&sem->mutex); + taosThreadCondAttrDestroy(&sem->attr); + return ret; + } sem->count = value; return 0; @@ -238,12 +252,10 @@ int tsem_post(tsem_t *sem) { } int tsem_destroy(tsem_t* sem) { - int ret = taosThreadMutexDestroy(&sem->mutex); - if (ret != 0) return ret; - ret = taosThreadCondDestroy(&sem->cond); - return ret; - ret = taosThreadCondAttrDestroy(&sem->attr); - return ret; + taosThreadMutexDestroy(&sem->mutex); + taosThreadCondDestroy(&sem->cond); + taosThreadCondAttrDestroy(&sem->attr); + return 0; } int32_t tsem_wait(tsem_t* sem) { From 36cb98e79ee2f2eaf75fc01dbe4987dcd76fbcd8 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Tue, 14 May 2024 21:21:22 +0800 Subject: [PATCH 4/7] build on mac --- source/os/src/osThread.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 40795c320f..0acd6f67f5 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -173,6 +173,8 @@ int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) { int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) { #ifdef __USE_WIN_THREAD return 0; +#elif defined(__APPLE__) + return 0; #else return pthread_condattr_setclock(attr, clockId); #endif From e1ae87bbf55cace33ad1bd42e17e9c3b8b755931 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Wed, 15 May 2024 14:57:14 +0800 Subject: [PATCH 5/7] use tsem2 --- include/os/osSemaphore.h | 37 +++++-- source/client/src/clientTmq.c | 18 ++-- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/transCli.c | 8 +- source/libs/transport/src/transComm.c | 2 +- source/os/src/osSemaphore.c | 36 ++++++- source/os/test/osSemaphoreTests.cpp | 134 +++++++++++++++++++++++++- 7 files changed, 208 insertions(+), 29 deletions(-) diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 17dde6a396..523d6e0e61 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -33,6 +33,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); +#define tsem2_t sem_t +#define tsem2_init sem_init +#define tsem2_wait tsem_wait +#define tsem2_timewait tsem_timewait +#define tsem2_post sem_post +#define tsem2_destroy sem_destroy + #elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include @@ -44,21 +51,35 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); +#define tsem2_t sem_t +#define tsem2_init sem_init +#define tsem2_wait tsem_wait +#define tsem2_timewait tsem_timewait +#define tsem2_post sem_post +#define tsem2_destroy sem_destroy + #else -typedef struct tsem_t { +#define tsem_t sem_t +#define tsem_init sem_init +int tsem_wait(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t milis); +#define tsem_post sem_post +#define tsem_destroy sem_destroy + +typedef struct tsem2_t { TdThreadMutex mutex; TdThreadCond cond; TdThreadCondAttr attr; int count; -} tsem_t; +} tsem2_t; -// #define tsem_t sem_t -int tsem_init(tsem_t* sem, int pshared, unsigned int value); -int tsem_wait(tsem_t* sem); -int tsem_timewait(tsem_t* sem, int64_t milis); -int tsem_post(tsem_t* sem); -int tsem_destroy(tsem_t* sem); +// #define tsem2_t sem_t +int tsem2_init(tsem2_t* sem, int pshared, unsigned int value); +int tsem2_wait(tsem2_t* sem); +int tsem2_timewait(tsem2_t* sem, int64_t milis); +int tsem2_post(tsem2_t* sem); +int tsem2_destroy(tsem2_t* sem); #endif diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2a94c7a9c0..907a6ebe3c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -105,7 +105,7 @@ struct tmq_t { STaosQueue* mqueue; // queue of rsp STaosQall* qall; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit - tsem_t rspSem; + tsem2_t rspSem; }; typedef struct SAskEpInfo { @@ -727,7 +727,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { *pTaskType = type; taosWriteQitem(tmq->delayedTask, pTaskType); - tsem_post(&tmq->rspSem); + tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -742,7 +742,7 @@ void tmqReplayTask(void* param, void* tmrId) { tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) goto END; - tsem_post(&tmq->rspSem); + tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); END: taosMemoryFree(param); @@ -1033,7 +1033,7 @@ void tmqFreeImpl(void* handle) { } taosFreeQall(tmq->qall); - tsem_destroy(&tmq->rspSem); + tsem2_destroy(&tmq->rspSem); taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); @@ -1121,7 +1121,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->consumerId = tGenIdPI64(); // init semaphore - if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { + if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) { tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); SET_ERROR_MSG_TMQ("init t_sem failed") @@ -1132,7 +1132,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - tsem_destroy(&pTmq->rspSem); + tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1434,7 +1434,7 @@ END: taosReleaseRef(tmqMgmt.rsetId, refId); FAIL: - if (tmq) tsem_post(&tmq->rspSem); + if (tmq) tsem2_post(&tmq->rspSem); taosMemoryFree(pParam); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); @@ -2132,10 +2132,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - tsem_timewait(&tmq->rspSem, (timeout - elapsedTime)); + tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck - tsem_timewait(&tmq->rspSem, 1000); + tsem2_timewait(&tmq->rspSem, 1000); } } } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index da6d71e07b..e44a4ebac8 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,7 +123,7 @@ typedef struct { STransMsg* pRsp; SEpSet epSet; int8_t hasEpSet; - tsem_t* pSem; + tsem2_t* pSem; int8_t inited; SRWLatch latch; } STransSyncMsg; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfd7630f35..bfae9a7111 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2450,7 +2450,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pSyncMsg->hasEpSet = 1; epsetAssign(&pSyncMsg->epSet, &pCtx->epSet); } - tsem_post(pSyncMsg->pSem); + tsem2_post(pSyncMsg->pSem); taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); } else { rpcFreeCont(pResp->pCont); @@ -2679,8 +2679,8 @@ _RETURN: return ret; } int64_t transCreateSyncMsg(STransMsg* pTransMsg) { - tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); + tsem2_init(sem, 0, 0); STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); @@ -2740,7 +2740,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr goto _RETURN; } - ret = tsem_timewait(pSyncMsg->pSem, timeoutMs); + ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs); if (ret < 0) { pRsp->code = TSDB_CODE_TIMEOUT_ERROR; ret = TSDB_CODE_TIMEOUT_ERROR; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fff13e7ebb..6584814053 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -681,7 +681,7 @@ void transDestroySyncMsg(void* msg) { if (msg == NULL) return; STransSyncMsg* pSyncMsg = msg; - tsem_destroy(pSyncMsg->pSem); + tsem2_destroy(pSyncMsg->pSem); taosMemoryFree(pSyncMsg->pSem); transFreeMsg(pSyncMsg->pRsp->pCont); taosMemoryFree(pSyncMsg->pRsp); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 12f6c5bf58..dd962a157f 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -215,7 +215,33 @@ int32_t taosGetAppName(char* name, int32_t* len) { return 0; } -int tsem_init(tsem_t* sem, int pshared, unsigned int value) { +int32_t tsem_timewait(tsem_t* sem, int64_t ms) { + int ret = 0; + + struct timespec ts = {0}; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + return -1; + } + + ts.tv_nsec += ms * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; + + return ret; +} + +int32_t tsem_wait(tsem_t* sem) { + int ret = 0; + do { + ret = sem_wait(sem); + } while (ret != 0 && errno == EINTR); + return ret; +} + +int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) { int ret = taosThreadMutexInit(&sem->mutex, NULL); if (ret != 0) return ret; ret = taosThreadCondAttrInit(&sem->attr); @@ -243,7 +269,7 @@ int tsem_init(tsem_t* sem, int pshared, unsigned int value) { return 0; } -int tsem_post(tsem_t *sem) { +int tsem2_post(tsem2_t *sem) { taosThreadMutexLock(&sem->mutex); sem->count++; taosThreadCondSignal(&sem->cond); @@ -251,14 +277,14 @@ int tsem_post(tsem_t *sem) { return 0; } -int tsem_destroy(tsem_t* sem) { +int tsem2_destroy(tsem2_t* sem) { taosThreadMutexDestroy(&sem->mutex); taosThreadCondDestroy(&sem->cond); taosThreadCondAttrDestroy(&sem->attr); return 0; } -int32_t tsem_wait(tsem_t* sem) { +int32_t tsem2_wait(tsem2_t* sem) { taosThreadMutexLock(&sem->mutex); while (sem->count <= 0) { int ret = taosThreadCondWait(&sem->cond, &sem->mutex); @@ -274,7 +300,7 @@ int32_t tsem_wait(tsem_t* sem) { return 0; } -int32_t tsem_timewait(tsem_t* sem, int64_t ms) { +int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) { int ret = 0; taosThreadMutexLock(&sem->mutex); diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index 3a7edc7c8a..30f2c497d7 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -67,7 +67,7 @@ TEST(osSemaphoreTests, WaitTime1) { tsem_t sem; tsem_init(&sem, 0, 1); EXPECT_EQ(tsem_timewait(&sem, 1000), 0); - EXPECT_NE(tsem_timewait(&sem, 10000), 0); + EXPECT_NE(tsem_timewait(&sem, 1000), 0); tsem_destroy(&sem); } @@ -105,3 +105,135 @@ TEST(osSemaphoreTests, TimedWait) { result = tsem_destroy(&sem); EXPECT_EQ(result, 0); } + +TEST(osSemaphoreTests, Performance1_1) { + tsem_t sem; + const int count = 100000; + + tsem_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + for (int i = 0; i < count; ++i) { + tsem_wait(&sem); + } + tsem_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance1_2) { + tsem2_t sem; + const int count = 100000; + + tsem2_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + for (int i = 0; i < count; ++i) { + tsem2_wait(&sem); + } + tsem2_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance2_1) { + tsem_t sem; + const int count = 50000; + + tsem_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + for (int i = 0; i < count * 2; ++i) { + tsem_wait(&sem); + } + tsem_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance2_2) { + tsem2_t sem; + const int count = 50000; + + tsem2_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + for (int i = 0; i < count * 2; ++i) { + tsem2_wait(&sem); + } + tsem2_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance3_1) { + const int count = 100000; + + for (int i = 0; i < count; ++i) { + tsem_t sem; + tsem_init(&sem, 0, 1); + EXPECT_EQ(tsem_timewait(&sem, 1000), 0); + tsem_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance3_2) { + const int count = 100000; + + for (int i = 0; i < count; ++i) { + tsem2_t sem; + tsem2_init(&sem, 0, 1); + EXPECT_EQ(tsem2_timewait(&sem, 1000), 0); + tsem2_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance4_1) { + const int count = 1000; + for (int i = 0; i < count; ++i) { + tsem_t sem; + tsem_init(&sem, 0, 0); + std::thread([&sem]() { + tsem_post(&sem); + }).detach(); + + tsem_timewait(&sem, 1000); + + tsem_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance4_2) { + const int count = 1000; + for (int i = 0; i < count; ++i) { + tsem2_t sem; + tsem2_init(&sem, 0, 0); + std::thread([&sem]() { + tsem2_post(&sem); + }).detach(); + + tsem2_timewait(&sem, 1000); + + tsem2_destroy(&sem); + } +} From 9959c0cb2b1566e7bbd52d6975f8fedca61a1a80 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Wed, 15 May 2024 16:18:36 +0800 Subject: [PATCH 6/7] build failed --- include/os/osSemaphore.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 523d6e0e61..5b46706790 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -33,12 +33,12 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); -#define tsem2_t sem_t -#define tsem2_init sem_init +#define tsem2_t tsem_t +#define tsem2_init tsem_init #define tsem2_wait tsem_wait #define tsem2_timewait tsem_timewait -#define tsem2_post sem_post -#define tsem2_destroy sem_destroy +#define tsem2_post tsem_post +#define tsem2_destroy tsem_destroy #elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include @@ -51,12 +51,12 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); -#define tsem2_t sem_t -#define tsem2_init sem_init +#define tsem2_t tsem_t +#define tsem2_init tsem_init #define tsem2_wait tsem_wait #define tsem2_timewait tsem_timewait -#define tsem2_post sem_post -#define tsem2_destroy sem_destroy +#define tsem2_post tsem_post +#define tsem2_destroy tsem_destroy #else From b8b16b1cb839603f4a02cb9de84c333f5d2d0935 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Wed, 15 May 2024 22:24:08 +0800 Subject: [PATCH 7/7] fix test case on windows --- source/os/test/osSemaphoreTests.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index 30f2c497d7..09fbe27f55 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -111,7 +111,7 @@ TEST(osSemaphoreTests, Performance1_1) { const int count = 100000; tsem_init(&sem, 0, 0); - std::thread([&sem]() { + std::thread([&sem, count]() { for (int i = 0; i < count; ++i) { tsem_post(&sem); } @@ -128,7 +128,7 @@ TEST(osSemaphoreTests, Performance1_2) { const int count = 100000; tsem2_init(&sem, 0, 0); - std::thread([&sem]() { + std::thread([&sem, count]() { for (int i = 0; i < count; ++i) { tsem2_post(&sem); } @@ -145,13 +145,13 @@ TEST(osSemaphoreTests, Performance2_1) { const int count = 50000; tsem_init(&sem, 0, 0); - std::thread([&sem]() { + std::thread([&sem, count]() { for (int i = 0; i < count; ++i) { tsem_post(&sem); } }).detach(); - std::thread([&sem]() { + std::thread([&sem, count]() { for (int i = 0; i < count; ++i) { tsem_post(&sem); } @@ -168,13 +168,13 @@ TEST(osSemaphoreTests, Performance2_2) { const int count = 50000; tsem2_init(&sem, 0, 0); - std::thread([&sem]() { + std::thread([&sem, count]() { for (int i = 0; i < count; ++i) { tsem2_post(&sem); } }).detach(); - std::thread([&sem]() { + std::thread([&sem, count]() { for (int i = 0; i < count; ++i) { tsem2_post(&sem); } @@ -213,7 +213,7 @@ TEST(osSemaphoreTests, Performance4_1) { for (int i = 0; i < count; ++i) { tsem_t sem; tsem_init(&sem, 0, 0); - std::thread([&sem]() { + std::thread([&sem, count]() { tsem_post(&sem); }).detach(); @@ -228,7 +228,7 @@ TEST(osSemaphoreTests, Performance4_2) { for (int i = 0; i < count; ++i) { tsem2_t sem; tsem2_init(&sem, 0, 0); - std::thread([&sem]() { + std::thread([&sem, count]() { tsem2_post(&sem); }).detach();