From fba5560f1bfb29e33bbdf43da8b8d22072eacf68 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 12 Sep 2024 14:24:11 +0800 Subject: [PATCH 1/4] fix(stmt2/close): wait asnyc cb completed to free itself --- source/client/src/clientStmt2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index e7e08c0982..e716e06005 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1667,7 +1667,6 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false)); ++pStmt->sql.runTimes; - } else { SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); if (pWrapper == NULL) { @@ -1718,6 +1717,7 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); if (pStmt->options.asyncExecFn) { + (void)tsem_wait(&pStmt->asyncQuerySem); (void)tsem_destroy(&pStmt->asyncQuerySem); } taosMemoryFree(stmt); From 9ad2b9d96a21ba3c98da904e0c13c41b290b9fc9 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 12 Sep 2024 14:58:05 +0800 Subject: [PATCH 2/4] move sem to client main to wait for set tbname etc. --- source/client/src/clientMain.c | 13 +++++++++---- source/client/src/clientStmt2.c | 9 ++++----- tests/script/api/stmt2-nohole.c | 12 ++++++------ 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1c1fff9b7b..a28c17cb25 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1317,8 +1317,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); if (TSDB_CODE_SUCCESS != refreshMeta(pRequest->pTscObj, pRequest)) { - tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, - tstrerror(code), pRequest->requestId); + tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code), + pRequest->requestId); } pRequest->prevCode = code; doAsyncQuery(pRequest, true); @@ -1369,7 +1369,7 @@ typedef struct SAsyncFetchParam { void *param; } SAsyncFetchParam; -static int32_t doAsyncFetch(void* pParam) { +static int32_t doAsyncFetch(void *pParam) { SAsyncFetchParam *param = pParam; taosAsyncFetchImpl(param->pReq, param->fp, param->param); taosMemoryFree(param); @@ -1393,7 +1393,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { return; } - SAsyncFetchParam* pParam = taosMemoryCalloc(1, sizeof(SAsyncFetchParam)); + SAsyncFetchParam *pParam = taosMemoryCalloc(1, sizeof(SAsyncFetchParam)); if (!pParam) { fp(param, res, terrno); return; @@ -1983,6 +1983,11 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col return terrno; } + STscStmt2 *pStmt = (STscStmt2 *)stmt; + if (pStmt->options.asyncExecFn) { + (void)tsem_wait(&pStmt->asyncQuerySem); + } + int32_t code = 0; for (int i = 0; i < bindv->count; ++i) { if (bindv->tbnames && bindv->tbnames[i]) { diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index e716e06005..51533fc56c 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1263,10 +1263,6 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); - if (pStmt->options.asyncExecFn) { - (void)tsem_wait(&pStmt->asyncQuerySem); - } - if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { pStmt->bInfo.needParse = false; @@ -1703,6 +1699,10 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->bindThreadInUse = false; } + if (pStmt->options.asyncExecFn) { + (void)tsem_wait(&pStmt->asyncQuerySem); + } + STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64 ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u" @@ -1717,7 +1717,6 @@ int stmtClose2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); if (pStmt->options.asyncExecFn) { - (void)tsem_wait(&pStmt->asyncQuerySem); (void)tsem_destroy(&pStmt->asyncQuerySem); } taosMemoryFree(stmt); diff --git a/tests/script/api/stmt2-nohole.c b/tests/script/api/stmt2-nohole.c index b29dd3e826..5954f3660b 100644 --- a/tests/script/api/stmt2-nohole.c +++ b/tests/script/api/stmt2-nohole.c @@ -14,12 +14,12 @@ int64_t genReqid() { return count; } -sem_t sem; +// sem_t sem; void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) { int affected_rows = taos_affected_rows(pRes); printf("\033[31maffected rows:%d\033[0m\n", affected_rows); - (void)sem_post(&sem); + //(void)sem_post(&sem); return; /* SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)param; @@ -319,7 +319,7 @@ _bind_again: taos_stmt2_free_fields(stmt, fields); */ // if (taos_stmt_execute(stmt) != 0) { - (void)sem_init(&sem, 0, 0); + //(void)sem_init(&sem, 0, 0); start = clock(); // if (taos_stmt2_exec(stmt, NULL, stmtAsyncQueryCb, NULL) != 0) { if (taos_stmt2_exec(stmt, NULL) != 0) { @@ -330,9 +330,9 @@ _bind_again: end = clock(); printf("exec time:%f\n", (double)(end - start) / CLOCKS_PER_SEC); - sem_wait(&sem); - (void)sem_destroy(&sem); - if (++run_time < 2) { + // sem_wait(&sem); + //(void)sem_destroy(&sem); + if (++run_time < 20) { goto _bind_again; } taos_stmt2_close(stmt); From e7cfe21e7213e0e10213b297ecbfd9b3b86e3918 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 12 Sep 2024 15:17:09 +0800 Subject: [PATCH 3/4] stmt2/async: new flag semWaited for the current batch --- source/client/inc/clientStmt2.h | 8 ++++---- source/client/src/clientMain.c | 3 ++- source/client/src/clientStmt2.c | 2 ++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index 74eb198930..4e9a09c082 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -150,10 +150,10 @@ typedef struct { SStmtExecInfo exec; SStmtBindInfo bInfo; - int64_t reqid; - int32_t errCode; - tsem_t asyncQuerySem; - + int64_t reqid; + int32_t errCode; + tsem_t asyncQuerySem; + bool semWaited; SStmtStatInfo stat; } STscStmt2; /* diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a28c17cb25..7cfbeb1372 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1984,8 +1984,9 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col } STscStmt2 *pStmt = (STscStmt2 *)stmt; - if (pStmt->options.asyncExecFn) { + if (pStmt->options.asyncExecFn && !pStmt->semWaited) { (void)tsem_wait(&pStmt->asyncQuerySem); + pStmt->semWaited = true; } int32_t code = 0; diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 51533fc56c..1739dced8f 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -819,6 +819,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { if (pStmt->options.asyncExecFn) { (void)tsem_init(&pStmt->asyncQuerySem, 0, 1); } + pStmt->semWaited = false; STMT_LOG_SEQ(STMT_INIT); @@ -1678,6 +1679,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { pRequest->body.queryFp = asyncQueryCb; ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt; + pStmt->semWaited = false; launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper); } From aecab6e2caf2b4d71567bfaa66e6457918db5861 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 12 Sep 2024 15:27:11 +0800 Subject: [PATCH 4/4] stmt2/close: wait only when !semWaited (i.e. execed current batch) --- source/client/src/clientStmt2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 1739dced8f..ae59319cec 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1701,7 +1701,7 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->bindThreadInUse = false; } - if (pStmt->options.asyncExecFn) { + if (pStmt->options.asyncExecFn && !pStmt->semWaited) { (void)tsem_wait(&pStmt->asyncQuerySem); }