fix thread block

This commit is contained in:
pengrongkun94@qq.com 2025-02-24 15:55:35 +08:00
parent dd5bee0120
commit 1bf82888e9
1 changed files with 7 additions and 10 deletions

View File

@ -48,12 +48,12 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
} }
} }
if (pStmt->queue.stopQueue) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
SStmtQNode* orig = pStmt->queue.head; SStmtQNode* orig = pStmt->queue.head;
SStmtQNode* node = pStmt->queue.head->next; SStmtQNode* node = pStmt->queue.head->next;
pStmt->queue.head = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next;
@ -61,20 +61,16 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
*param = node;
return true; return true;
} }
void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
pStmt->queue.tail->next = param; pStmt->queue.tail->next = param;
pStmt->queue.tail = param; pStmt->queue.tail = param;
pStmt->stat.bindDataNum++; pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
@ -782,7 +778,7 @@ void* stmtBindThreadFunc(void* param) {
STscStmt* pStmt = (STscStmt*)param; STscStmt* pStmt = (STscStmt*)param;
while (true) { while (true) {
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { if (pStmt->queue.stopQueue) {
break; break;
} }
@ -1634,6 +1630,7 @@ int stmtClose(TAOS_STMT* stmt) {
STMT_DLOG_E("start to free stmt"); STMT_DLOG_E("start to free stmt");
pStmt->queue.stopQueue = true; pStmt->queue.stopQueue = true;
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadCondSignal(&(pStmt->queue.waitCond));