From 7025be999b72c0a85f5679a0a18d3e26d81c7a7b Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Mon, 10 Feb 2025 18:32:34 +0800 Subject: [PATCH] bind async --- include/client/taos.h | 1 + source/client/inc/clientStmt2.h | 7 +++- source/client/src/clientMain.c | 20 ++++++++-- source/client/src/clientStmt2.c | 68 ++++++++++++++++++++++++++++----- 4 files changed, 80 insertions(+), 16 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 17f97d3d3d..59ada33d91 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -247,6 +247,7 @@ typedef struct TAOS_STMT2_BINDV { DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option); DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx); +DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index 05a4c849f8..e55d9a048f 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -153,8 +153,10 @@ typedef struct { char *db; int64_t reqid; int32_t errCode; - tsem_t asyncQuerySem; - bool semWaited; + tsem_t asyncExecSem; + bool execSemWaited; + tsem_t asyncBindSem; + bool bindSemWaited; SStmtStatInfo stat; } STscStmt2; /* @@ -226,6 +228,7 @@ int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums); int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert); TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt); const char *stmtErrstr2(TAOS_STMT2 *stmt); +int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 190a724151..2a27894fc3 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2166,11 +2166,18 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col } STscStmt2 *pStmt = (STscStmt2 *)stmt; - if (pStmt->options.asyncExecFn && !pStmt->semWaited) { - if (tsem_wait(&pStmt->asyncQuerySem) != 0) { - tscError("wait async query sem failed"); + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { + if (tsem_wait(&pStmt->asyncExecSem) != 0) { + tscError("wait asyncExecSem failed"); } - pStmt->semWaited = true; + pStmt->execSemWaited = true; + } + + if (!pStmt->bindSemWaited) { + if (tsem_wait(&pStmt->asyncBindSem) != 0) { + tscError("wait asyncBindSem failed"); + } + pStmt->bindSemWaited = true; } SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR)); @@ -2243,6 +2250,11 @@ out: return code; } +int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, + void *param) { + return stmt2AsyncBind(stmt, bindv,col_idx, fp, param); +} + int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8e517eb5f2..f4e0fefb00 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -835,13 +835,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { pStmt->sql.siInfo.tableColsReady = true; if (pStmt->options.asyncExecFn) { - if (tsem_init(&pStmt->asyncQuerySem, 0, 1) != 0) { + if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); (void)stmtClose(pStmt); return NULL; } } - pStmt->semWaited = false; + pStmt->execSemWaited = false; STMT_LOG_SEQ(STMT_INIT); @@ -1656,8 +1656,8 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) { (void)stmtCleanExecInfo(pStmt, (code ? false : true), false); ++pStmt->sql.runTimes; - if (tsem_post(&pStmt->asyncQuerySem) != 0) { - tscError("failed to post asyncQuerySem"); + if (tsem_post(&pStmt->asyncExecSem) != 0) { + tscError("failed to post asyncExecSem"); } } @@ -1746,7 +1746,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { pRequest->body.queryFp = asyncQueryCb; ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt; - pStmt->semWaited = false; + pStmt->execSemWaited = false; launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper); } @@ -1775,9 +1775,9 @@ int stmtClose2(TAOS_STMT2* stmt) { (void)taosThreadCondDestroy(&pStmt->queue.waitCond); (void)taosThreadMutexDestroy(&pStmt->queue.mutex); - if (pStmt->options.asyncExecFn && !pStmt->semWaited) { - if (tsem_wait(&pStmt->asyncQuerySem) != 0) { - tscError("failed to wait asyncQuerySem"); + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { + if (tsem_wait(&pStmt->asyncExecSem) != 0) { + tscError("failed to wait asyncExecSem"); } } @@ -1795,8 +1795,8 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); if (pStmt->options.asyncExecFn) { - if (tsem_destroy(&pStmt->asyncQuerySem) != 0) { - tscError("failed to destroy asyncQuerySem"); + if (tsem_destroy(&pStmt->asyncExecSem) != 0) { + tscError("failed to destroy asyncExecSem"); } } taosMemoryFree(stmt); @@ -1925,3 +1925,51 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) { return pStmt->exec.pRequest; } +typedef struct { + TAOS_STMT2* stmt; + TAOS_STMT2_BINDV* bindv; + int32_t col_idx; + __taos_async_fn_t fp; + void* param; +} ThreadArgs; + +static void* stmtAsyncBindThreadFunc(void* args) { + ThreadArgs* targs = (ThreadArgs*)args; + + int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx); + targs->fp(targs->param, NULL, code); + taosMemoryFree(args); + + return NULL; +} + +int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) { + STscStmt2* pStmt = (STscStmt2*)stmt; + + TdThreadAttr thAttr; + if (taosThreadAttrInit(&thAttr) != 0) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED) != 0) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + + ThreadArgs* args = (ThreadArgs*)taosMemoryMalloc(sizeof(ThreadArgs)); + args->stmt = stmt; + args->bindv = bindv; + args->col_idx = col_idx; + args->fp = fp; + args->param = param; + + if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtAsyncBindThreadFunc, args) != 0) { + (void)taosThreadAttrDestroy(&thAttr); + terrno = TAOS_SYSTEM_ERROR(errno); + STMT_ERR_RET(terrno); + } + + // pStmt->bindThreadInUse = true; + + (void)taosThreadAttrDestroy(&thAttr); + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file