stmt2/async: new sem for api syncing

This commit is contained in:
Minglei Jin 2024-08-29 08:56:24 +08:00
parent 249327419a
commit 8cbcdec4bf
3 changed files with 60 additions and 32 deletions

View File

@ -152,6 +152,7 @@ typedef struct {
int64_t reqid; int64_t reqid;
int32_t errCode; int32_t errCode;
tsem_t asyncQuerySem;
SStmtStatInfo stat; SStmtStatInfo stat;
} STscStmt2; } STscStmt2;

View File

@ -814,6 +814,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
} }
pStmt->sql.siInfo.tableColsReady = true; pStmt->sql.siInfo.tableColsReady = true;
(void)tsem_init(&pStmt->asyncQuerySem, 0, 1);
STMT_LOG_SEQ(STMT_INIT); STMT_LOG_SEQ(STMT_INIT);
@ -1574,11 +1575,21 @@ static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
STscStmt2* pStmt = userdata;
__taos_async_fn_t fp = pStmt->options.asyncExecFn;
pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
pStmt->affectedRows += pStmt->exec.affectedRows;
fp(pStmt->options.userdata, res, code);
(void)tsem_post(&pStmt->asyncQuerySem);
}
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
STscStmt2* pStmt = (STscStmt2*)stmt; STscStmt2* pStmt = (STscStmt2*)stmt;
int32_t code = 0; int32_t code = 0;
SSubmitRsp* pRsp = NULL;
int64_t startUs = taosGetTimestampUs(); int64_t startUs = taosGetTimestampUs();
STMT_DLOG_E("start to exec"); STMT_DLOG_E("start to exec");
@ -1589,6 +1600,8 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
(void)tsem_wait(&pStmt->asyncQuerySem);
if (STMT_TYPE_QUERY != pStmt->sql.type) { if (STMT_TYPE_QUERY != pStmt->sql.type) {
if (pStmt->sql.stbInterlaceMode) { if (pStmt->sql.stbInterlaceMode) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
@ -1611,31 +1624,17 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
} }
} }
(void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); SRequestObj* pRequest = pStmt->exec.pRequest;
__taos_async_fn_t fp = pStmt->options.asyncExecFn;
/* if (!fp) {
SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
if (pWrapper == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pWrapper->pRequest = pStmt->exec.pRequest;
pStmt->exec.pRequest->pWrapper = pWrapper;
}
if (TSDB_CODE_SUCCESS == code) {
code = createParseContext(pStmt->exec.pRequest, &pWrapper->pParseCtx, pWrapper);
}
pStmt->exec.pRequest->syncQuery = false;
pStmt->exec.pRequest->body.queryFp = fp;
((SSyncQueryParam*)(pStmt->exec.pRequest)->body.interParam)->userParam = param;
launchAsyncQuery(pStmt->exec.pRequest, pStmt->sql.pQuery, NULL, pWrapper);
*/
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest); code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
if (code) { if (code) {
pStmt->exec.pRequest->code = code; pStmt->exec.pRequest->code = code;
} else { } else {
tFreeSSubmitRsp(pRsp);
STMT_ERR_RET(stmtResetStmt(pStmt)); STMT_ERR_RET(stmtResetStmt(pStmt));
STMT_ERR_RET(TSDB_CODE_NEED_RETRY); STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
} }
@ -1644,18 +1643,35 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
STMT_ERR_JRET(pStmt->exec.pRequest->code); STMT_ERR_JRET(pStmt->exec.pRequest->code);
pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
if (affected_rows) {
*affected_rows = pStmt->exec.affectedRows;
}
pStmt->affectedRows += pStmt->exec.affectedRows; pStmt->affectedRows += pStmt->exec.affectedRows;
} else {
SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pWrapper->pRequest = pRequest;
pRequest->pWrapper = pWrapper;
}
if (TSDB_CODE_SUCCESS == code) {
code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
}
pRequest->syncQuery = false;
pRequest->body.queryFp = asyncQueryCb;
((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
}
_return: _return:
while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) { while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
taosUsleep(1); taosUsleep(1);
} }
STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false)); STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
tFreeSSubmitRsp(pRsp);
++pStmt->sql.runTimes; ++pStmt->sql.runTimes;
int64_t startUs2 = taosGetTimestampUs(); int64_t startUs2 = taosGetTimestampUs();
@ -1688,6 +1704,8 @@ int stmtClose2(TAOS_STMT2* stmt) {
pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
(void)tsem_destroy(&pStmt->asyncQuerySem);
taosMemoryFree(stmt); taosMemoryFree(stmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -1,6 +1,7 @@
// sample code to verify all TDengine API // sample code to verify all TDengine API
// to compile: gcc -o apitest apitest.c -ltaos // to compile: gcc -o apitest apitest.c -ltaos
#include <semaphore.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -13,8 +14,12 @@ int64_t genReqid() {
return count; return count;
} }
sem_t sem;
void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) { void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) {
int affected_rows = taos_affected_rows(pRes); int affected_rows = taos_affected_rows(pRes);
printf("\033[31maffected rows:%d\033[0m\n", affected_rows);
(void)sem_post(&sem);
return; return;
/* /*
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)param; SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)param;
@ -313,6 +318,7 @@ void veriry_stmt(TAOS* taos) {
taos_stmt2_free_fields(stmt, fields); taos_stmt2_free_fields(stmt, fields);
*/ */
// if (taos_stmt_execute(stmt) != 0) { // if (taos_stmt_execute(stmt) != 0) {
(void)sem_init(&sem, 0, 0);
start = clock(); start = clock();
// if (taos_stmt2_exec(stmt, NULL, stmtAsyncQueryCb, NULL) != 0) { // if (taos_stmt2_exec(stmt, NULL, stmtAsyncQueryCb, NULL) != 0) {
if (taos_stmt2_exec(stmt, NULL) != 0) { if (taos_stmt2_exec(stmt, NULL) != 0) {
@ -323,6 +329,9 @@ void veriry_stmt(TAOS* taos) {
end = clock(); end = clock();
printf("exec time:%f\n", (double)(end - start) / CLOCKS_PER_SEC); printf("exec time:%f\n", (double)(end - start) / CLOCKS_PER_SEC);
sem_wait(&sem);
(void)sem_destroy(&sem);
taos_stmt2_close(stmt); taos_stmt2_close(stmt);
free(t8_len); free(t8_len);