use tsem2

This commit is contained in:
factosea 2024-05-15 14:57:14 +08:00
parent 36cb98e79e
commit e1ae87bbf5
7 changed files with 208 additions and 29 deletions

View File

@ -33,6 +33,13 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#define tsem2_t sem_t
#define tsem2_init sem_init
#define tsem2_wait tsem_wait
#define tsem2_timewait tsem_timewait
#define tsem2_post sem_post
#define tsem2_destroy sem_destroy
#elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <windows.h>
@ -44,21 +51,35 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#define tsem2_t sem_t
#define tsem2_init sem_init
#define tsem2_wait tsem_wait
#define tsem2_timewait tsem_timewait
#define tsem2_post sem_post
#define tsem2_destroy sem_destroy
#else
typedef struct tsem_t {
#define tsem_t sem_t
#define tsem_init sem_init
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
typedef struct tsem2_t {
TdThreadMutex mutex;
TdThreadCond cond;
TdThreadCondAttr attr;
int count;
} tsem_t;
} tsem2_t;
// #define tsem_t sem_t
int tsem_init(tsem_t* sem, int pshared, unsigned int value);
int tsem_wait(tsem_t* sem);
int tsem_timewait(tsem_t* sem, int64_t milis);
int tsem_post(tsem_t* sem);
int tsem_destroy(tsem_t* sem);
// #define tsem2_t sem_t
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value);
int tsem2_wait(tsem2_t* sem);
int tsem2_timewait(tsem2_t* sem, int64_t milis);
int tsem2_post(tsem2_t* sem);
int tsem2_destroy(tsem2_t* sem);
#endif

View File

@ -105,7 +105,7 @@ struct tmq_t {
STaosQueue* mqueue; // queue of rsp
STaosQall* qall;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
tsem_t rspSem;
tsem2_t rspSem;
};
typedef struct SAskEpInfo {
@ -727,7 +727,7 @@ static void generateTimedTask(int64_t refId, int32_t type) {
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
@ -742,7 +742,7 @@ void tmqReplayTask(void* param, void* tmrId) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) goto END;
tsem_post(&tmq->rspSem);
tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
END:
taosMemoryFree(param);
@ -1033,7 +1033,7 @@ void tmqFreeImpl(void* handle) {
}
taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem);
tsem2_destroy(&tmq->rspSem);
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
taos_close_internal(tmq->pTscObj);
@ -1121,7 +1121,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->consumerId = tGenIdPI64();
// init semaphore
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
SET_ERROR_MSG_TMQ("init t_sem failed")
@ -1132,7 +1132,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
tsem2_destroy(&pTmq->rspSem);
SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed;
}
@ -1434,7 +1434,7 @@ END:
taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL:
if (tmq) tsem_post(&tmq->rspSem);
if (tmq) tsem2_post(&tmq->rspSem);
taosMemoryFree(pParam);
if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
@ -2132,10 +2132,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait(&tmq->rspSem, 1000);
tsem2_timewait(&tmq->rspSem, 1000);
}
}
}

View File

@ -123,7 +123,7 @@ typedef struct {
STransMsg* pRsp;
SEpSet epSet;
int8_t hasEpSet;
tsem_t* pSem;
tsem2_t* pSem;
int8_t inited;
SRWLatch latch;
} STransSyncMsg;

View File

@ -2450,7 +2450,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pSyncMsg->hasEpSet = 1;
epsetAssign(&pSyncMsg->epSet, &pCtx->epSet);
}
tsem_post(pSyncMsg->pSem);
tsem2_post(pSyncMsg->pSem);
taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
} else {
rpcFreeCont(pResp->pCont);
@ -2679,8 +2679,8 @@ _RETURN:
return ret;
}
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
tsem2_init(sem, 0, 0);
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
@ -2740,7 +2740,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
goto _RETURN;
}
ret = tsem_timewait(pSyncMsg->pSem, timeoutMs);
ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs);
if (ret < 0) {
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
ret = TSDB_CODE_TIMEOUT_ERROR;

View File

@ -681,7 +681,7 @@ void transDestroySyncMsg(void* msg) {
if (msg == NULL) return;
STransSyncMsg* pSyncMsg = msg;
tsem_destroy(pSyncMsg->pSem);
tsem2_destroy(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pSem);
transFreeMsg(pSyncMsg->pRsp->pCont);
taosMemoryFree(pSyncMsg->pRsp);

View File

@ -215,7 +215,33 @@ int32_t taosGetAppName(char* name, int32_t* len) {
return 0;
}
int tsem_init(tsem_t* sem, int pshared, unsigned int value) {
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int ret = 0;
struct timespec ts = {0};
if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
return -1;
}
ts.tv_nsec += ms * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
return ret;
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) {
int ret = taosThreadMutexInit(&sem->mutex, NULL);
if (ret != 0) return ret;
ret = taosThreadCondAttrInit(&sem->attr);
@ -243,7 +269,7 @@ int tsem_init(tsem_t* sem, int pshared, unsigned int value) {
return 0;
}
int tsem_post(tsem_t *sem) {
int tsem2_post(tsem2_t *sem) {
taosThreadMutexLock(&sem->mutex);
sem->count++;
taosThreadCondSignal(&sem->cond);
@ -251,14 +277,14 @@ int tsem_post(tsem_t *sem) {
return 0;
}
int tsem_destroy(tsem_t* sem) {
int tsem2_destroy(tsem2_t* sem) {
taosThreadMutexDestroy(&sem->mutex);
taosThreadCondDestroy(&sem->cond);
taosThreadCondAttrDestroy(&sem->attr);
return 0;
}
int32_t tsem_wait(tsem_t* sem) {
int32_t tsem2_wait(tsem2_t* sem) {
taosThreadMutexLock(&sem->mutex);
while (sem->count <= 0) {
int ret = taosThreadCondWait(&sem->cond, &sem->mutex);
@ -274,7 +300,7 @@ int32_t tsem_wait(tsem_t* sem) {
return 0;
}
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) {
int ret = 0;
taosThreadMutexLock(&sem->mutex);

View File

@ -67,7 +67,7 @@ TEST(osSemaphoreTests, WaitTime1) {
tsem_t sem;
tsem_init(&sem, 0, 1);
EXPECT_EQ(tsem_timewait(&sem, 1000), 0);
EXPECT_NE(tsem_timewait(&sem, 10000), 0);
EXPECT_NE(tsem_timewait(&sem, 1000), 0);
tsem_destroy(&sem);
}
@ -105,3 +105,135 @@ TEST(osSemaphoreTests, TimedWait) {
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
}
TEST(osSemaphoreTests, Performance1_1) {
tsem_t sem;
const int count = 100000;
tsem_init(&sem, 0, 0);
std::thread([&sem]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
for (int i = 0; i < count; ++i) {
tsem_wait(&sem);
}
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, Performance1_2) {
tsem2_t sem;
const int count = 100000;
tsem2_init(&sem, 0, 0);
std::thread([&sem]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
for (int i = 0; i < count; ++i) {
tsem2_wait(&sem);
}
tsem2_destroy(&sem);
}
TEST(osSemaphoreTests, Performance2_1) {
tsem_t sem;
const int count = 50000;
tsem_init(&sem, 0, 0);
std::thread([&sem]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
std::thread([&sem]() {
for (int i = 0; i < count; ++i) {
tsem_post(&sem);
}
}).detach();
for (int i = 0; i < count * 2; ++i) {
tsem_wait(&sem);
}
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, Performance2_2) {
tsem2_t sem;
const int count = 50000;
tsem2_init(&sem, 0, 0);
std::thread([&sem]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
std::thread([&sem]() {
for (int i = 0; i < count; ++i) {
tsem2_post(&sem);
}
}).detach();
for (int i = 0; i < count * 2; ++i) {
tsem2_wait(&sem);
}
tsem2_destroy(&sem);
}
TEST(osSemaphoreTests, Performance3_1) {
const int count = 100000;
for (int i = 0; i < count; ++i) {
tsem_t sem;
tsem_init(&sem, 0, 1);
EXPECT_EQ(tsem_timewait(&sem, 1000), 0);
tsem_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance3_2) {
const int count = 100000;
for (int i = 0; i < count; ++i) {
tsem2_t sem;
tsem2_init(&sem, 0, 1);
EXPECT_EQ(tsem2_timewait(&sem, 1000), 0);
tsem2_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance4_1) {
const int count = 1000;
for (int i = 0; i < count; ++i) {
tsem_t sem;
tsem_init(&sem, 0, 0);
std::thread([&sem]() {
tsem_post(&sem);
}).detach();
tsem_timewait(&sem, 1000);
tsem_destroy(&sem);
}
}
TEST(osSemaphoreTests, Performance4_2) {
const int count = 1000;
for (int i = 0; i < count; ++i) {
tsem2_t sem;
tsem2_init(&sem, 0, 0);
std::thread([&sem]() {
tsem2_post(&sem);
}).detach();
tsem2_timewait(&sem, 1000);
tsem2_destroy(&sem);
}
}