diff --git a/include/client/taos.h b/include/client/taos.h index 94a7884950..433779f811 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -247,11 +247,13 @@ typedef struct TAOS_STMT2_BINDV { DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option); DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx); -DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); -DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); -DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); -DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields); -DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields); +DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, + void *param); +DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); +DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); +DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); +DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields); +DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields); DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt); DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5b28eadc4f..a17e67279c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -300,6 +300,7 @@ int32_t cleanupTaskQueue(); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncWait(); int32_t taosAsyncRecover(); +int32_t taosStmt2AsyncBind(__async_exec_fn_t execFn, void* execParam); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index 05a4c849f8..283573803e 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -133,6 +133,19 @@ SStmtQNode* tail; uint64_t qRemainNum; } SStmtQueue; */ +typedef struct { + TAOS_STMT2 *stmt; + TAOS_STMT2_BINDV *bindv; + int32_t col_idx; + __taos_async_fn_t fp; + void *param; +} ThreadArgs; + +typedef struct AsyncBindParam { + TdThreadMutex mutex; + TdThreadCond waitCond; + uint8_t asyncBindNum; +} AsyncBindParam; typedef struct { STscObj *taos; @@ -150,12 +163,13 @@ typedef struct { SStmtExecInfo exec; SStmtBindInfo bInfo; - char *db; - int64_t reqid; - int32_t errCode; - tsem_t asyncQuerySem; - bool semWaited; - SStmtStatInfo stat; + char *db; + int64_t reqid; + int32_t errCode; + tsem_t asyncExecSem; + bool execSemWaited; + AsyncBindParam asyncBindParam; + SStmtStatInfo stat; } STscStmt2; /* extern char *gStmtStatusStr[]; @@ -226,6 +240,8 @@ int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums); int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert); TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt); const char *stmtErrstr2(TAOS_STMT2 *stmt); +int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); +int stmtAsyncBindThreadFunc(void *args); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 47414228ca..184f73c664 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2168,11 +2168,16 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col } STscStmt2 *pStmt = (STscStmt2 *)stmt; - if (pStmt->options.asyncExecFn && !pStmt->semWaited) { - if (tsem_wait(&pStmt->asyncQuerySem) != 0) { - tscError("wait async query sem failed"); + if( atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum)>1) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { + if (tsem_wait(&pStmt->asyncExecSem) != 0) { + tscError("wait asyncExecSem failed"); } - pStmt->semWaited = true; + pStmt->execSemWaited = true; } SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR)); @@ -2245,6 +2250,35 @@ out: return code; } +int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, + void *param) { + if (stmt == NULL || bindv == NULL || fp == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + STscStmt2 *pStmt = (STscStmt2 *)stmt; + if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 0) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + ThreadArgs *args = (ThreadArgs *)taosMemoryMalloc(sizeof(ThreadArgs)); + args->stmt = stmt; + args->bindv = bindv; + args->col_idx = col_idx; + args->fp = fp; + args->param = param; + (void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args); + if (code_s != TSDB_CODE_SUCCESS) { + (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + // terrno = TAOS_SYSTEM_ERROR(errno); + } + + return code_s; +} + int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index fe303bc314..67066e1fdd 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -752,6 +752,14 @@ static int32_t stmtInitQueue(STscStmt2* pStmt) { return TSDB_CODE_SUCCESS; } +static int32_t stmtIniAsyncBind(STscStmt2* pStmt) { + (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL); + (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL); + pStmt->asyncBindParam.asyncBindNum = 0; + + return TSDB_CODE_SUCCESS; +} + static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) { pTblBuf->buffUnit = sizeof(SStmtQNode); pTblBuf->buffSize = pTblBuf->buffUnit * 1000; @@ -812,13 +820,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (NULL == pStmt->sql.siInfo.pTableHash) { - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); if (NULL == pStmt->sql.siInfo.pTableCols) { terrno = terrno; - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } @@ -831,20 +839,27 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { } if (TSDB_CODE_SUCCESS != code) { terrno = code; - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } } pStmt->sql.siInfo.tableColsReady = true; if (pStmt->options.asyncExecFn) { - if (tsem_init(&pStmt->asyncQuerySem, 0, 1) != 0) { + if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } } - pStmt->semWaited = false; + code = stmtIniAsyncBind(pStmt); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + (void)stmtClose2(pStmt); + return NULL; + } + + pStmt->execSemWaited = false; STMT_LOG_SEQ(STMT_INIT); @@ -1659,8 +1674,8 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) { (void)stmtCleanExecInfo(pStmt, (code ? false : true), false); ++pStmt->sql.runTimes; - if (tsem_post(&pStmt->asyncQuerySem) != 0) { - tscError("failed to post asyncQuerySem"); + if (tsem_post(&pStmt->asyncExecSem) != 0) { + tscError("failed to post asyncExecSem"); } } @@ -1675,6 +1690,12 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { return pStmt->errCode; } + taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { + (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); + } + taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); + if (pStmt->sql.stbInterlaceMode) { STMT_ERR_RET(stmtAddBatch2(pStmt)); } @@ -1749,7 +1770,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { pRequest->body.queryFp = asyncQueryCb; ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt; - pStmt->semWaited = false; + pStmt->execSemWaited = false; launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper); } @@ -1775,12 +1796,20 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->bindThreadInUse = false; } + taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { + (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); + } + taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); (void)taosThreadCondDestroy(&pStmt->queue.waitCond); (void)taosThreadMutexDestroy(&pStmt->queue.mutex); - if (pStmt->options.asyncExecFn && !pStmt->semWaited) { - if (tsem_wait(&pStmt->asyncQuerySem) != 0) { - tscError("failed to wait asyncQuerySem"); + (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond); + (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex); + + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { + if (tsem_wait(&pStmt->asyncExecSem) != 0) { + tscError("failed to wait asyncExecSem"); } } @@ -1798,8 +1827,8 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); if (pStmt->options.asyncExecFn) { - if (tsem_destroy(&pStmt->asyncQuerySem) != 0) { - tscError("failed to destroy asyncQuerySem"); + if (tsem_destroy(&pStmt->asyncExecSem) != 0) { + tscError("failed to destroy asyncExecSem"); } } taosMemoryFree(stmt); @@ -1928,3 +1957,22 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) { return pStmt->exec.pRequest; } + +int32_t stmtAsyncBindThreadFunc(void* args) { + qInfo("async stmt bind thread started"); + + ThreadArgs* targs = (ThreadArgs*)args; + STscStmt2* pStmt = (STscStmt2*)targs->stmt; + + int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx); + targs->fp(targs->param, NULL, code); + (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex)); + (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond)); + (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex)); + taosMemoryFree(args); + + qInfo("async stmt bind thread stopped"); + + return code; +} \ No newline at end of file diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 3a648042a6..407c8745a5 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -37,7 +37,7 @@ namespace { void checkError(TAOS_STMT2* stmt, int code) { if (code != TSDB_CODE_SUCCESS) { STscStmt2* pStmt = (STscStmt2*)stmt; - if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr) { + if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr || pStmt->exec.pRequest == nullptr) { printf("stmt api error\n stats : %d\n errstr : %s\n", pStmt->sql.status, taos_stmt_errstr(stmt)); } else { printf("stmt api error\n sql : %s\n stats : %d\n errstr : %s\n", pStmt->sql.sqlStr, pStmt->sql.status, @@ -255,48 +255,41 @@ TEST(stmt2Case, stmt2_test_limit) { do_query(taos, "create database IF NOT EXISTS stmt2_testdb_7"); do_query(taos, "create stable stmt2_testdb_7.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); do_query(taos, - "insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, " - "'abc'),(1591060628001,'def'),(1591060628004, 'hij')"); + "insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, " + "'abc'),(1591060628001,'def'),(1591060628004, 'hij')"); do_query(taos, "use stmt2_testdb_7"); - TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL}; - TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); ASSERT_NE(stmt, nullptr); - const char* sql = "select * from stmt2_testdb_7.tb2 where ts > ? and ts < ? limit ?"; - int code = taos_stmt2_prepare(stmt, sql, 0); + int code = taos_stmt2_prepare(stmt, sql, 0); checkError(stmt, code); - - int t64_len[1] = {sizeof(int64_t)}; - int b_len[1] = {3}; - int x = 2; - int x_len = sizeof(int); - int64_t ts[2] = {1591060627000, 1591060628005}; - TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1}, - {TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1}, - {TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}}; + int t64_len[1] = {sizeof(int64_t)}; + int b_len[1] = {3}; + int x = 2; + int x_len = sizeof(int); + int64_t ts[2] = {1591060627000, 1591060628005}; + TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1}, + {TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1}, + {TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}}; TAOS_STMT2_BIND* paramv = ¶ms[0]; TAOS_STMT2_BINDV bindv = {1, NULL, NULL, ¶mv}; code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); - taos_stmt2_exec(stmt, NULL); checkError(stmt, code); - TAOS_RES* pRes = taos_stmt2_result(stmt); ASSERT_NE(pRes, nullptr); - int getRecordCounts = 0; while ((taos_fetch_row(pRes))) { - getRecordCounts++; + getRecordCounts++; } ASSERT_EQ(getRecordCounts, 2); taos_stmt2_close(stmt); @@ -304,7 +297,6 @@ TEST(stmt2Case, stmt2_test_limit) { taos_close(taos); } - TEST(stmt2Case, insert_stb_get_fields_Test) { TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); @@ -1617,4 +1609,233 @@ TEST(stmt2Case, errcode) { code = taos_stmt_prepare(stmt, sql, 0); checkError(stmt, code); } + +void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) { + bool* finish = (bool*)param; + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + taosMsleep(500); + *finish = true; + return; +} + +void stmtAsyncQueryCb2(void* param, TAOS_RES* pRes, int code) { + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + taosMsleep(500); + return; +} + +void stmtAsyncBindCb2(void* param, TAOS_RES* pRes, int code) { + bool* finish = (bool*)param; + taosMsleep(500); + *finish = true; + return; +} + +TEST(stmt2Case, async_order) { + int CTB_NUMS = 2; + int ROW_NUMS = 2; + int CYC_NUMS = 2; + + TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb2, NULL}; + char* sql = "insert into ? values(?,?)"; + + do_query(taos, "drop database if exists stmt2_testdb_15"); + do_query(taos, "create database IF NOT EXISTS stmt2_testdb_15"); + do_query(taos, "create stable stmt2_testdb_15.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); + do_query(taos, "use stmt2_testdb_15"); + + TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + int code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + int total_affected = 0; + + // tbname + char** tbs = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*)); + for (int i = 0; i < CTB_NUMS; i++) { + tbs[i] = (char*)taosMemoryMalloc(sizeof(char) * 20); + sprintf(tbs[i], "ctb_%d", i); + char* tmp = (char*)taosMemoryMalloc(sizeof(char) * 100); + sprintf(tmp, "create table stmt2_testdb_15.%s using stmt2_testdb_15.stb tags(0, 'after')", tbs[i]); + do_query(taos, tmp); + } + // params + TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); + // col params + int64_t** ts = (int64_t**)taosMemoryMalloc(CTB_NUMS * sizeof(int64_t*)); + char** b = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*)); + int* ts_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); + int* b_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int)); + for (int i = 0; i < ROW_NUMS; i++) { + ts_len[i] = sizeof(int64_t); + b_len[i] = 1; + } + for (int i = 0; i < CTB_NUMS; i++) { + ts[i] = (int64_t*)taosMemoryMalloc(ROW_NUMS * sizeof(int64_t)); + b[i] = (char*)taosMemoryMalloc(ROW_NUMS * sizeof(char)); + for (int j = 0; j < ROW_NUMS; j++) { + ts[i][j] = 1591060628000 + 100000 + j; + b[i][j] = 'a' + j; + } + } + // bind params + for (int i = 0; i < CTB_NUMS; i++) { + // create col params + paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND)); + paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + + // case 1 : bind_a->exec_a->bind_a->exec_a->... + { + printf("case 1 : bind_a->exec_a->bind_a->exec_a->...\n"); + for (int r = 0; r < CYC_NUMS; r++) { + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + + checkError(stmt, code); + + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + } + + // case 2 : bind_a->bind_a->bind_a->exec_a->... + { + printf("case 2 : bind_a->bind_a->bind_a->exec_a->...\n"); + for (int r = 0; r < CYC_NUMS; r++) { + // bind params + TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); + for (int i = 0; i < CTB_NUMS; i++) { + // create col params + paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND)); + paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + while (!finish) { + taosMsleep(100); + } + checkError(stmt, code); + } + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + + // case 3 : bind->exec_a->bind->exec_a->... + { + printf("case 3 : bind->exec_a->bind->exec_a->...\n"); + for (int r = 0; r < CYC_NUMS; r++) { + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param(stmt, &bindv, -1); + + checkError(stmt, code); + + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + } + + // case 4 : bind_a->close + { + printf("case 4 : bind_a->close\n"); + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + checkError(stmt, code); + taos_stmt2_close(stmt); + checkError(stmt, code); + } + + // case 5 : bind_a->exec_a->close + { + printf("case 5 : bind_a->exec_a->close\n"); + // init + TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + int code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + checkError(stmt, code); + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + // close + taos_stmt2_close(stmt); + checkError(stmt, code); + } + + option = {0, false, false, NULL, NULL}; + stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + + // case 6 : bind_a->exec->bind_a->exec->... + { + printf("case 6 : bind_a->exec->bind_a->exec->...\n"); + // init + + checkError(stmt, code); + for (int r = 0; r < CYC_NUMS; r++) { + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish); + checkError(stmt, code); + // exec + code = taos_stmt2_exec(stmt, NULL); + checkError(stmt, code); + } + } + + // case 7 (error:no wait error) : bind_a->bind_a + { + printf("case 7 (error:no wait error) : bind_a->bind_a\n"); + // bind + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; + bool finish = false; + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb2, (void*)&finish); + checkError(stmt, code); + taosMsleep(200); + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb2, (void*)&finish); + ASSERT_EQ(code, TSDB_CODE_TSC_STMT_API_ERROR); + while (!finish) { + taosMsleep(100); + } + } + // close + taos_stmt2_close(stmt); + + // free memory + for (int i = 0; i < CTB_NUMS; i++) { + taosMemoryFree(paramv[i]); + taosMemoryFree(ts[i]); + taosMemoryFree(b[i]); + } + taosMemoryFree(ts); + taosMemoryFree(b); + taosMemoryFree(ts_len); + taosMemoryFree(b_len); + taosMemoryFree(paramv); + for (int i = 0; i < CTB_NUMS; i++) { + taosMemoryFree(tbs[i]); + } + taosMemoryFree(tbs); +} #pragma GCC diagnostic pop diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 85b1c543c0..35f258c554 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -202,6 +202,18 @@ int32_t taosAsyncRecover() { return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool); } +int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) { + SSchedMsg* pSchedMsg; + int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg); + if (rc) return rc; + pSchedMsg->fp = NULL; + pSchedMsg->ahandle = bindFn; + pSchedMsg->thandle = bindParam; + // pSchedMsg->msg = code; + + return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg); +} + void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { if (NULL == pMsgBody) { return;