switch create thread to taosc task queue

This commit is contained in:
pengrongkun94@qq.com 2025-02-18 17:59:57 +08:00
parent b0bf9bb502
commit aef5611d1b
6 changed files with 44 additions and 55 deletions

View File

@ -247,12 +247,13 @@ typedef struct TAOS_STMT2_BINDV {
DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option);
DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx);
DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param);
DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields);
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields);
DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp,
void *param);
DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields);
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields);
DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt);
DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt);

View File

@ -300,6 +300,7 @@ int32_t cleanupTaskQueue();
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t taosAsyncWait();
int32_t taosAsyncRecover();
int32_t taosStmt2AsyncBind(__async_exec_fn_t execFn, void* execParam);
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);

View File

@ -133,6 +133,13 @@ SStmtQNode* tail;
uint64_t qRemainNum;
} SStmtQueue;
*/
typedef struct {
TAOS_STMT2 *stmt;
TAOS_STMT2_BINDV *bindv;
int32_t col_idx;
__taos_async_fn_t fp;
void *param;
} ThreadArgs;
typedef struct AsyncBindParam {
TdThreadMutex mutex;
@ -234,6 +241,7 @@ int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);
TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt);
const char *stmtErrstr2(TAOS_STMT2 *stmt);
int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param);
int stmtAsyncBindThreadFunc(void *args);
#ifdef __cplusplus
}

View File

@ -2256,14 +2256,25 @@ int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t c
}
STscStmt2 *pStmt = (STscStmt2 *)stmt;
if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 0) {
tscError("async bind param is still working, please try again later");
return TSDB_CODE_TSC_STMT_API_ERROR;
}
ThreadArgs *args = (ThreadArgs *)taosMemoryMalloc(sizeof(ThreadArgs));
args->stmt = stmt;
args->bindv = bindv;
args->col_idx = col_idx;
args->fp = fp;
args->param = param;
(void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
int code = stmt2AsyncBind(stmt, bindv, col_idx, fp, param);
if (code != TSDB_CODE_SUCCESS) {
int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args);
if (code_s != TSDB_CODE_SUCCESS) {
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
// terrno = TAOS_SYSTEM_ERROR(errno);
}
return code;
return code_s;
}
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {

View File

@ -1957,17 +1957,8 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
return pStmt->exec.pRequest;
}
typedef struct {
TAOS_STMT2* stmt;
TAOS_STMT2_BINDV* bindv;
int32_t col_idx;
__taos_async_fn_t fp;
void* param;
} ThreadArgs;
static void* stmtAsyncBindThreadFunc(void* args) {
setThreadName("stmtAsyncBind");
int32_t stmtAsyncBindThreadFunc(void* args) {
qInfo("async stmt bind thread started");
ThreadArgs* targs = (ThreadArgs*)args;
@ -1983,40 +1974,5 @@ static void* stmtAsyncBindThreadFunc(void* args) {
qInfo("async stmt bind thread stopped");
return NULL;
}
int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) {
STscStmt2* pStmt = (STscStmt2*)stmt;
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 1) {
tscError("async bind param is still working, please try again later");
return TSDB_CODE_TSC_STMT_API_ERROR;
}
TdThreadAttr thAttr;
if (taosThreadAttrInit(&thAttr) != 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED) != 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
ThreadArgs* args = (ThreadArgs*)taosMemoryMalloc(sizeof(ThreadArgs));
args->stmt = stmt;
args->bindv = bindv;
args->col_idx = col_idx;
args->fp = fp;
args->param = param;
if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtAsyncBindThreadFunc, args) != 0) {
(void)taosThreadAttrDestroy(&thAttr);
terrno = TAOS_SYSTEM_ERROR(errno);
STMT_ERR_RET(terrno);
}
// pStmt->bindThreadInUse = true;
(void)taosThreadAttrDestroy(&thAttr);
return TSDB_CODE_SUCCESS;
return code;
}

View File

@ -202,6 +202,18 @@ int32_t taosAsyncRecover() {
return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
}
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
SSchedMsg* pSchedMsg;
int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
if (rc) return rc;
pSchedMsg->fp = NULL;
pSchedMsg->ahandle = bindFn;
pSchedMsg->thandle = bindParam;
// pSchedMsg->msg = code;
return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
}
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
if (NULL == pMsgBody) {
return;