bind async

This commit is contained in:
pengrongkun94@qq.com 2025-02-10 18:32:34 +08:00
parent b81d3e0938
commit 7025be999b
4 changed files with 80 additions and 16 deletions

View File

@ -247,6 +247,7 @@ typedef struct TAOS_STMT2_BINDV {
DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option);
DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx);
DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param);
DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);

View File

@ -153,8 +153,10 @@ typedef struct {
char *db;
int64_t reqid;
int32_t errCode;
tsem_t asyncQuerySem;
bool semWaited;
tsem_t asyncExecSem;
bool execSemWaited;
tsem_t asyncBindSem;
bool bindSemWaited;
SStmtStatInfo stat;
} STscStmt2;
/*
@ -226,6 +228,7 @@ int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums);
int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);
TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt);
const char *stmtErrstr2(TAOS_STMT2 *stmt);
int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param);
#ifdef __cplusplus
}

View File

@ -2166,11 +2166,18 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
}
STscStmt2 *pStmt = (STscStmt2 *)stmt;
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
tscError("wait async query sem failed");
if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
if (tsem_wait(&pStmt->asyncExecSem) != 0) {
tscError("wait asyncExecSem failed");
}
pStmt->semWaited = true;
pStmt->execSemWaited = true;
}
if (!pStmt->bindSemWaited) {
if (tsem_wait(&pStmt->asyncBindSem) != 0) {
tscError("wait asyncBindSem failed");
}
pStmt->bindSemWaited = true;
}
SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR));
@ -2243,6 +2250,11 @@ out:
return code;
}
int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp,
void *param) {
return stmt2AsyncBind(stmt, bindv,col_idx, fp, param);
}
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);

View File

@ -835,13 +835,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
pStmt->sql.siInfo.tableColsReady = true;
if (pStmt->options.asyncExecFn) {
if (tsem_init(&pStmt->asyncQuerySem, 0, 1) != 0) {
if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
(void)stmtClose(pStmt);
return NULL;
}
}
pStmt->semWaited = false;
pStmt->execSemWaited = false;
STMT_LOG_SEQ(STMT_INIT);
@ -1656,8 +1656,8 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
(void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
++pStmt->sql.runTimes;
if (tsem_post(&pStmt->asyncQuerySem) != 0) {
tscError("failed to post asyncQuerySem");
if (tsem_post(&pStmt->asyncExecSem) != 0) {
tscError("failed to post asyncExecSem");
}
}
@ -1746,7 +1746,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
pRequest->body.queryFp = asyncQueryCb;
((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
pStmt->semWaited = false;
pStmt->execSemWaited = false;
launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
}
@ -1775,9 +1775,9 @@ int stmtClose2(TAOS_STMT2* stmt) {
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
tscError("failed to wait asyncQuerySem");
if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
if (tsem_wait(&pStmt->asyncExecSem) != 0) {
tscError("failed to wait asyncExecSem");
}
}
@ -1795,8 +1795,8 @@ int stmtClose2(TAOS_STMT2* stmt) {
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
if (pStmt->options.asyncExecFn) {
if (tsem_destroy(&pStmt->asyncQuerySem) != 0) {
tscError("failed to destroy asyncQuerySem");
if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
tscError("failed to destroy asyncExecSem");
}
}
taosMemoryFree(stmt);
@ -1925,3 +1925,51 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
return pStmt->exec.pRequest;
}
typedef struct {
TAOS_STMT2* stmt;
TAOS_STMT2_BINDV* bindv;
int32_t col_idx;
__taos_async_fn_t fp;
void* param;
} ThreadArgs;
static void* stmtAsyncBindThreadFunc(void* args) {
ThreadArgs* targs = (ThreadArgs*)args;
int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
targs->fp(targs->param, NULL, code);
taosMemoryFree(args);
return NULL;
}
int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) {
STscStmt2* pStmt = (STscStmt2*)stmt;
TdThreadAttr thAttr;
if (taosThreadAttrInit(&thAttr) != 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED) != 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
ThreadArgs* args = (ThreadArgs*)taosMemoryMalloc(sizeof(ThreadArgs));
args->stmt = stmt;
args->bindv = bindv;
args->col_idx = col_idx;
args->fp = fp;
args->param = param;
if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtAsyncBindThreadFunc, args) != 0) {
(void)taosThreadAttrDestroy(&thAttr);
terrno = TAOS_SYSTEM_ERROR(errno);
STMT_ERR_RET(terrno);
}
// pStmt->bindThreadInUse = true;
(void)taosThreadAttrDestroy(&thAttr);
return TSDB_CODE_SUCCESS;
}