refactor stmt-async-bind loop usleep to Producer Consumer Model

This commit is contained in:
pengrongkun94@qq.com 2025-01-20 09:50:14 +08:00
parent a059e78c6c
commit 2f817e1781
3 changed files with 53 additions and 17 deletions

View File

@ -131,6 +131,8 @@ typedef struct SStmtQueue {
SStmtQNode* head;
SStmtQNode* tail;
uint64_t qRemainNum;
TdThreadMutex mutex;
TdThreadCond waitCond;
} SStmtQueue;
typedef struct STscStmt {

View File

@ -39,31 +39,39 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
}
bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) {
taosUsleep(1);
return false;
(void)taosThreadMutexLock(&pStmt->queue.mutex);
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
}
SStmtQNode* orig = pStmt->queue.head;
SStmtQNode* node = pStmt->queue.head->next;
pStmt->queue.head = pStmt->queue.head->next;
// taosMemoryFreeClear(orig);
*param = node;
(void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
*param = node;
return true;
}
void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);
pStmt->queue.tail->next = param;
pStmt->queue.tail = param;
pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
}
static int32_t stmtCreateRequest(STscStmt* pStmt) {
@ -415,9 +423,11 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head);
(void)taosThreadMutexLock(&pQueue->mutex);
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
pQueue->qRemainNum = 0;
pQueue->head->next = NULL;
(void)taosThreadMutexUnlock(&pQueue->mutex);
}
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
@ -809,6 +819,8 @@ int32_t stmtStartBindThread(STscStmt* pStmt) {
}
int32_t stmtInitQueue(STscStmt* pStmt) {
(void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
(void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
pStmt->queue.tail = pStmt->queue.head;
@ -1619,11 +1631,18 @@ int stmtClose(TAOS_STMT* stmt) {
pStmt->queue.stopQueue = true;
(void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
if (pStmt->bindThreadInUse) {
(void)taosThreadJoin(pStmt->bindThread, NULL);
pStmt->bindThreadInUse = false;
}
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"

View File

@ -39,31 +39,35 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
}
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
taosUsleep(1);
return false;
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
}
SStmtQNode* orig = pStmt->queue.head;
SStmtQNode* node = pStmt->queue.head->next;
pStmt->queue.head = pStmt->queue.head->next;
// taosMemoryFreeClear(orig);
*param = node;
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return true;
}
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);
pStmt->queue.tail->next = param;
pStmt->queue.tail = param;
pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
}
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
@ -339,9 +343,11 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head);
(void)taosThreadMutexLock(&pQueue->mutex);
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
pQueue->qRemainNum = 0;
pQueue->head->next = NULL;
(void)taosThreadMutexUnlock(&pQueue->mutex);
}
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
@ -735,6 +741,8 @@ static int32_t stmtStartBindThread(STscStmt2* pStmt) {
}
static int32_t stmtInitQueue(STscStmt2* pStmt) {
(void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
(void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
pStmt->queue.tail = pStmt->queue.head;
@ -1748,11 +1756,18 @@ int stmtClose2(TAOS_STMT2* stmt) {
pStmt->queue.stopQueue = true;
(void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
if (pStmt->bindThreadInUse) {
(void)taosThreadJoin(pStmt->bindThread, NULL);
pStmt->bindThreadInUse = false;
}
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
tscError("failed to wait asyncQuerySem");