From 7025be999b72c0a85f5679a0a18d3e26d81c7a7b Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 10 Feb 2025 18:32:34 +0800 Subject: [PATCH 1/5] bind async --- include/client/taos.h | 1 + source/client/inc/clientStmt2.h | 7 +++- source/client/src/clientMain.c | 20 ++++++++-- source/client/src/clientStmt2.c | 68 ++++++++++++++++++++++++++++----- 4 files changed, 80 insertions(+), 16 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 17f97d3d3d..59ada33d91 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -247,6 +247,7 @@ typedef struct TAOS_STMT2_BINDV { DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option); DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx); +DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index 05a4c849f8..e55d9a048f 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -153,8 +153,10 @@ typedef struct { char *db; int64_t reqid; int32_t errCode; - tsem_t asyncQuerySem; - bool semWaited; + tsem_t asyncExecSem; + bool execSemWaited; + tsem_t asyncBindSem; + bool bindSemWaited; SStmtStatInfo stat; } STscStmt2; /* @@ -226,6 +228,7 @@ int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums); int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert); TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt); const char *stmtErrstr2(TAOS_STMT2 *stmt); +int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 190a724151..2a27894fc3 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2166,11 +2166,18 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col } STscStmt2 *pStmt = (STscStmt2 *)stmt; - if (pStmt->options.asyncExecFn && !pStmt->semWaited) { - if (tsem_wait(&pStmt->asyncQuerySem) != 0) { - tscError("wait async query sem failed"); + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { + if (tsem_wait(&pStmt->asyncExecSem) != 0) { + tscError("wait asyncExecSem failed"); } - pStmt->semWaited = true; + pStmt->execSemWaited = true; + } + + if (!pStmt->bindSemWaited) { + if (tsem_wait(&pStmt->asyncBindSem) != 0) { + tscError("wait asyncBindSem failed"); + } + pStmt->bindSemWaited = true; } SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR)); @@ -2243,6 +2250,11 @@ out: return code; } +int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, + void *param) { + return stmt2AsyncBind(stmt, bindv,col_idx, fp, param); +} + int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8e517eb5f2..f4e0fefb00 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -835,13 +835,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { pStmt->sql.siInfo.tableColsReady = true; if (pStmt->options.asyncExecFn) { - if (tsem_init(&pStmt->asyncQuerySem, 0, 1) != 0) { + if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); (void)stmtClose(pStmt); return NULL; } } - pStmt->semWaited = false; + pStmt->execSemWaited = false; STMT_LOG_SEQ(STMT_INIT); @@ -1656,8 +1656,8 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) { (void)stmtCleanExecInfo(pStmt, (code ? false : true), false); ++pStmt->sql.runTimes; - if (tsem_post(&pStmt->asyncQuerySem) != 0) { - tscError("failed to post asyncQuerySem"); + if (tsem_post(&pStmt->asyncExecSem) != 0) { + tscError("failed to post asyncExecSem"); } } @@ -1746,7 +1746,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { pRequest->body.queryFp = asyncQueryCb; ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt; - pStmt->semWaited = false; + pStmt->execSemWaited = false; launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper); } @@ -1775,9 +1775,9 @@ int stmtClose2(TAOS_STMT2* stmt) { (void)taosThreadCondDestroy(&pStmt->queue.waitCond); (void)taosThreadMutexDestroy(&pStmt->queue.mutex); - if (pStmt->options.asyncExecFn && !pStmt->semWaited) { - if (tsem_wait(&pStmt->asyncQuerySem) != 0) { - tscError("failed to wait asyncQuerySem"); + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { + if (tsem_wait(&pStmt->asyncExecSem) != 0) { + tscError("failed to wait asyncExecSem"); } } @@ -1795,8 +1795,8 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); if (pStmt->options.asyncExecFn) { - if (tsem_destroy(&pStmt->asyncQuerySem) != 0) { - tscError("failed to destroy asyncQuerySem"); + if (tsem_destroy(&pStmt->asyncExecSem) != 0) { + tscError("failed to destroy asyncExecSem"); } } taosMemoryFree(stmt); @@ -1925,3 +1925,51 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) { return pStmt->exec.pRequest; } +typedef struct { + TAOS_STMT2* stmt; + TAOS_STMT2_BINDV* bindv; + int32_t col_idx; + __taos_async_fn_t fp; + void* param; +} ThreadArgs; + +static void* stmtAsyncBindThreadFunc(void* args) { + ThreadArgs* targs = (ThreadArgs*)args; + + int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx); + targs->fp(targs->param, NULL, code); + taosMemoryFree(args); + + return NULL; +} + +int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) { + STscStmt2* pStmt = (STscStmt2*)stmt; + + TdThreadAttr thAttr; + if (taosThreadAttrInit(&thAttr) != 0) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED) != 0) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + + ThreadArgs* args = (ThreadArgs*)taosMemoryMalloc(sizeof(ThreadArgs)); + args->stmt = stmt; + args->bindv = bindv; + args->col_idx = col_idx; + args->fp = fp; + args->param = param; + + if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtAsyncBindThreadFunc, args) != 0) { + (void)taosThreadAttrDestroy(&thAttr); + terrno = TAOS_SYSTEM_ERROR(errno); + STMT_ERR_RET(terrno); + } + + // pStmt->bindThreadInUse = true; + + (void)taosThreadAttrDestroy(&thAttr); + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file From 2f67ebbd5083602c6f7d30a4a509ff832bec05dd Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Wed, 12 Feb 2025 10:27:58 +0800 Subject: [PATCH 2/5] add test --- source/client/test/stmt2Test.cpp | 107 +++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 91f884941f..cb5afedec0 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -1543,4 +1543,111 @@ TEST(stmt2Case, errcode) { code = taos_stmt_prepare(stmt, sql, 0); checkError(stmt, code); } + +void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) { + if (code != TSDB_CODE_SUCCESS) { + taos_errstr(pRes); + } + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + return; +} + +TEST(stmt2Case, async_order) { + int CTB_NUMS = 3; + int ROW_NUMS = 3; + int CYC_NUMS = 3; + + TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb, NULL}; + char* sql = "insert into ? values(?,?)"; + + do_query(taos, "drop database if exists stmt2_testdb_15"); + do_query(taos, "create database IF NOT EXISTS stmt2_testdb_15"); + do_query(taos, "create stable stmt2_testdb_15.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); + do_query(taos, "use stmt2_testdb_15"); + + TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + int code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + int total_affected = 0; + + // tbname + char** tbs = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*)); + for (int i = 0; i < CTB_NUMS; i++) { + tbs[i] = (char*)taosMemoryMalloc(sizeof(char) * 20); + sprintf(tbs[i], "ctb_%d", i); + char* tmp = (char*)taosMemoryMalloc(sizeof(char) * 100); + sprintf(tmp, "create table stmt2_testdb_15.%s using stmt2_testdb_15.stb tags(0, 'after')", tbs[i]); + do_query(taos, tmp); + } + + // case 1 : bind_a->exec_a->bind_a->exec_a->... + for (int r = 0; r < CYC_NUMS; r++) { + // col params + int64_t** ts = (int64_t**)taosMemoryMalloc(CTB_NUMS * sizeof(int64_t*)); + char** b = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*)); + int* ts_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); + int* b_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); + for (int i = 0; i < ROW_NUMS; i++) { + ts_len[i] = sizeof(int64_t); + b_len[i] = 1; + } + for (int i = 0; i < CTB_NUMS; i++) { + ts[i] = (int64_t*)taosMemoryMalloc(ROW_NUMS * sizeof(int64_t)); + b[i] = (char*)taosMemoryMalloc(ROW_NUMS * sizeof(char)); + for (int j = 0; j < ROW_NUMS; j++) { + ts[i][j] = 1591060628000 + r * 100000 + j; + b[i][j] = 'a' + j; + } + } + + // bind params + TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); + + for (int i = 0; i < CTB_NUMS; i++) { + // create col params + paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND)); + paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + // code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL); + code = taos_stmt2_bind_param(stmt, &bindv, -1); + + checkError(stmt, code); + + // exec + int affected = 0; + code = taos_stmt2_exec(stmt, &affected); + total_affected += affected; + checkError(stmt, code); + + for (int i = 0; i < CTB_NUMS; i++) { + taosMemoryFree(paramv[i]); + taosMemoryFree(ts[i]); + taosMemoryFree(b[i]); + } + taosMemoryFree(ts); + taosMemoryFree(b); + taosMemoryFree(ts_len); + taosMemoryFree(b_len); + taosMemoryFree(paramv); + } + ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); + + // case 2 : bind_a->bind_a->bind_a->exec_a->... + // case 3 : bind->exec_a->bind->exec_a->... + // case 4 : bind_a->exec->bind_a->exec->... + // case 5 : bind_a->close + // case 6 : exec_a->close + + for (int i = 0; i < CTB_NUMS; i++) { + taosMemoryFree(tbs[i]); + } + taosMemoryFree(tbs); + + taos_stmt2_close(stmt); +} #pragma GCC diagnostic pop From b55980d71dc4c70b04f2c5940f0e35d6338fc543 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Thu, 13 Feb 2025 10:16:09 +0800 Subject: [PATCH 3/5] fix some thread sync problem --- source/client/inc/clientStmt2.h | 21 ++++++++----- source/client/src/clientMain.c | 27 +++++++++++----- source/client/src/clientStmt2.c | 52 ++++++++++++++++++++++++++++--- source/client/test/stmt2Test.cpp | 53 +++++++++++++++----------------- 4 files changed, 104 insertions(+), 49 deletions(-) diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index e55d9a048f..feec9ec337 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -134,6 +134,12 @@ uint64_t qRemainNum; } SStmtQueue; */ +typedef struct AsyncBindParam { + TdThreadMutex mutex; + TdThreadCond waitCond; + uint8_t asyncBindNum; +} AsyncBindParam; + typedef struct { STscObj *taos; SCatalog *pCatalog; @@ -150,14 +156,13 @@ typedef struct { SStmtExecInfo exec; SStmtBindInfo bInfo; - char *db; - int64_t reqid; - int32_t errCode; - tsem_t asyncExecSem; - bool execSemWaited; - tsem_t asyncBindSem; - bool bindSemWaited; - SStmtStatInfo stat; + char *db; + int64_t reqid; + int32_t errCode; + tsem_t asyncExecSem; + bool execSemWaited; + AsyncBindParam asyncBindParam; + SStmtStatInfo stat; } STscStmt2; /* extern char *gStmtStatusStr[]; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c472be8bc8..ba67ff7f76 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2166,6 +2166,11 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col } STscStmt2 *pStmt = (STscStmt2 *)stmt; + if( atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum)>1) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { if (tsem_wait(&pStmt->asyncExecSem) != 0) { tscError("wait asyncExecSem failed"); @@ -2173,13 +2178,6 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col pStmt->execSemWaited = true; } - if (!pStmt->bindSemWaited) { - if (tsem_wait(&pStmt->asyncBindSem) != 0) { - tscError("wait asyncBindSem failed"); - } - pStmt->bindSemWaited = true; - } - SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR)); if (NULL == hashTbnames) { tscError("stmt2 bind failed: %s", tstrerror(terrno)); @@ -2252,7 +2250,20 @@ out: int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param) { - return stmt2AsyncBind(stmt, bindv,col_idx, fp, param); + if (stmt == NULL || bindv == NULL || fp == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + STscStmt2 *pStmt = (STscStmt2 *)stmt; + (void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + int code = stmt2AsyncBind(stmt, bindv, col_idx, fp, param); + if (code != TSDB_CODE_SUCCESS) { + (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + // terrno = TAOS_SYSTEM_ERROR(errno); + } + + return code; } int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 5c8e2655e6..597ec95f23 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -752,6 +752,14 @@ static int32_t stmtInitQueue(STscStmt2* pStmt) { return TSDB_CODE_SUCCESS; } +static int32_t stmtIniAsyncBind(STscStmt2* pStmt) { + (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL); + (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL); + pStmt->asyncBindParam.asyncBindNum = 0; + + return TSDB_CODE_SUCCESS; +} + static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) { pTblBuf->buffUnit = sizeof(SStmtQNode); pTblBuf->buffSize = pTblBuf->buffUnit * 1000; @@ -812,13 +820,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (NULL == pStmt->sql.siInfo.pTableHash) { - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); if (NULL == pStmt->sql.siInfo.pTableCols) { terrno = terrno; - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } @@ -831,7 +839,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { } if (TSDB_CODE_SUCCESS != code) { terrno = code; - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } } @@ -840,10 +848,17 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { if (pStmt->options.asyncExecFn) { if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } } + code = stmtIniAsyncBind(pStmt); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + (void)stmtClose2(pStmt); + return NULL; + } + pStmt->execSemWaited = false; STMT_LOG_SEQ(STMT_INIT); @@ -1675,6 +1690,12 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { return pStmt->errCode; } + taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { + (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); + } + taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); + if (pStmt->sql.stbInterlaceMode) { STMT_ERR_RET(stmtAddBatch2(pStmt)); } @@ -1775,9 +1796,17 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->bindThreadInUse = false; } + taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { + (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); + } + taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); (void)taosThreadCondDestroy(&pStmt->queue.waitCond); (void)taosThreadMutexDestroy(&pStmt->queue.mutex); + (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond); + (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex); + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { if (tsem_wait(&pStmt->asyncExecSem) != 0) { tscError("failed to wait asyncExecSem"); @@ -1937,17 +1966,32 @@ typedef struct { } ThreadArgs; static void* stmtAsyncBindThreadFunc(void* args) { + setThreadName("stmtAsyncBind"); + + qInfo("async stmt bind thread started"); + ThreadArgs* targs = (ThreadArgs*)args; + STscStmt2* pStmt = (STscStmt2*)targs->stmt; int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx); targs->fp(targs->param, NULL, code); + (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex)); + (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond)); + (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex)); taosMemoryFree(args); + qInfo("async stmt bind thread stopped"); + return NULL; } int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) { STscStmt2* pStmt = (STscStmt2*)stmt; + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 1) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } TdThreadAttr thAttr; if (taosThreadAttrInit(&thAttr) != 0) { diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 95a7ecd8f7..6062189602 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -255,48 +255,41 @@ TEST(stmt2Case, stmt2_test_limit) { do_query(taos, "create database IF NOT EXISTS stmt2_testdb_7"); do_query(taos, "create stable stmt2_testdb_7.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); do_query(taos, - "insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, " - "'abc'),(1591060628001,'def'),(1591060628004, 'hij')"); + "insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, " + "'abc'),(1591060628001,'def'),(1591060628004, 'hij')"); do_query(taos, "use stmt2_testdb_7"); - TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL}; - TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); ASSERT_NE(stmt, nullptr); - const char* sql = "select * from stmt2_testdb_7.tb2 where ts > ? and ts < ? limit ?"; - int code = taos_stmt2_prepare(stmt, sql, 0); + int code = taos_stmt2_prepare(stmt, sql, 0); checkError(stmt, code); - - int t64_len[1] = {sizeof(int64_t)}; - int b_len[1] = {3}; - int x = 2; - int x_len = sizeof(int); - int64_t ts[2] = {1591060627000, 1591060628005}; - TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1}, - {TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1}, - {TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}}; + int t64_len[1] = {sizeof(int64_t)}; + int b_len[1] = {3}; + int x = 2; + int x_len = sizeof(int); + int64_t ts[2] = {1591060627000, 1591060628005}; + TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1}, + {TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1}, + {TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}}; TAOS_STMT2_BIND* paramv = ¶ms[0]; TAOS_STMT2_BINDV bindv = {1, NULL, NULL, ¶mv}; code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); - taos_stmt2_exec(stmt, NULL); checkError(stmt, code); - TAOS_RES* pRes = taos_stmt2_result(stmt); ASSERT_NE(pRes, nullptr); - int getRecordCounts = 0; while ((taos_fetch_row(pRes))) { - getRecordCounts++; + getRecordCounts++; } ASSERT_EQ(getRecordCounts, 2); taos_stmt2_close(stmt); @@ -304,7 +297,6 @@ TEST(stmt2Case, stmt2_test_limit) { taos_close(taos); } - TEST(stmt2Case, insert_stb_get_fields_Test) { TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); @@ -1619,20 +1611,24 @@ TEST(stmt2Case, errcode) { } void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) { - if (code != TSDB_CODE_SUCCESS) { - taos_errstr(pRes); - } ASSERT_EQ(code, TSDB_CODE_SUCCESS); + taosMsleep(500); + return; +} + +void stmtAsyncQueryCb2(void* param, TAOS_RES* pRes, int code) { + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + taosMsleep(500); return; } TEST(stmt2Case, async_order) { int CTB_NUMS = 3; int ROW_NUMS = 3; - int CYC_NUMS = 3; + int CYC_NUMS = 1; TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); - TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb, NULL}; + TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb2, NULL}; char* sql = "insert into ? values(?,?)"; do_query(taos, "drop database if exists stmt2_testdb_15"); @@ -1687,15 +1683,14 @@ TEST(stmt2Case, async_order) { } // bind TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; - // code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL); - code = taos_stmt2_bind_param(stmt, &bindv, -1); + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL); + // code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); // exec int affected = 0; code = taos_stmt2_exec(stmt, &affected); - total_affected += affected; checkError(stmt, code); for (int i = 0; i < CTB_NUMS; i++) { @@ -1709,7 +1704,7 @@ TEST(stmt2Case, async_order) { taosMemoryFree(b_len); taosMemoryFree(paramv); } - ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); + // ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); // case 2 : bind_a->bind_a->bind_a->exec_a->... // case 3 : bind->exec_a->bind->exec_a->... From b0bf9bb5020fb9cd8e936d8cf22a219d190f3232 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Thu, 13 Feb 2025 11:02:11 +0800 Subject: [PATCH 4/5] add unit test async stmt order --- source/client/test/stmt2Test.cpp | 239 +++++++++++++++++++++++-------- 1 file changed, 179 insertions(+), 60 deletions(-) diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 6062189602..407c8745a5 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -37,7 +37,7 @@ namespace { void checkError(TAOS_STMT2* stmt, int code) { if (code != TSDB_CODE_SUCCESS) { STscStmt2* pStmt = (STscStmt2*)stmt; - if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr) { + if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr || pStmt->exec.pRequest == nullptr) { printf("stmt api error\n stats : %d\n errstr : %s\n", pStmt->sql.status, taos_stmt_errstr(stmt)); } else { printf("stmt api error\n sql : %s\n stats : %d\n errstr : %s\n", pStmt->sql.sqlStr, pStmt->sql.status, @@ -1611,8 +1611,10 @@ TEST(stmt2Case, errcode) { } void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) { + bool* finish = (bool*)param; ASSERT_EQ(code, TSDB_CODE_SUCCESS); taosMsleep(500); + *finish = true; return; } @@ -1622,10 +1624,17 @@ void stmtAsyncQueryCb2(void* param, TAOS_RES* pRes, int code) { return; } +void stmtAsyncBindCb2(void* param, TAOS_RES* pRes, int code) { + bool* finish = (bool*)param; + taosMsleep(500); + *finish = true; + return; +} + TEST(stmt2Case, async_order) { - int CTB_NUMS = 3; - int ROW_NUMS = 3; - int CYC_NUMS = 1; + int CTB_NUMS = 2; + int ROW_NUMS = 2; + int CYC_NUMS = 2; TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb2, NULL}; @@ -1651,72 +1660,182 @@ TEST(stmt2Case, async_order) { sprintf(tmp, "create table stmt2_testdb_15.%s using stmt2_testdb_15.stb tags(0, 'after')", tbs[i]); do_query(taos, tmp); } + // params + TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); + // col params + int64_t** ts = (int64_t**)taosMemoryMalloc(CTB_NUMS * sizeof(int64_t*)); + char** b = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*)); + int* ts_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); + int* b_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); + for (int i = 0; i < ROW_NUMS; i++) { + ts_len[i] = sizeof(int64_t); + b_len[i] = 1; + } + for (int i = 0; i < CTB_NUMS; i++) { + ts[i] = (int64_t*)taosMemoryMalloc(ROW_NUMS * sizeof(int64_t)); + b[i] = (char*)taosMemoryMalloc(ROW_NUMS * sizeof(char)); + for (int j = 0; j < ROW_NUMS; j++) { + ts[i][j] = 1591060628000 + 100000 + j; + b[i][j] = 'a' + j; + } + } + // bind params + for (int i = 0; i < CTB_NUMS; i++) { + // create col params + paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND)); + paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } // case 1 : bind_a->exec_a->bind_a->exec_a->... - for (int r = 0; r < CYC_NUMS; r++) { - // col params - int64_t** ts = (int64_t**)taosMemoryMalloc(CTB_NUMS * sizeof(int64_t*)); - char** b = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*)); - int* ts_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); - int* b_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); - for (int i = 0; i < ROW_NUMS; i++) { - ts_len[i] = sizeof(int64_t); - b_len[i] = 1; + { + printf("case 1 : bind_a->exec_a->bind_a->exec_a->...\n"); + for (int r = 0; r < CYC_NUMS; r++) { + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + + checkError(stmt, code); + + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); } - for (int i = 0; i < CTB_NUMS; i++) { - ts[i] = (int64_t*)taosMemoryMalloc(ROW_NUMS * sizeof(int64_t)); - b[i] = (char*)taosMemoryMalloc(ROW_NUMS * sizeof(char)); - for (int j = 0; j < ROW_NUMS; j++) { - ts[i][j] = 1591060628000 + r * 100000 + j; - b[i][j] = 'a' + j; - } - } - - // bind params - TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); - - for (int i = 0; i < CTB_NUMS; i++) { - // create col params - paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND)); - paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; - paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; - } - // bind - TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; - code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL); - // code = taos_stmt2_bind_param(stmt, &bindv, -1); - - checkError(stmt, code); - - // exec - int affected = 0; - code = taos_stmt2_exec(stmt, &affected); - checkError(stmt, code); - - for (int i = 0; i < CTB_NUMS; i++) { - taosMemoryFree(paramv[i]); - taosMemoryFree(ts[i]); - taosMemoryFree(b[i]); - } - taosMemoryFree(ts); - taosMemoryFree(b); - taosMemoryFree(ts_len); - taosMemoryFree(b_len); - taosMemoryFree(paramv); } - // ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); // case 2 : bind_a->bind_a->bind_a->exec_a->... - // case 3 : bind->exec_a->bind->exec_a->... - // case 4 : bind_a->exec->bind_a->exec->... - // case 5 : bind_a->close - // case 6 : exec_a->close + { + printf("case 2 : bind_a->bind_a->bind_a->exec_a->...\n"); + for (int r = 0; r < CYC_NUMS; r++) { + // bind params + TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); + for (int i = 0; i < CTB_NUMS; i++) { + // create col params + paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND)); + paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + while (!finish) { + taosMsleep(100); + } + checkError(stmt, code); + } + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + // case 3 : bind->exec_a->bind->exec_a->... + { + printf("case 3 : bind->exec_a->bind->exec_a->...\n"); + for (int r = 0; r < CYC_NUMS; r++) { + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param(stmt, &bindv, -1); + + checkError(stmt, code); + + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + } + + // case 4 : bind_a->close + { + printf("case 4 : bind_a->close\n"); + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + checkError(stmt, code); + taos_stmt2_close(stmt); + checkError(stmt, code); + } + + // case 5 : bind_a->exec_a->close + { + printf("case 5 : bind_a->exec_a->close\n"); + // init + TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + int code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + checkError(stmt, code); + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + // close + taos_stmt2_close(stmt); + checkError(stmt, code); + } + + option = {0, false, false, NULL, NULL}; + stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + + // case 6 : bind_a->exec->bind_a->exec->... + { + printf("case 6 : bind_a->exec->bind_a->exec->...\n"); + // init + + checkError(stmt, code); + for (int r = 0; r < CYC_NUMS; r++) { + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + checkError(stmt, code); + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + } + + // case 7 (error:no wait error) : bind_a->bind_a + { + printf("case 7 (error:no wait error) : bind_a->bind_a\n"); + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb2, (void*)&finish); + checkError(stmt, code); + taosMsleep(200); + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb2, (void*)&finish); + ASSERT_EQ(code, TSDB_CODE_TSC_STMT_API_ERROR); + while (!finish) { + taosMsleep(100); + } + } + // close + taos_stmt2_close(stmt); + + // free memory + for (int i = 0; i < CTB_NUMS; i++) { + taosMemoryFree(paramv[i]); + taosMemoryFree(ts[i]); + taosMemoryFree(b[i]); + } + taosMemoryFree(ts); + taosMemoryFree(b); + taosMemoryFree(ts_len); + taosMemoryFree(b_len); + taosMemoryFree(paramv); for (int i = 0; i < CTB_NUMS; i++) { taosMemoryFree(tbs[i]); } taosMemoryFree(tbs); - - taos_stmt2_close(stmt); } #pragma GCC diagnostic pop From aef5611d1b48f9e5ed17222d5c872d904a36f8e9 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Tue, 18 Feb 2025 17:59:57 +0800 Subject: [PATCH 5/5] switch create thread to taosc task queue --- include/client/taos.h | 13 +++++---- include/libs/qcom/query.h | 1 + source/client/inc/clientStmt2.h | 8 ++++++ source/client/src/clientMain.c | 17 +++++++++-- source/client/src/clientStmt2.c | 48 ++------------------------------ source/libs/qcom/src/queryUtil.c | 12 ++++++++ 6 files changed, 44 insertions(+), 55 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 59ada33d91..188b479cc6 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -247,12 +247,13 @@ typedef struct TAOS_STMT2_BINDV { DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option); DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx); -DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); -DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); -DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); -DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); -DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields); -DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields); +DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, + void *param); +DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); +DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); +DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); +DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields); +DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields); DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt); DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5b28eadc4f..a17e67279c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -300,6 +300,7 @@ int32_t cleanupTaskQueue(); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncWait(); int32_t taosAsyncRecover(); +int32_t taosStmt2AsyncBind(__async_exec_fn_t execFn, void* execParam); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index feec9ec337..283573803e 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -133,6 +133,13 @@ SStmtQNode* tail; uint64_t qRemainNum; } SStmtQueue; */ +typedef struct { + TAOS_STMT2 *stmt; + TAOS_STMT2_BINDV *bindv; + int32_t col_idx; + __taos_async_fn_t fp; + void *param; +} ThreadArgs; typedef struct AsyncBindParam { TdThreadMutex mutex; @@ -234,6 +241,7 @@ int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert); TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt); const char *stmtErrstr2(TAOS_STMT2 *stmt); int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); +int stmtAsyncBindThreadFunc(void *args); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ba67ff7f76..25184b041b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2256,14 +2256,25 @@ int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t c } STscStmt2 *pStmt = (STscStmt2 *)stmt; + if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 0) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + ThreadArgs *args = (ThreadArgs *)taosMemoryMalloc(sizeof(ThreadArgs)); + args->stmt = stmt; + args->bindv = bindv; + args->col_idx = col_idx; + args->fp = fp; + args->param = param; (void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); - int code = stmt2AsyncBind(stmt, bindv, col_idx, fp, param); - if (code != TSDB_CODE_SUCCESS) { + int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args); + if (code_s != TSDB_CODE_SUCCESS) { (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); // terrno = TAOS_SYSTEM_ERROR(errno); } - return code; + return code_s; } int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 597ec95f23..67066e1fdd 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1957,17 +1957,8 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) { return pStmt->exec.pRequest; } -typedef struct { - TAOS_STMT2* stmt; - TAOS_STMT2_BINDV* bindv; - int32_t col_idx; - __taos_async_fn_t fp; - void* param; -} ThreadArgs; - -static void* stmtAsyncBindThreadFunc(void* args) { - setThreadName("stmtAsyncBind"); +int32_t stmtAsyncBindThreadFunc(void* args) { qInfo("async stmt bind thread started"); ThreadArgs* targs = (ThreadArgs*)args; @@ -1983,40 +1974,5 @@ static void* stmtAsyncBindThreadFunc(void* args) { qInfo("async stmt bind thread stopped"); - return NULL; -} - -int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) { - STscStmt2* pStmt = (STscStmt2*)stmt; - if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 1) { - tscError("async bind param is still working, please try again later"); - return TSDB_CODE_TSC_STMT_API_ERROR; - } - - TdThreadAttr thAttr; - if (taosThreadAttrInit(&thAttr) != 0) { - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED) != 0) { - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - - ThreadArgs* args = (ThreadArgs*)taosMemoryMalloc(sizeof(ThreadArgs)); - args->stmt = stmt; - args->bindv = bindv; - args->col_idx = col_idx; - args->fp = fp; - args->param = param; - - if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtAsyncBindThreadFunc, args) != 0) { - (void)taosThreadAttrDestroy(&thAttr); - terrno = TAOS_SYSTEM_ERROR(errno); - STMT_ERR_RET(terrno); - } - - // pStmt->bindThreadInUse = true; - - (void)taosThreadAttrDestroy(&thAttr); - - return TSDB_CODE_SUCCESS; + return code; } \ No newline at end of file diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 85b1c543c0..35f258c554 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -202,6 +202,18 @@ int32_t taosAsyncRecover() { return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool); } +int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) { + SSchedMsg* pSchedMsg; + int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg); + if (rc) return rc; + pSchedMsg->fp = NULL; + pSchedMsg->ahandle = bindFn; + pSchedMsg->thandle = bindParam; + // pSchedMsg->msg = code; + + return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg); +} + void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { if (NULL == pMsgBody) { return;