Merge branch '3.0' into fix/internal

This commit is contained in:
Simon Guan 2025-02-27 13:48:54 +08:00
commit 7676a02979
5 changed files with 47 additions and 9 deletions

View File

@ -2273,7 +2273,10 @@ int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t c
(void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args);
if (code_s != TSDB_CODE_SUCCESS) {
(void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
(void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
(void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
// terrno = TAOS_SYSTEM_ERROR(errno);
}

View File

@ -1690,11 +1690,11 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
return pStmt->errCode;
}
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
(void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
}
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
(void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
if (pStmt->sql.stbInterlaceMode) {
STMT_ERR_RET(stmtAddBatch2(pStmt));
@ -1796,11 +1796,12 @@ int stmtClose2(TAOS_STMT2* stmt) {
pStmt->bindThreadInUse = false;
}
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
(void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
}
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
(void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);

View File

@ -15,6 +15,9 @@
#include <gtest/gtest.h>
#include <string.h>
#include <atomic>
#include <chrono>
#include <thread>
#include "clientInt.h"
#include "geosWrapper.h"
#include "osSemaphore.h"
@ -1662,7 +1665,7 @@ void stmtAsyncBindCb2(void* param, TAOS_RES* pRes, int code) {
return;
}
TEST(stmt2Case, async_order) {
void stmt2_async_test(std::atomic<bool>& stop_task) {
int CTB_NUMS = 2;
int ROW_NUMS = 2;
int CYC_NUMS = 2;
@ -1868,5 +1871,27 @@ TEST(stmt2Case, async_order) {
taosMemoryFree(tbs[i]);
}
taosMemoryFree(tbs);
stop_task = true;
}
TEST(stmt2Case, async_order) {
std::atomic<bool> stop_task(false);
std::thread t(stmt2_async_test, std::ref(stop_task));
// 等待 60 秒钟
auto start_time = std::chrono::steady_clock::now();
while (!stop_task) {
auto elapsed_time = std::chrono::steady_clock::now() - start_time;
if (std::chrono::duration_cast<std::chrono::seconds>(elapsed_time).count() > 60) {
FAIL() << "Test[stmt2_async_test] timed out";
t.detach();
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1)); // 每 1s 检查一次
}
if (t.joinable()) {
t.join();
}
}
#pragma GCC diagnostic pop

View File

@ -1079,11 +1079,14 @@ end:
}
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
if (code != 0){
return code;
}
SVgroupDataCxt* pVgCxt = NULL;
void** pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
if (NULL == pp) {
int code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
if (code != 0){
return code;
}

View File

@ -75,6 +75,12 @@ class TDTestCase:
def restoreDnodeThread(self, p, newTdSql):
sleep(1)
count = 0
while count < 100:
newTdSql.query('show dnodes')
if newTdSql.queryResult[1][4] == "ready":
break
count+=1
sql = f"restore dnode 2"
tdLog.info(sql)