diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index e26a9d16d1..5b46706790 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -33,6 +33,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); +#define tsem2_t tsem_t +#define tsem2_init tsem_init +#define tsem2_wait tsem_wait +#define tsem2_timewait tsem_timewait +#define tsem2_post tsem_post +#define tsem2_destroy tsem_destroy + #elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include @@ -44,6 +51,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); +#define tsem2_t tsem_t +#define tsem2_init tsem_init +#define tsem2_wait tsem_wait +#define tsem2_timewait tsem_timewait +#define tsem2_post tsem_post +#define tsem2_destroy tsem_destroy + #else #define tsem_t sem_t @@ -53,6 +67,20 @@ int tsem_timewait(tsem_t *sim, int64_t milis); #define tsem_post sem_post #define tsem_destroy sem_destroy +typedef struct tsem2_t { + TdThreadMutex mutex; + TdThreadCond cond; + TdThreadCondAttr attr; + int count; +} tsem2_t; + +// #define tsem2_t sem_t +int tsem2_init(tsem2_t* sem, int pshared, unsigned int value); +int tsem2_wait(tsem2_t* sem); +int tsem2_timewait(tsem2_t* sem, int64_t milis); +int tsem2_post(tsem2_t* sem); +int tsem2_destroy(tsem2_t* sem); + #endif #if defined(_TD_DARWIN_64) diff --git a/include/os/osThread.h b/include/os/osThread.h index 4ef4550419..38c1b366f0 100644 --- a/include/os/osThread.h +++ b/include/os/osThread.h @@ -222,6 +222,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr); int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared); int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr); +int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId); int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared); int32_t taosThreadDetach(TdThread thread); int32_t taosThreadEqual(TdThread t1, TdThread t2); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2a94c7a9c0..907a6ebe3c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -105,7 +105,7 @@ struct tmq_t { STaosQueue* mqueue; // queue of rsp STaosQall* qall; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit - tsem_t rspSem; + tsem2_t rspSem; }; typedef struct SAskEpInfo { @@ -727,7 +727,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { *pTaskType = type; taosWriteQitem(tmq->delayedTask, pTaskType); - tsem_post(&tmq->rspSem); + tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -742,7 +742,7 @@ void tmqReplayTask(void* param, void* tmrId) { tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) goto END; - tsem_post(&tmq->rspSem); + tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); END: taosMemoryFree(param); @@ -1033,7 +1033,7 @@ void tmqFreeImpl(void* handle) { } taosFreeQall(tmq->qall); - tsem_destroy(&tmq->rspSem); + tsem2_destroy(&tmq->rspSem); taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); @@ -1121,7 +1121,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->consumerId = tGenIdPI64(); // init semaphore - if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { + if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) { tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); SET_ERROR_MSG_TMQ("init t_sem failed") @@ -1132,7 +1132,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - tsem_destroy(&pTmq->rspSem); + tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1434,7 +1434,7 @@ END: taosReleaseRef(tmqMgmt.rsetId, refId); FAIL: - if (tmq) tsem_post(&tmq->rspSem); + if (tmq) tsem2_post(&tmq->rspSem); taosMemoryFree(pParam); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); @@ -2132,10 +2132,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - tsem_timewait(&tmq->rspSem, (timeout - elapsedTime)); + tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck - tsem_timewait(&tmq->rspSem, 1000); + tsem2_timewait(&tmq->rspSem, 1000); } } } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index da6d71e07b..e44a4ebac8 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,7 +123,7 @@ typedef struct { STransMsg* pRsp; SEpSet epSet; int8_t hasEpSet; - tsem_t* pSem; + tsem2_t* pSem; int8_t inited; SRWLatch latch; } STransSyncMsg; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfd7630f35..bfae9a7111 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2450,7 +2450,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pSyncMsg->hasEpSet = 1; epsetAssign(&pSyncMsg->epSet, &pCtx->epSet); } - tsem_post(pSyncMsg->pSem); + tsem2_post(pSyncMsg->pSem); taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); } else { rpcFreeCont(pResp->pCont); @@ -2679,8 +2679,8 @@ _RETURN: return ret; } int64_t transCreateSyncMsg(STransMsg* pTransMsg) { - tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); + tsem2_init(sem, 0, 0); STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); @@ -2740,7 +2740,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr goto _RETURN; } - ret = tsem_timewait(pSyncMsg->pSem, timeoutMs); + ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs); if (ret < 0) { pRsp->code = TSDB_CODE_TIMEOUT_ERROR; ret = TSDB_CODE_TIMEOUT_ERROR; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fff13e7ebb..6584814053 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -681,7 +681,7 @@ void transDestroySyncMsg(void* msg) { if (msg == NULL) return; STransSyncMsg* pSyncMsg = msg; - tsem_destroy(pSyncMsg->pSem); + tsem2_destroy(pSyncMsg->pSem); taosMemoryFree(pSyncMsg->pSem); transFreeMsg(pSyncMsg->pRsp->pCont); taosMemoryFree(pSyncMsg->pRsp); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 7d1cc746ff..dd962a157f 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -215,14 +215,6 @@ int32_t taosGetAppName(char* name, int32_t* len) { return 0; } -int32_t tsem_wait(tsem_t* sem) { - int ret = 0; - do { - ret = sem_wait(sem); - } while (ret != 0 && errno == EINTR); - return ret; -} - int32_t tsem_timewait(tsem_t* sem, int64_t ms) { int ret = 0; @@ -241,4 +233,101 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) { return ret; } +int32_t tsem_wait(tsem_t* sem) { + int ret = 0; + do { + ret = sem_wait(sem); + } while (ret != 0 && errno == EINTR); + return ret; +} + +int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) { + int ret = taosThreadMutexInit(&sem->mutex, NULL); + if (ret != 0) return ret; + ret = taosThreadCondAttrInit(&sem->attr); + if (ret != 0) + { + taosThreadMutexDestroy(&sem->mutex); + return ret; + } + ret = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC); + if (ret != 0) + { + taosThreadMutexDestroy(&sem->mutex); + taosThreadCondAttrDestroy(&sem->attr); + return ret; + } + ret = taosThreadCondInit(&sem->cond, &sem->attr); + if (ret != 0) + { + taosThreadMutexDestroy(&sem->mutex); + taosThreadCondAttrDestroy(&sem->attr); + return ret; + } + + sem->count = value; + return 0; +} + +int tsem2_post(tsem2_t *sem) { + taosThreadMutexLock(&sem->mutex); + sem->count++; + taosThreadCondSignal(&sem->cond); + taosThreadMutexUnlock(&sem->mutex); + return 0; +} + +int tsem2_destroy(tsem2_t* sem) { + taosThreadMutexDestroy(&sem->mutex); + taosThreadCondDestroy(&sem->cond); + taosThreadCondAttrDestroy(&sem->attr); + return 0; +} + +int32_t tsem2_wait(tsem2_t* sem) { + taosThreadMutexLock(&sem->mutex); + while (sem->count <= 0) { + int ret = taosThreadCondWait(&sem->cond, &sem->mutex); + if (0 == ret) { + continue; + } else { + taosThreadMutexUnlock(&sem->mutex); + return ret; + } + } + sem->count--; + taosThreadMutexUnlock(&sem->mutex); + return 0; +} + +int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) { + int ret = 0; + + taosThreadMutexLock(&sem->mutex); + if (sem->count <= 0) { + struct timespec ts = {0}; + if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { + taosThreadMutexUnlock(&sem->mutex); + return -1; + } + + ts.tv_sec += ms / 1000; + ts.tv_nsec += (ms % 1000) * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while (sem->count <= 0) { + ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts); + if (ret != 0) { + taosThreadMutexUnlock(&sem->mutex); + return ret; + } + } + } + + sem->count--; + taosThreadMutexUnlock(&sem->mutex); + return ret; +} + #endif diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 0dd5374cf0..0acd6f67f5 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -170,6 +170,16 @@ int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) { #endif } +int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) { +#ifdef __USE_WIN_THREAD + return 0; +#elif defined(__APPLE__) + return 0; +#else + return pthread_condattr_setclock(attr, clockId); +#endif +} + int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) { #ifdef __USE_WIN_THREAD return 0; diff --git a/source/os/test/osSemaphoreTests.cpp b/source/os/test/osSemaphoreTests.cpp index 67cf4bb517..09fbe27f55 100644 --- a/source/os/test/osSemaphoreTests.cpp +++ b/source/os/test/osSemaphoreTests.cpp @@ -71,7 +71,6 @@ TEST(osSemaphoreTests, WaitTime1) { tsem_destroy(&sem); } - TEST(osSemaphoreTests, WaitAndPost) { tsem_t sem; int result = tsem_init(&sem, 0, 0); @@ -106,3 +105,135 @@ TEST(osSemaphoreTests, TimedWait) { result = tsem_destroy(&sem); EXPECT_EQ(result, 0); } + +TEST(osSemaphoreTests, Performance1_1) { + tsem_t sem; + const int count = 100000; + + tsem_init(&sem, 0, 0); + std::thread([&sem, count]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + for (int i = 0; i < count; ++i) { + tsem_wait(&sem); + } + tsem_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance1_2) { + tsem2_t sem; + const int count = 100000; + + tsem2_init(&sem, 0, 0); + std::thread([&sem, count]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + for (int i = 0; i < count; ++i) { + tsem2_wait(&sem); + } + tsem2_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance2_1) { + tsem_t sem; + const int count = 50000; + + tsem_init(&sem, 0, 0); + std::thread([&sem, count]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + std::thread([&sem, count]() { + for (int i = 0; i < count; ++i) { + tsem_post(&sem); + } + }).detach(); + + for (int i = 0; i < count * 2; ++i) { + tsem_wait(&sem); + } + tsem_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance2_2) { + tsem2_t sem; + const int count = 50000; + + tsem2_init(&sem, 0, 0); + std::thread([&sem, count]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + std::thread([&sem, count]() { + for (int i = 0; i < count; ++i) { + tsem2_post(&sem); + } + }).detach(); + + for (int i = 0; i < count * 2; ++i) { + tsem2_wait(&sem); + } + tsem2_destroy(&sem); +} + +TEST(osSemaphoreTests, Performance3_1) { + const int count = 100000; + + for (int i = 0; i < count; ++i) { + tsem_t sem; + tsem_init(&sem, 0, 1); + EXPECT_EQ(tsem_timewait(&sem, 1000), 0); + tsem_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance3_2) { + const int count = 100000; + + for (int i = 0; i < count; ++i) { + tsem2_t sem; + tsem2_init(&sem, 0, 1); + EXPECT_EQ(tsem2_timewait(&sem, 1000), 0); + tsem2_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance4_1) { + const int count = 1000; + for (int i = 0; i < count; ++i) { + tsem_t sem; + tsem_init(&sem, 0, 0); + std::thread([&sem, count]() { + tsem_post(&sem); + }).detach(); + + tsem_timewait(&sem, 1000); + + tsem_destroy(&sem); + } +} + +TEST(osSemaphoreTests, Performance4_2) { + const int count = 1000; + for (int i = 0; i < count; ++i) { + tsem2_t sem; + tsem2_init(&sem, 0, 0); + std::thread([&sem, count]() { + tsem2_post(&sem); + }).detach(); + + tsem2_timewait(&sem, 1000); + + tsem2_destroy(&sem); + } +}