Merge pull request #29888 from taosdata/fix/main/TD-33653
fix(stmt/stmt2)[TD-33653]:fix stmt/stmt2 insert perfomance degradation
This commit is contained in:
commit
51a9257d87
|
@ -39,38 +39,41 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
|
||||||
}
|
}
|
||||||
|
|
||||||
bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
|
bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
|
||||||
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
int i = 0;
|
||||||
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
|
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
|
||||||
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
|
if (i < 10) {
|
||||||
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
|
taosUsleep(1);
|
||||||
|
i++;
|
||||||
|
} else {
|
||||||
|
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
||||||
|
if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
|
||||||
|
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
|
||||||
|
}
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (pStmt->queue.stopQueue) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
SStmtQNode* orig = pStmt->queue.head;
|
SStmtQNode* orig = pStmt->queue.head;
|
||||||
SStmtQNode* node = pStmt->queue.head->next;
|
SStmtQNode* node = pStmt->queue.head->next;
|
||||||
pStmt->queue.head = pStmt->queue.head->next;
|
pStmt->queue.head = pStmt->queue.head->next;
|
||||||
*param = node;
|
*param = node;
|
||||||
|
|
||||||
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
|
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
|
||||||
|
|
||||||
|
|
||||||
*param = node;
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
|
void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
|
||||||
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
|
||||||
|
|
||||||
pStmt->queue.tail->next = param;
|
pStmt->queue.tail->next = param;
|
||||||
pStmt->queue.tail = param;
|
pStmt->queue.tail = param;
|
||||||
|
|
||||||
pStmt->stat.bindDataNum++;
|
pStmt->stat.bindDataNum++;
|
||||||
|
|
||||||
|
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
||||||
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
|
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
|
||||||
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
||||||
|
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,11 +426,9 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
|
||||||
pTblBuf->buffIdx = 1;
|
pTblBuf->buffIdx = 1;
|
||||||
pTblBuf->buffOffset = sizeof(*pQueue->head);
|
pTblBuf->buffOffset = sizeof(*pQueue->head);
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pQueue->mutex);
|
|
||||||
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
|
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
|
||||||
pQueue->qRemainNum = 0;
|
pQueue->qRemainNum = 0;
|
||||||
pQueue->head->next = NULL;
|
pQueue->head->next = NULL;
|
||||||
(void)taosThreadMutexUnlock(&pQueue->mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
||||||
|
@ -778,7 +779,7 @@ void* stmtBindThreadFunc(void* param) {
|
||||||
STscStmt* pStmt = (STscStmt*)param;
|
STscStmt* pStmt = (STscStmt*)param;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
|
if (pStmt->queue.stopQueue) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1630,8 +1631,9 @@ int stmtClose(TAOS_STMT* stmt) {
|
||||||
STMT_DLOG_E("start to free stmt");
|
STMT_DLOG_E("start to free stmt");
|
||||||
|
|
||||||
pStmt->queue.stopQueue = true;
|
pStmt->queue.stopQueue = true;
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
||||||
|
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
|
||||||
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
||||||
|
|
||||||
|
|
|
@ -39,34 +39,41 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
|
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
|
||||||
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
int i = 0;
|
||||||
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
|
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
|
||||||
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
|
if (i < 10) {
|
||||||
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
|
taosUsleep(1);
|
||||||
|
i++;
|
||||||
|
} else {
|
||||||
|
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
||||||
|
if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
|
||||||
|
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
|
||||||
|
}
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (pStmt->queue.stopQueue) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
SStmtQNode* orig = pStmt->queue.head;
|
SStmtQNode* orig = pStmt->queue.head;
|
||||||
SStmtQNode* node = pStmt->queue.head->next;
|
SStmtQNode* node = pStmt->queue.head->next;
|
||||||
pStmt->queue.head = pStmt->queue.head->next;
|
pStmt->queue.head = pStmt->queue.head->next;
|
||||||
*param = node;
|
*param = node;
|
||||||
|
|
||||||
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
|
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
|
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
|
||||||
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
|
||||||
|
|
||||||
pStmt->queue.tail->next = param;
|
pStmt->queue.tail->next = param;
|
||||||
pStmt->queue.tail = param;
|
pStmt->queue.tail = param;
|
||||||
pStmt->stat.bindDataNum++;
|
|
||||||
(void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
|
|
||||||
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
|
||||||
|
|
||||||
|
pStmt->stat.bindDataNum++;
|
||||||
|
|
||||||
|
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
||||||
|
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
|
||||||
|
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,11 +350,9 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
|
||||||
pTblBuf->buffIdx = 1;
|
pTblBuf->buffIdx = 1;
|
||||||
pTblBuf->buffOffset = sizeof(*pQueue->head);
|
pTblBuf->buffOffset = sizeof(*pQueue->head);
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pQueue->mutex);
|
|
||||||
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
|
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
|
||||||
pQueue->qRemainNum = 0;
|
pQueue->qRemainNum = 0;
|
||||||
pQueue->head->next = NULL;
|
pQueue->head->next = NULL;
|
||||||
(void)taosThreadMutexUnlock(&pQueue->mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
|
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
|
||||||
|
@ -701,7 +706,7 @@ static void* stmtBindThreadFunc(void* param) {
|
||||||
STscStmt2* pStmt = (STscStmt2*)param;
|
STscStmt2* pStmt = (STscStmt2*)param;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
|
if (pStmt->queue.stopQueue) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1764,6 +1769,7 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
||||||
pStmt->queue.stopQueue = true;
|
pStmt->queue.stopQueue = true;
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
(void)taosThreadMutexLock(&pStmt->queue.mutex);
|
||||||
|
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
|
||||||
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
|
||||||
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue