fix stmt2

This commit is contained in:
pengrongkun94@qq.com 2025-02-24 16:33:01 +08:00
parent 1bf82888e9
commit 417d1efa5d
1 changed files with 18 additions and 13 deletions

View File

@ -39,34 +39,40 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
} }
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex); int i = 0;
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); if (i < 5000) {
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { taosUsleep(1);
i++;
} else {
(void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
} }
} }
if (pStmt->queue.stopQueue) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
SStmtQNode* orig = pStmt->queue.head; SStmtQNode* orig = pStmt->queue.head;
SStmtQNode* node = pStmt->queue.head->next; SStmtQNode* node = pStmt->queue.head->next;
pStmt->queue.head = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next;
*param = node; *param = node;
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return true; return true;
} }
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);
pStmt->queue.tail->next = param; pStmt->queue.tail->next = param;
pStmt->queue.tail = param; pStmt->queue.tail = param;
pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
} }
@ -343,11 +349,9 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->buffIdx = 1; pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head); pTblBuf->buffOffset = sizeof(*pQueue->head);
(void)taosThreadMutexLock(&pQueue->mutex);
pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
pQueue->qRemainNum = 0; pQueue->qRemainNum = 0;
pQueue->head->next = NULL; pQueue->head->next = NULL;
(void)taosThreadMutexUnlock(&pQueue->mutex);
} }
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) { static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
@ -701,7 +705,7 @@ static void* stmtBindThreadFunc(void* param) {
STscStmt2* pStmt = (STscStmt2*)param; STscStmt2* pStmt = (STscStmt2*)param;
while (true) { while (true) {
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { if (pStmt->queue.stopQueue) {
break; break;
} }
@ -1762,6 +1766,7 @@ int stmtClose2(TAOS_STMT2* stmt) {
STMT_DLOG_E("start to free stmt"); STMT_DLOG_E("start to free stmt");
pStmt->queue.stopQueue = true; pStmt->queue.stopQueue = true;
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadCondSignal(&(pStmt->queue.waitCond));