Merge remote-tracking branch 'origin/3.0' into enh/3.0/TD-32686
This commit is contained in:
commit
e1d09c0042
|
@ -2272,7 +2272,10 @@ int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t c
|
||||||
(void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
(void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||||
int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args);
|
int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args);
|
||||||
if (code_s != TSDB_CODE_SUCCESS) {
|
if (code_s != TSDB_CODE_SUCCESS) {
|
||||||
|
(void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
|
||||||
|
(void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
|
||||||
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||||
|
(void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
|
||||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1711,11 +1711,11 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
||||||
return pStmt->errCode;
|
return pStmt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
(void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||||
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||||
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||||
|
|
||||||
if (pStmt->sql.stbInterlaceMode) {
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
STMT_ERR_RET(stmtAddBatch2(pStmt));
|
STMT_ERR_RET(stmtAddBatch2(pStmt));
|
||||||
|
@ -1817,11 +1817,12 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
||||||
pStmt->bindThreadInUse = false;
|
pStmt->bindThreadInUse = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
(void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||||
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||||
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
(void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||||
|
|
||||||
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
|
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
|
||||||
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
|
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,9 @@
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <thread>
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "geosWrapper.h"
|
#include "geosWrapper.h"
|
||||||
#include "osSemaphore.h"
|
#include "osSemaphore.h"
|
||||||
|
@ -1706,7 +1709,7 @@ void stmtAsyncBindCb2(void* param, TAOS_RES* pRes, int code) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(stmt2Case, async_order) {
|
void stmt2_async_test(std::atomic<bool>& stop_task) {
|
||||||
int CTB_NUMS = 2;
|
int CTB_NUMS = 2;
|
||||||
int ROW_NUMS = 2;
|
int ROW_NUMS = 2;
|
||||||
int CYC_NUMS = 2;
|
int CYC_NUMS = 2;
|
||||||
|
@ -1912,6 +1915,27 @@ TEST(stmt2Case, async_order) {
|
||||||
taosMemoryFree(tbs[i]);
|
taosMemoryFree(tbs[i]);
|
||||||
}
|
}
|
||||||
taosMemoryFree(tbs);
|
taosMemoryFree(tbs);
|
||||||
|
stop_task = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(stmt2Case, async_order) {
|
||||||
|
std::atomic<bool> stop_task(false);
|
||||||
|
std::thread t(stmt2_async_test, std::ref(stop_task));
|
||||||
|
|
||||||
|
// 等待 60 秒钟
|
||||||
|
auto start_time = std::chrono::steady_clock::now();
|
||||||
|
while (!stop_task) {
|
||||||
|
auto elapsed_time = std::chrono::steady_clock::now() - start_time;
|
||||||
|
if (std::chrono::duration_cast<std::chrono::seconds>(elapsed_time).count() > 60) {
|
||||||
|
FAIL() << "Test[stmt2_async_test] timed out";
|
||||||
|
t.detach();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(1)); // 每 1s 检查一次
|
||||||
|
}
|
||||||
|
if (t.joinable()) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(stmt2Case, rowformat_bind) {
|
TEST(stmt2Case, rowformat_bind) {
|
||||||
|
@ -2067,4 +2091,5 @@ TEST(stmt2Case, rowformat_bind) {
|
||||||
do_query(taos, "drop database if exists stmt2_testdb_16");
|
do_query(taos, "drop database if exists stmt2_testdb_16");
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
}
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -75,6 +75,12 @@ class TDTestCase:
|
||||||
|
|
||||||
def restoreDnodeThread(self, p, newTdSql):
|
def restoreDnodeThread(self, p, newTdSql):
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
count = 0
|
||||||
|
while count < 100:
|
||||||
|
newTdSql.query('show dnodes')
|
||||||
|
if newTdSql.queryResult[1][4] == "ready":
|
||||||
|
break
|
||||||
|
count+=1
|
||||||
|
|
||||||
sql = f"restore dnode 2"
|
sql = f"restore dnode 2"
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
|
|
Loading…
Reference in New Issue