From 417d1efa5d3131ca3d2faeeff248fb34b01202cb Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 24 Feb 2025 16:33:01 +0800 Subject: [PATCH] fix stmt2 --- source/client/src/clientStmt2.c | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8e517eb5f2..bf98ba6358 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -39,34 +39,40 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); + int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (i < 5000) { + taosUsleep(1); + i++; + } else { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; } } + if (pStmt->queue.stopQueue) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } SStmtQNode* orig = pStmt->queue.head; SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); return true; } static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); - pStmt->queue.tail->next = param; pStmt->queue.tail = param; - pStmt->stat.bindDataNum++; - (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + pStmt->stat.bindDataNum++; + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -343,11 +349,9 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); - (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; - (void)taosThreadMutexUnlock(&pQueue->mutex); } static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) { @@ -701,7 +705,7 @@ static void* stmtBindThreadFunc(void* param) { STscStmt2* pStmt = (STscStmt2*)param; while (true) { - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (pStmt->queue.stopQueue) { break; } @@ -1762,6 +1766,7 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_DLOG_E("start to free stmt"); pStmt->queue.stopQueue = true; + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondSignal(&(pStmt->queue.waitCond));