diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 3540dc5c68..35bfa66f72 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -131,6 +131,8 @@ typedef struct SStmtQueue { SStmtQNode* head; SStmtQNode* tail; uint64_t qRemainNum; + TdThreadMutex mutex; + TdThreadCond waitCond; } SStmtQueue; typedef struct STscStmt { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 4b993ccc1e..ad8681fbcd 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -39,31 +39,39 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { - while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) { - taosUsleep(1); - return false; + (void)taosThreadMutexLock(&pStmt->queue.mutex); + while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } } - SStmtQNode* orig = pStmt->queue.head; - SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; - - // taosMemoryFreeClear(orig); - *param = node; - (void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + + + *param = node; return true; } void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + pStmt->queue.tail->next = param; pStmt->queue.tail = param; pStmt->stat.bindDataNum++; (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } static int32_t stmtCreateRequest(STscStmt* pStmt) { @@ -415,9 +423,11 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); + (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; + (void)taosThreadMutexUnlock(&pQueue->mutex); } int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { @@ -809,6 +819,8 @@ int32_t stmtStartBindThread(STscStmt* pStmt) { } int32_t stmtInitQueue(STscStmt* pStmt) { + (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL); + (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL); STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head)); pStmt->queue.tail = pStmt->queue.head; @@ -1619,11 +1631,18 @@ int stmtClose(TAOS_STMT* stmt) { pStmt->queue.stopQueue = true; + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + if (pStmt->bindThreadInUse) { (void)taosThreadJoin(pStmt->bindThread, NULL); pStmt->bindThreadInUse = false; } + (void)taosThreadCondDestroy(&pStmt->queue.waitCond); + (void)taosThreadMutexDestroy(&pStmt->queue.mutex); + STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64 ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u" diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8edd60e4b5..72166fab84 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -39,31 +39,35 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { + (void)taosThreadMutexLock(&pStmt->queue.mutex); while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - taosUsleep(1); - return false; + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } } - SStmtQNode* orig = pStmt->queue.head; - SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; - - // taosMemoryFreeClear(orig); - *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); return true; } static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + pStmt->queue.tail->next = param; pStmt->queue.tail = param; - pStmt->stat.bindDataNum++; (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } static int32_t stmtCreateRequest(STscStmt2* pStmt) { @@ -339,9 +343,11 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); + (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; + (void)taosThreadMutexUnlock(&pQueue->mutex); } static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) { @@ -735,6 +741,8 @@ static int32_t stmtStartBindThread(STscStmt2* pStmt) { } static int32_t stmtInitQueue(STscStmt2* pStmt) { + (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL); + (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL); STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head)); pStmt->queue.tail = pStmt->queue.head; @@ -1748,11 +1756,18 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->queue.stopQueue = true; + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + if (pStmt->bindThreadInUse) { (void)taosThreadJoin(pStmt->bindThread, NULL); pStmt->bindThreadInUse = false; } + (void)taosThreadCondDestroy(&pStmt->queue.waitCond); + (void)taosThreadMutexDestroy(&pStmt->queue.mutex); + if (pStmt->options.asyncExecFn && !pStmt->semWaited) { if (tsem_wait(&pStmt->asyncQuerySem) != 0) { tscError("failed to wait asyncQuerySem");