diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index ad8938c474..74eb198930 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -152,6 +152,7 @@ typedef struct { int64_t reqid; int32_t errCode; + tsem_t asyncQuerySem; SStmtStatInfo stat; } STscStmt2; diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 3964cb184f..b3571078cd 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -814,6 +814,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { } pStmt->sql.siInfo.tableColsReady = true; + (void)tsem_init(&pStmt->asyncQuerySem, 0, 1); STMT_LOG_SEQ(STMT_INIT); @@ -1574,12 +1575,22 @@ static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** p return TSDB_CODE_SUCCESS; } -int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { - STscStmt2* pStmt = (STscStmt2*)stmt; - int32_t code = 0; - SSubmitRsp* pRsp = NULL; +static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) { + STscStmt2* pStmt = userdata; + __taos_async_fn_t fp = pStmt->options.asyncExecFn; - int64_t startUs = taosGetTimestampUs(); + pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); + pStmt->affectedRows += pStmt->exec.affectedRows; + + fp(pStmt->options.userdata, res, code); + + (void)tsem_post(&pStmt->asyncQuerySem); +} + +int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { + STscStmt2* pStmt = (STscStmt2*)stmt; + int32_t code = 0; + int64_t startUs = taosGetTimestampUs(); STMT_DLOG_E("start to exec"); @@ -1589,6 +1600,8 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); + (void)tsem_wait(&pStmt->asyncQuerySem); + if (STMT_TYPE_QUERY != pStmt->sql.type) { if (pStmt->sql.stbInterlaceMode) { int64_t startTs = taosGetTimestampUs(); @@ -1611,51 +1624,54 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { } } - (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); + SRequestObj* pRequest = pStmt->exec.pRequest; + __taos_async_fn_t fp = pStmt->options.asyncExecFn; - /* + if (!fp) { + (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); + + if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { + code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest); + if (code) { + pStmt->exec.pRequest->code = code; + } else { + STMT_ERR_RET(stmtResetStmt(pStmt)); + STMT_ERR_RET(TSDB_CODE_NEED_RETRY); + } + } + + STMT_ERR_JRET(pStmt->exec.pRequest->code); + + pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); + if (affected_rows) { + *affected_rows = pStmt->exec.affectedRows; + } + pStmt->affectedRows += pStmt->exec.affectedRows; + } else { SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); if (pWrapper == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } else { - pWrapper->pRequest = pStmt->exec.pRequest; - pStmt->exec.pRequest->pWrapper = pWrapper; + pWrapper->pRequest = pRequest; + pRequest->pWrapper = pWrapper; } if (TSDB_CODE_SUCCESS == code) { - code = createParseContext(pStmt->exec.pRequest, &pWrapper->pParseCtx, pWrapper); + code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper); } - pStmt->exec.pRequest->syncQuery = false; - pStmt->exec.pRequest->body.queryFp = fp; - ((SSyncQueryParam*)(pStmt->exec.pRequest)->body.interParam)->userParam = param; - launchAsyncQuery(pStmt->exec.pRequest, pStmt->sql.pQuery, NULL, pWrapper); - */ + pRequest->syncQuery = false; + pRequest->body.queryFp = asyncQueryCb; + ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt; - if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { - code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest); - if (code) { - pStmt->exec.pRequest->code = code; - } else { - tFreeSSubmitRsp(pRsp); - STMT_ERR_RET(stmtResetStmt(pStmt)); - STMT_ERR_RET(TSDB_CODE_NEED_RETRY); - } + launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper); } - STMT_ERR_JRET(pStmt->exec.pRequest->code); - - pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); - pStmt->affectedRows += pStmt->exec.affectedRows; - _return: - while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) { taosUsleep(1); } STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false)); - tFreeSSubmitRsp(pRsp); - ++pStmt->sql.runTimes; int64_t startUs2 = taosGetTimestampUs(); @@ -1688,6 +1704,8 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); + + (void)tsem_destroy(&pStmt->asyncQuerySem); taosMemoryFree(stmt); return TSDB_CODE_SUCCESS; diff --git a/tests/script/api/stmt2-nohole.c b/tests/script/api/stmt2-nohole.c index dbd441f136..1ddb4a66b2 100644 --- a/tests/script/api/stmt2-nohole.c +++ b/tests/script/api/stmt2-nohole.c @@ -1,6 +1,7 @@ // sample code to verify all TDengine API // to compile: gcc -o apitest apitest.c -ltaos +#include #include #include #include @@ -13,8 +14,12 @@ int64_t genReqid() { return count; } +sem_t sem; + void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) { int affected_rows = taos_affected_rows(pRes); + printf("\033[31maffected rows:%d\033[0m\n", affected_rows); + (void)sem_post(&sem); return; /* SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)param; @@ -313,6 +318,7 @@ void veriry_stmt(TAOS* taos) { taos_stmt2_free_fields(stmt, fields); */ // if (taos_stmt_execute(stmt) != 0) { + (void)sem_init(&sem, 0, 0); start = clock(); // if (taos_stmt2_exec(stmt, NULL, stmtAsyncQueryCb, NULL) != 0) { if (taos_stmt2_exec(stmt, NULL) != 0) { @@ -323,6 +329,9 @@ void veriry_stmt(TAOS* taos) { end = clock(); printf("exec time:%f\n", (double)(end - start) / CLOCKS_PER_SEC); + sem_wait(&sem); + (void)sem_destroy(&sem); + taos_stmt2_close(stmt); free(t8_len);