diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 17dde6a396..523d6e0e61 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -33,6 +33,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); +#define tsem2_t sem_t +#define tsem2_init sem_init +#define tsem2_wait tsem_wait +#define tsem2_timewait tsem_timewait +#define tsem2_post sem_post +#define tsem2_destroy sem_destroy + #elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include @@ -44,21 +51,35 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); +#define tsem2_t sem_t +#define tsem2_init sem_init +#define tsem2_wait tsem_wait +#define tsem2_timewait tsem_timewait +#define tsem2_post sem_post +#define tsem2_destroy sem_destroy + #else -typedef struct tsem_t { +#define tsem_t sem_t +#define tsem_init sem_init +int tsem_wait(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t milis); +#define tsem_post sem_post +#define tsem_destroy sem_destroy + +typedef struct tsem2_t { TdThreadMutex mutex; TdThreadCond cond; TdThreadCondAttr attr; int count; -} tsem_t; +} tsem2_t; -// #define tsem_t sem_t -int tsem_init(tsem_t* sem, int pshared, unsigned int value); -int tsem_wait(tsem_t* sem); -int tsem_timewait(tsem_t* sem, int64_t milis); -int tsem_post(tsem_t* sem); -int tsem_destroy(tsem_t* sem); +// #define tsem2_t sem_t +int tsem2_init(tsem2_t* sem, int pshared, unsigned int value); +int tsem2_wait(tsem2_t* sem); +int tsem2_timewait(tsem2_t* sem, int64_t milis); +int tsem2_post(tsem2_t* sem); +int tsem2_destroy(tsem2_t* sem); #endif diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2a94c7a9c0..907a6ebe3c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -105,7 +105,7 @@ struct tmq_t { STaosQueue* mqueue; // queue of rsp STaosQall* qall; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit - tsem_t rspSem; + tsem2_t rspSem; }; typedef struct SAskEpInfo { @@ -727,7 +727,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { *pTaskType = type; taosWriteQitem(tmq->delayedTask, pTaskType); - tsem_post(&tmq->rspSem); + tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -742,7 +742,7 @@ void tmqReplayTask(void* param, void* tmrId) { tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) goto END; - tsem_post(&tmq->rspSem); + tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); END: taosMemoryFree(param); @@ -1033,7 +1033,7 @@ void tmqFreeImpl(void* handle) { } taosFreeQall(tmq->qall); - tsem_destroy(&tmq->rspSem); + tsem2_destroy(&tmq->rspSem); taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); @@ -1121,7 +1121,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->consumerId = tGenIdPI64(); // init semaphore - if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { + if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) { tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); SET_ERROR_MSG_TMQ("init t_sem failed") @@ -1132,7 +1132,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - tsem_destroy(&pTmq->rspSem); + tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1434,7 +1434,7 @@ END: taosReleaseRef(tmqMgmt.rsetId, refId); FAIL: - if (tmq) tsem_post(&tmq->rspSem); + if (tmq) tsem2_post(&tmq->rspSem); taosMemoryFree(pParam); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); @@ -2132,10 +2132,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - tsem_timewait(&tmq->rspSem, (timeout - elapsedTime)); + tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck - tsem_timewait(&tmq->rspSem, 1000); + tsem2_timewait(&tmq->rspSem, 1000); } } } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index da6d71e07b..e44a4ebac8 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,7 +123,7 @@ typedef struct { STransMsg* pRsp; SEpSet epSet; int8_t hasEpSet; - tsem_t* pSem; + tsem2_t* pSem; int8_t inited; SRWLatch latch; } STransSyncMsg; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfd7630f35..bfae9a7111 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2450,7 +2450,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pSyncMsg->hasEpSet = 1; epsetAssign(&pSyncMsg->epSet, &pCtx->epSet); } - tsem_post(pSyncMsg->pSem); + tsem2_post(pSyncMsg->pSem); taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); } else { rpcFreeCont(pResp->pCont); @@ -2679,8 +2679,8 @@ _RETURN: return ret; } int64_t transCreateSyncMsg(STransMsg* pTransMsg) { - tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); + tsem2_init(sem, 0, 0); STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); @@ -2740,7 +2740,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr goto _RETURN; } - ret = tsem_timewait(pSyncMsg->pSem, timeoutMs); + ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs); if (ret < 0) { pRsp->code = TSDB_CODE_TIMEOUT_ERROR; ret = TSDB_CODE_TIMEOUT_ERROR; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fff13e7ebb..6584814053 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -681,7 +681,7 @@ void transDestroySyncMsg(void* msg) { if (msg == NULL) return; STransSyncMsg* pSyncMsg = msg; - tsem_destroy(pSyncMsg->pSem); + tsem2_destroy(pSyncMsg->pSem); taosMemoryFree(pSyncMsg->pSem); transFreeMsg(pSyncMsg->pRsp->pCont); taosMemoryFree(pSyncMsg->pRsp); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 12f6c5bf58..dd962a157f 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -215,7 +215,33 @@ int32_t taosGetAppName(char* name, int32_t* len) { return 0; } -int tsem_init(tsem_t* sem, int pshared, unsigned int value) { +int32_t tsem_timewait(tsem_t* sem, int64_t ms) { + int ret = 0; + + struct timespec ts = {0}; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + return -1; + } + + ts.tv_nsec += ms * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; + + return ret; +} + +int32_t tsem_wait(tsem_t* sem) { + int ret = 0; + do { + ret = sem_wait(sem); + } while (ret != 0 && errno == EINTR); + return ret; +} + +int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) { int ret = taosThreadMutexInit(&sem->mutex, NULL); if (ret != 0) return ret; ret = taosThreadCondAttrInit(&sem->attr); @@ -243,7 +269,7 @@ int tsem_init(tsem_t* sem, int pshared, unsigned int value) { return 0; } -int tsem_post(tsem_t *sem) { +int tsem2_post(tsem2_t *sem) { taosThreadMutexLock(&sem->mutex); sem->count++; taosThreadCondSignal(&sem->cond); @@ -251,14 +277,14 @@ int tsem_post(tsem_t *sem) { return 0; } -int tsem_destroy(tsem_t* sem) { +int tsem2_destroy(tsem2_t* sem) { taosThreadMutexDestroy(&sem->mutex); taosThreadCondDestroy(&sem->cond); taosThreadCondAttrDestroy(&sem->attr); return 0; } -int32_t tsem_wait(tsem_t* sem) { +int32_t tsem2_wait(tsem2_t* sem) { taosThreadMutexLock(&sem->mutex); while (sem->count <= 0) { int ret = taosThreadCondWait(&sem->cond, &sem->mutex); @@ -274,7 +300,7 @@ int32_t tsem_wait(tsem_t* sem) { return 0; } -int32_t tsem_timewait(tsem_t* sem, int64_t ms) { +int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) { int ret = 0; taosThreadMutexLock(&sem->mutex); diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index 3a7edc7c8a..30f2c497d7 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -67,7 +67,7 @@ TEST(osSemaphoreTests, WaitTime1) { tsem_t sem; tsem_init(&sem, 0, 1); EXPECT_EQ(tsem_timewait(&sem, 1000), 0); - EXPECT_NE(tsem_timewait(&sem, 10000), 0); + EXPECT_NE(tsem_timewait(&sem, 1000), 0); tsem_destroy(&sem); } @@ -105,3 +105,135 @@ TEST(osSemaphoreTests, TimedWait) { result = tsem_destroy(&sem); EXPECT_EQ(result, 0); } + +TEST(osSemaphoreTests, Performance1_1) { + tsem_t sem; + const int count = 100000; + + tsem_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + for (int i = 0; i < count; ++i) { + tsem_wait(&sem); + } + tsem_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance1_2) { + tsem2_t sem; + const int count = 100000; + + tsem2_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + for (int i = 0; i < count; ++i) { + tsem2_wait(&sem); + } + tsem2_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance2_1) { + tsem_t sem; + const int count = 50000; + + tsem_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + for (int i = 0; i < count * 2; ++i) { + tsem_wait(&sem); + } + tsem_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance2_2) { + tsem2_t sem; + const int count = 50000; + + tsem2_init(&sem, 0, 0); + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + std::thread([&sem]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + for (int i = 0; i < count * 2; ++i) { + tsem2_wait(&sem); + } + tsem2_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance3_1) { + const int count = 100000; + + for (int i = 0; i < count; ++i) { + tsem_t sem; + tsem_init(&sem, 0, 1); + EXPECT_EQ(tsem_timewait(&sem, 1000), 0); + tsem_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance3_2) { + const int count = 100000; + + for (int i = 0; i < count; ++i) { + tsem2_t sem; + tsem2_init(&sem, 0, 1); + EXPECT_EQ(tsem2_timewait(&sem, 1000), 0); + tsem2_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance4_1) { + const int count = 1000; + for (int i = 0; i < count; ++i) { + tsem_t sem; + tsem_init(&sem, 0, 0); + std::thread([&sem]() { + tsem_post(&sem); + }).detach(); + + tsem_timewait(&sem, 1000); + + tsem_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance4_2) { + const int count = 1000; + for (int i = 0; i < count; ++i) { + tsem2_t sem; + tsem2_init(&sem, 0, 0); + std::thread([&sem]() { + tsem2_post(&sem); + }).detach(); + + tsem2_timewait(&sem, 1000); + + tsem2_destroy(&sem); + } +}