Merge pull request #25773 from taosdata/enh/TS-4716/timewait

Enh/ts 4716/timewait
This commit is contained in:
dapan1121 2024-05-16 10:17:10 +08:00 committed by GitHub
commit 51b9046cbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 283 additions and 24 deletions

View File

@ -33,6 +33,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#define tsem2_t tsem_t
#define tsem2_init tsem_init
#define tsem2_wait tsem_wait
#define tsem2_timewait tsem_timewait
#define tsem2_post tsem_post
#define tsem2_destroy tsem_destroy
#elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <windows.h>
@ -44,6 +51,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#define tsem2_t tsem_t
#define tsem2_init tsem_init
#define tsem2_wait tsem_wait
#define tsem2_timewait tsem_timewait
#define tsem2_post tsem_post
#define tsem2_destroy tsem_destroy
#else
#define tsem_t sem_t
@ -53,6 +67,20 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
typedef struct tsem2_t {
TdThreadMutex mutex;
TdThreadCond cond;
TdThreadCondAttr attr;
int count;
} tsem2_t;
// #define tsem2_t sem_t
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value);
int tsem2_wait(tsem2_t* sem);
int tsem2_timewait(tsem2_t* sem, int64_t milis);
int tsem2_post(tsem2_t* sem);
int tsem2_destroy(tsem2_t* sem);
#endif
#if defined(_TD_DARWIN_64)

View File

@ -222,6 +222,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const
int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr);
int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared);
int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr);
int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId);
int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared);
int32_t taosThreadDetach(TdThread thread);
int32_t taosThreadEqual(TdThread t1, TdThread t2);

View File

@ -105,7 +105,7 @@ struct tmq_t {
STaosQueue* mqueue; // queue of rsp
STaosQall* qall;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
tsem_t rspSem;
tsem2_t rspSem;
};
typedef struct SAskEpInfo {
@ -727,7 +727,7 @@ static void generateTimedTask(int64_t refId, int32_t type) {
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
@ -742,7 +742,7 @@ void tmqReplayTask(void* param, void* tmrId) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) goto END;
tsem_post(&tmq->rspSem);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
END:
taosMemoryFree(param);
@ -1033,7 +1033,7 @@ void tmqFreeImpl(void* handle) {
}
taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem);
tsem2_destroy(&tmq->rspSem);
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
taos_close_internal(tmq->pTscObj);
@ -1121,7 +1121,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->consumerId = tGenIdPI64();
// init semaphore
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
SET_ERROR_MSG_TMQ("init t_sem failed")
@ -1132,7 +1132,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
tsem2_destroy(&pTmq->rspSem);
SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed;
}
@ -1434,7 +1434,7 @@ END:
taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL:
if (tmq) tsem_post(&tmq->rspSem);
if (tmq) tsem2_post(&tmq->rspSem);
taosMemoryFree(pParam);
if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
@ -2132,10 +2132,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait(&tmq->rspSem, 1000);
tsem2_timewait(&tmq->rspSem, 1000);
}
}
}

View File

@ -123,7 +123,7 @@ typedef struct {
STransMsg* pRsp;
SEpSet epSet;
int8_t hasEpSet;
tsem_t* pSem;
tsem2_t* pSem;
int8_t inited;
SRWLatch latch;
} STransSyncMsg;

View File

@ -2450,7 +2450,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pSyncMsg->hasEpSet = 1;
epsetAssign(&pSyncMsg->epSet, &pCtx->epSet);
}
tsem_post(pSyncMsg->pSem);
tsem2_post(pSyncMsg->pSem);
taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
} else {
rpcFreeCont(pResp->pCont);
@ -2679,8 +2679,8 @@ _RETURN:
return ret;
}
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
tsem2_init(sem, 0, 0);
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
@ -2740,7 +2740,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
goto _RETURN;
}
ret = tsem_timewait(pSyncMsg->pSem, timeoutMs);
ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs);
if (ret < 0) {
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
ret = TSDB_CODE_TIMEOUT_ERROR;

View File

@ -681,7 +681,7 @@ void transDestroySyncMsg(void* msg) {
if (msg == NULL) return;
STransSyncMsg* pSyncMsg = msg;
tsem_destroy(pSyncMsg->pSem);
tsem2_destroy(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pSem);
transFreeMsg(pSyncMsg->pRsp->pCont);
taosMemoryFree(pSyncMsg->pRsp);

View File

@ -215,14 +215,6 @@ int32_t taosGetAppName(char* name, int32_t* len) {
return 0;
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int ret = 0;
@ -241,4 +233,101 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
return ret;
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) {
int ret = taosThreadMutexInit(&sem->mutex, NULL);
if (ret != 0) return ret;
ret = taosThreadCondAttrInit(&sem->attr);
if (ret != 0)
{
taosThreadMutexDestroy(&sem->mutex);
return ret;
}
ret = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC);
if (ret != 0)
{
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondAttrDestroy(&sem->attr);
return ret;
}
ret = taosThreadCondInit(&sem->cond, &sem->attr);
if (ret != 0)
{
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondAttrDestroy(&sem->attr);
return ret;
}
sem->count = value;
return 0;
}
int tsem2_post(tsem2_t *sem) {
taosThreadMutexLock(&sem->mutex);
sem->count++;
taosThreadCondSignal(&sem->cond);
taosThreadMutexUnlock(&sem->mutex);
return 0;
}
int tsem2_destroy(tsem2_t* sem) {
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondDestroy(&sem->cond);
taosThreadCondAttrDestroy(&sem->attr);
return 0;
}
int32_t tsem2_wait(tsem2_t* sem) {
taosThreadMutexLock(&sem->mutex);
while (sem->count <= 0) {
int ret = taosThreadCondWait(&sem->cond, &sem->mutex);
if (0 == ret) {
continue;
} else {
taosThreadMutexUnlock(&sem->mutex);
return ret;
}
}
sem->count--;
taosThreadMutexUnlock(&sem->mutex);
return 0;
}
int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) {
int ret = 0;
taosThreadMutexLock(&sem->mutex);
if (sem->count <= 0) {
struct timespec ts = {0};
if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
taosThreadMutexUnlock(&sem->mutex);
return -1;
}
ts.tv_sec += ms / 1000;
ts.tv_nsec += (ms % 1000) * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
while (sem->count <= 0) {
ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts);
if (ret != 0) {
taosThreadMutexUnlock(&sem->mutex);
return ret;
}
}
}
sem->count--;
taosThreadMutexUnlock(&sem->mutex);
return ret;
}
#endif

View File

@ -170,6 +170,16 @@ int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) {
#endif
}
int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) {
#ifdef __USE_WIN_THREAD
return 0;
#elif defined(__APPLE__)
return 0;
#else
return pthread_condattr_setclock(attr, clockId);
#endif
}
int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) {
#ifdef __USE_WIN_THREAD
return 0;

View File

@ -71,7 +71,6 @@ TEST(osSemaphoreTests, WaitTime1) {
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, WaitAndPost) {
tsem_t sem;
int result = tsem_init(&sem, 0, 0);
@ -106,3 +105,135 @@ TEST(osSemaphoreTests, TimedWait) {
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
}
TEST(osSemaphoreTests, Performance1_1) {
tsem_t sem;
const int count = 100000;
tsem_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
for (int i = 0; i < count; ++i) {
tsem_wait(&sem);
}
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, Performance1_2) {
tsem2_t sem;
const int count = 100000;
tsem2_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
for (int i = 0; i < count; ++i) {
tsem2_wait(&sem);
}
tsem2_destroy(&sem);
}
TEST(osSemaphoreTests, Performance2_1) {
tsem_t sem;
const int count = 50000;
tsem_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
for (int i = 0; i < count * 2; ++i) {
tsem_wait(&sem);
}
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, Performance2_2) {
tsem2_t sem;
const int count = 50000;
tsem2_init(&sem, 0, 0);
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
std::thread([&sem, count]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
for (int i = 0; i < count * 2; ++i) {
tsem2_wait(&sem);
}
tsem2_destroy(&sem);
}
TEST(osSemaphoreTests, Performance3_1) {
const int count = 100000;
for (int i = 0; i < count; ++i) {
tsem_t sem;
tsem_init(&sem, 0, 1);
EXPECT_EQ(tsem_timewait(&sem, 1000), 0);
tsem_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance3_2) {
const int count = 100000;
for (int i = 0; i < count; ++i) {
tsem2_t sem;
tsem2_init(&sem, 0, 1);
EXPECT_EQ(tsem2_timewait(&sem, 1000), 0);
tsem2_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance4_1) {
const int count = 1000;
for (int i = 0; i < count; ++i) {
tsem_t sem;
tsem_init(&sem, 0, 0);
std::thread([&sem, count]() {
tsem_post(&sem);
}).detach();
tsem_timewait(&sem, 1000);
tsem_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance4_2) {
const int count = 1000;
for (int i = 0; i < count; ++i) {
tsem2_t sem;
tsem2_init(&sem, 0, 0);
std::thread([&sem, count]() {
tsem2_post(&sem);
}).detach();
tsem2_timewait(&sem, 1000);
tsem2_destroy(&sem);
}
}