diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 4f912ec077..3a7ffa40d2 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -39,12 +39,19 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); + int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (i < 5000) { + taosUsleep(1); + i++; + } else { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; + if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } } } SStmtQNode* orig = pStmt->queue.head; @@ -53,7 +60,6 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); *param = node; @@ -62,15 +68,15 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { } void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); pStmt->queue.tail->next = param; pStmt->queue.tail = param; pStmt->stat.bindDataNum++; (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + + (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -423,11 +429,9 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); - (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; - (void)taosThreadMutexUnlock(&pQueue->mutex); } int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {