From aef5611d1b48f9e5ed17222d5c872d904a36f8e9 Mon Sep 17 00:00:00 2001 From: "pengrongkun94@qq.com" Date: Tue, 18 Feb 2025 17:59:57 +0800 Subject: [PATCH] switch create thread to taosc task queue --- include/client/taos.h | 13 +++++---- include/libs/qcom/query.h | 1 + source/client/inc/clientStmt2.h | 8 ++++++ source/client/src/clientMain.c | 17 +++++++++-- source/client/src/clientStmt2.c | 48 ++------------------------------ source/libs/qcom/src/queryUtil.c | 12 ++++++++ 6 files changed, 44 insertions(+), 55 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 59ada33d91..188b479cc6 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -247,12 +247,13 @@ typedef struct TAOS_STMT2_BINDV { DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option); DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx); -DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); -DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); -DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); -DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); -DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields); -DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields); +DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, + void *param); +DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows); +DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt); +DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert); +DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields); +DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields); DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt); DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5b28eadc4f..a17e67279c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -300,6 +300,7 @@ int32_t cleanupTaskQueue(); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncWait(); int32_t taosAsyncRecover(); +int32_t taosStmt2AsyncBind(__async_exec_fn_t execFn, void* execParam); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index feec9ec337..283573803e 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -133,6 +133,13 @@ SStmtQNode* tail; uint64_t qRemainNum; } SStmtQueue; */ +typedef struct { + TAOS_STMT2 *stmt; + TAOS_STMT2_BINDV *bindv; + int32_t col_idx; + __taos_async_fn_t fp; + void *param; +} ThreadArgs; typedef struct AsyncBindParam { TdThreadMutex mutex; @@ -234,6 +241,7 @@ int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert); TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt); const char *stmtErrstr2(TAOS_STMT2 *stmt); int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param); +int stmtAsyncBindThreadFunc(void *args); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ba67ff7f76..25184b041b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2256,14 +2256,25 @@ int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t c } STscStmt2 *pStmt = (STscStmt2 *)stmt; + if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 0) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + ThreadArgs *args = (ThreadArgs *)taosMemoryMalloc(sizeof(ThreadArgs)); + args->stmt = stmt; + args->bindv = bindv; + args->col_idx = col_idx; + args->fp = fp; + args->param = param; (void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); - int code = stmt2AsyncBind(stmt, bindv, col_idx, fp, param); - if (code != TSDB_CODE_SUCCESS) { + int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args); + if (code_s != TSDB_CODE_SUCCESS) { (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); // terrno = TAOS_SYSTEM_ERROR(errno); } - return code; + return code_s; } int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 597ec95f23..67066e1fdd 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1957,17 +1957,8 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) { return pStmt->exec.pRequest; } -typedef struct { - TAOS_STMT2* stmt; - TAOS_STMT2_BINDV* bindv; - int32_t col_idx; - __taos_async_fn_t fp; - void* param; -} ThreadArgs; - -static void* stmtAsyncBindThreadFunc(void* args) { - setThreadName("stmtAsyncBind"); +int32_t stmtAsyncBindThreadFunc(void* args) { qInfo("async stmt bind thread started"); ThreadArgs* targs = (ThreadArgs*)args; @@ -1983,40 +1974,5 @@ static void* stmtAsyncBindThreadFunc(void* args) { qInfo("async stmt bind thread stopped"); - return NULL; -} - -int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) { - STscStmt2* pStmt = (STscStmt2*)stmt; - if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 1) { - tscError("async bind param is still working, please try again later"); - return TSDB_CODE_TSC_STMT_API_ERROR; - } - - TdThreadAttr thAttr; - if (taosThreadAttrInit(&thAttr) != 0) { - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED) != 0) { - return TSDB_CODE_TSC_INTERNAL_ERROR; - } - - ThreadArgs* args = (ThreadArgs*)taosMemoryMalloc(sizeof(ThreadArgs)); - args->stmt = stmt; - args->bindv = bindv; - args->col_idx = col_idx; - args->fp = fp; - args->param = param; - - if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtAsyncBindThreadFunc, args) != 0) { - (void)taosThreadAttrDestroy(&thAttr); - terrno = TAOS_SYSTEM_ERROR(errno); - STMT_ERR_RET(terrno); - } - - // pStmt->bindThreadInUse = true; - - (void)taosThreadAttrDestroy(&thAttr); - - return TSDB_CODE_SUCCESS; + return code; } \ No newline at end of file diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 85b1c543c0..35f258c554 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -202,6 +202,18 @@ int32_t taosAsyncRecover() { return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool); } +int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) { + SSchedMsg* pSchedMsg; + int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg); + if (rc) return rc; + pSchedMsg->fp = NULL; + pSchedMsg->ahandle = bindFn; + pSchedMsg->thandle = bindParam; + // pSchedMsg->msg = code; + + return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg); +} + void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { if (NULL == pMsgBody) { return;