From dd5bee0120dbaea19b261715972114e96aed4dfe Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 24 Feb 2025 15:20:40 +0800 Subject: [PATCH 1/6] test performance --- source/client/src/clientStmt.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 4f912ec077..3a7ffa40d2 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -39,12 +39,19 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); + int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (i < 5000) { + taosUsleep(1); + i++; + } else { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; + if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } } } SStmtQNode* orig = pStmt->queue.head; @@ -53,7 +60,6 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); *param = node; @@ -62,15 +68,15 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { } void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); pStmt->queue.tail->next = param; pStmt->queue.tail = param; pStmt->stat.bindDataNum++; (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + + (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -423,11 +429,9 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); - (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; - (void)taosThreadMutexUnlock(&pQueue->mutex); } int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { From 1bf82888e9e35e4b8d69298fa2e7f903780dbfa7 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 24 Feb 2025 15:55:35 +0800 Subject: [PATCH 2/6] fix thread block --- source/client/src/clientStmt.c | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 3a7ffa40d2..932b5e8980 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -48,12 +48,12 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; - } } } + if (pStmt->queue.stopQueue) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } SStmtQNode* orig = pStmt->queue.head; SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; @@ -61,20 +61,16 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - - *param = node; - return true; } void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { - pStmt->queue.tail->next = param; pStmt->queue.tail = param; pStmt->stat.bindDataNum++; (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); - + (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); @@ -782,7 +778,7 @@ void* stmtBindThreadFunc(void* param) { STscStmt* pStmt = (STscStmt*)param; while (true) { - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (pStmt->queue.stopQueue) { break; } @@ -1634,6 +1630,7 @@ int stmtClose(TAOS_STMT* stmt) { STMT_DLOG_E("start to free stmt"); pStmt->queue.stopQueue = true; + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); From 417d1efa5d3131ca3d2faeeff248fb34b01202cb Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 24 Feb 2025 16:33:01 +0800 Subject: [PATCH 3/6] fix stmt2 --- source/client/src/clientStmt2.c | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8e517eb5f2..bf98ba6358 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -39,34 +39,40 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); + int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (i < 5000) { + taosUsleep(1); + i++; + } else { + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); - return false; } } + if (pStmt->queue.stopQueue) { + (void)taosThreadMutexUnlock(&pStmt->queue.mutex); + return false; + } SStmtQNode* orig = pStmt->queue.head; SStmtQNode* node = pStmt->queue.head->next; pStmt->queue.head = pStmt->queue.head->next; *param = node; (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); return true; } static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { - (void)taosThreadMutexLock(&pStmt->queue.mutex); - pStmt->queue.tail->next = param; pStmt->queue.tail = param; - pStmt->stat.bindDataNum++; - (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); - (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); + pStmt->stat.bindDataNum++; + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -343,11 +349,9 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); - (void)taosThreadMutexLock(&pQueue->mutex); pQueue->head = pQueue->tail = pTblBuf->pCurBuff; pQueue->qRemainNum = 0; pQueue->head->next = NULL; - (void)taosThreadMutexUnlock(&pQueue->mutex); } static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) { @@ -701,7 +705,7 @@ static void* stmtBindThreadFunc(void* param) { STscStmt2* pStmt = (STscStmt2*)param; while (true) { - if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) { + if (pStmt->queue.stopQueue) { break; } @@ -1762,6 +1766,7 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_DLOG_E("start to free stmt"); pStmt->queue.stopQueue = true; + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadMutexLock(&pStmt->queue.mutex); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); From cbd9504e8fd208308725eb6f147c5eb442b2ef92 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 24 Feb 2025 17:16:08 +0800 Subject: [PATCH 4/6] fix deadlock problem --- source/client/src/clientStmt.c | 10 ++++++---- source/client/src/clientStmt2.c | 8 +++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 932b5e8980..279078b759 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -46,7 +46,9 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { i++; } else { (void)taosThreadMutexLock(&pStmt->queue.mutex); - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + } (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } } @@ -69,9 +71,9 @@ void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { pStmt->queue.tail = param; pStmt->stat.bindDataNum++; - (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -1630,9 +1632,9 @@ int stmtClose(TAOS_STMT* stmt) { STMT_DLOG_E("start to free stmt"); pStmt->queue.stopQueue = true; - (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); - + (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index bf98ba6358..2d5029ca4d 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -46,7 +46,9 @@ static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { i++; } else { (void)taosThreadMutexLock(&pStmt->queue.mutex); - (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { + (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex); + } (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } } @@ -69,9 +71,9 @@ static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { pStmt->queue.tail = param; pStmt->stat.bindDataNum++; - (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); } @@ -1766,9 +1768,9 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_DLOG_E("start to free stmt"); pStmt->queue.stopQueue = true; - (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadMutexLock(&pStmt->queue.mutex); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); (void)taosThreadCondSignal(&(pStmt->queue.waitCond)); (void)taosThreadMutexUnlock(&pStmt->queue.mutex); From 1e569acc3356116c24301a0bcefbd83a26928273 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Tue, 25 Feb 2025 10:45:16 +0800 Subject: [PATCH 5/6] change sleep loop times --- source/client/src/clientStmt.c | 2 +- source/client/src/clientStmt2.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 279078b759..b16d1e8aac 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -41,7 +41,7 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - if (i < 5000) { + if (i < 10) { taosUsleep(1); i++; } else { diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 2d5029ca4d..bc8b03ffd4 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -41,7 +41,7 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { int i = 0; while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { - if (i < 5000) { + if (i < 10) { taosUsleep(1); i++; } else { From 09a8a8131634c4ddaab45401d2b6d0af00274858 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Tue, 25 Feb 2025 15:06:59 +0800 Subject: [PATCH 6/6] fix review --- source/client/src/clientStmt.c | 1 - source/client/src/clientStmt2.c | 1 - 2 files changed, 2 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index b16d1e8aac..165c7b8bfb 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -53,7 +53,6 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { } } if (pStmt->queue.stopQueue) { - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); return false; } SStmtQNode* orig = pStmt->queue.head; diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index bc8b03ffd4..75d763ec71 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -53,7 +53,6 @@ static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { } } if (pStmt->queue.stopQueue) { - (void)taosThreadMutexUnlock(&pStmt->queue.mutex); return false; } SStmtQNode* orig = pStmt->queue.head;