diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 4f912ec077..165c7b8bfb 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -39,38 +39,41 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); + int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (i < 10) { + taosUsleep(1); + i++; + } else { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + } (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; } } + if (pStmt->queue.stopQueue) { + return false; + } SStmtQNode* orig = pStmt->queue.head; SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - - - *param = node; return true; } void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); - pStmt->queue.tail->next = param; pStmt->queue.tail = param; pStmt->stat.bindDataNum++; + + (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -423,11 +426,9 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); - (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; - (void)taosThreadMutexUnlock(&pQueue->mutex); } int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { @@ -778,7 +779,7 @@ void* stmtBindThreadFunc(void* param) { STscStmt* pStmt = (STscStmt*)param; while (true) { - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (pStmt->queue.stopQueue) { break; } @@ -1630,8 +1631,9 @@ int stmtClose(TAOS_STMT* stmt) { STMT_DLOG_E("start to free stmt"); pStmt->queue.stopQueue = true; - + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8e517eb5f2..75d763ec71 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -39,34 +39,41 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); + int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (i < 10) { + taosUsleep(1); + i++; + } else { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + } (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; } } + if (pStmt->queue.stopQueue) { + return false; + } SStmtQNode* orig = pStmt->queue.head; SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); return true; } static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); - pStmt->queue.tail->next = param; pStmt->queue.tail = param; - pStmt->stat.bindDataNum++; - (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + pStmt->stat.bindDataNum++; + + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -343,11 +350,9 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); - (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; - (void)taosThreadMutexUnlock(&pQueue->mutex); } static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) { @@ -701,7 +706,7 @@ static void* stmtBindThreadFunc(void* param) { STscStmt2* pStmt = (STscStmt2*)param; while (true) { - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (pStmt->queue.stopQueue) { break; } @@ -1764,6 +1769,7 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->queue.stopQueue = true; (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex);