Merge pull request #27837 from taosdata/fix/TD-30813-4

fix(stmt2/close): wait asnyc cb completed to free itself
This commit is contained in:
Hongze Cheng 2024-09-13 10:22:26 +08:00 committed by GitHub
commit 7b3c2bcbe3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 26 additions and 19 deletions

View File

@ -150,10 +150,10 @@ typedef struct {
SStmtExecInfo exec; SStmtExecInfo exec;
SStmtBindInfo bInfo; SStmtBindInfo bInfo;
int64_t reqid; int64_t reqid;
int32_t errCode; int32_t errCode;
tsem_t asyncQuerySem; tsem_t asyncQuerySem;
bool semWaited;
SStmtStatInfo stat; SStmtStatInfo stat;
} STscStmt2; } STscStmt2;
/* /*

View File

@ -1317,8 +1317,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
if (TSDB_CODE_SUCCESS != refreshMeta(pRequest->pTscObj, pRequest)) { if (TSDB_CODE_SUCCESS != refreshMeta(pRequest->pTscObj, pRequest)) {
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
tstrerror(code), pRequest->requestId); pRequest->requestId);
} }
pRequest->prevCode = code; pRequest->prevCode = code;
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
@ -1369,7 +1369,7 @@ typedef struct SAsyncFetchParam {
void *param; void *param;
} SAsyncFetchParam; } SAsyncFetchParam;
static int32_t doAsyncFetch(void* pParam) { static int32_t doAsyncFetch(void *pParam) {
SAsyncFetchParam *param = pParam; SAsyncFetchParam *param = pParam;
taosAsyncFetchImpl(param->pReq, param->fp, param->param); taosAsyncFetchImpl(param->pReq, param->fp, param->param);
taosMemoryFree(param); taosMemoryFree(param);
@ -1393,7 +1393,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
return; return;
} }
SAsyncFetchParam* pParam = taosMemoryCalloc(1, sizeof(SAsyncFetchParam)); SAsyncFetchParam *pParam = taosMemoryCalloc(1, sizeof(SAsyncFetchParam));
if (!pParam) { if (!pParam) {
fp(param, res, terrno); fp(param, res, terrno);
return; return;
@ -1983,6 +1983,12 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
return terrno; return terrno;
} }
STscStmt2 *pStmt = (STscStmt2 *)stmt;
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
(void)tsem_wait(&pStmt->asyncQuerySem);
pStmt->semWaited = true;
}
int32_t code = 0; int32_t code = 0;
for (int i = 0; i < bindv->count; ++i) { for (int i = 0; i < bindv->count; ++i) {
if (bindv->tbnames && bindv->tbnames[i]) { if (bindv->tbnames && bindv->tbnames[i]) {

View File

@ -818,6 +818,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
if (pStmt->options.asyncExecFn) { if (pStmt->options.asyncExecFn) {
(void)tsem_init(&pStmt->asyncQuerySem, 0, 1); (void)tsem_init(&pStmt->asyncQuerySem, 0, 1);
} }
pStmt->semWaited = false;
STMT_LOG_SEQ(STMT_INIT); STMT_LOG_SEQ(STMT_INIT);
@ -1262,10 +1263,6 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->options.asyncExecFn) {
(void)tsem_wait(&pStmt->asyncQuerySem);
}
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
@ -1666,7 +1663,6 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false)); STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
++pStmt->sql.runTimes; ++pStmt->sql.runTimes;
} else { } else {
SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) { if (pWrapper == NULL) {
@ -1682,6 +1678,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
pRequest->body.queryFp = asyncQueryCb; pRequest->body.queryFp = asyncQueryCb;
((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt; ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
pStmt->semWaited = false;
launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper); launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
} }
@ -1703,6 +1700,10 @@ int stmtClose2(TAOS_STMT2* stmt) {
pStmt->bindThreadInUse = false; pStmt->bindThreadInUse = false;
} }
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
(void)tsem_wait(&pStmt->asyncQuerySem);
}
STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64 ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u" ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"

View File

@ -14,12 +14,12 @@ int64_t genReqid() {
return count; return count;
} }
sem_t sem; // sem_t sem;
void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) { void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) {
int affected_rows = taos_affected_rows(pRes); int affected_rows = taos_affected_rows(pRes);
printf("\033[31maffected rows:%d\033[0m\n", affected_rows); printf("\033[31maffected rows:%d\033[0m\n", affected_rows);
(void)sem_post(&sem); //(void)sem_post(&sem);
return; return;
/* /*
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)param; SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)param;
@ -319,7 +319,7 @@ _bind_again:
taos_stmt2_free_fields(stmt, fields); taos_stmt2_free_fields(stmt, fields);
*/ */
// if (taos_stmt_execute(stmt) != 0) { // if (taos_stmt_execute(stmt) != 0) {
(void)sem_init(&sem, 0, 0); //(void)sem_init(&sem, 0, 0);
start = clock(); start = clock();
// if (taos_stmt2_exec(stmt, NULL, stmtAsyncQueryCb, NULL) != 0) { // if (taos_stmt2_exec(stmt, NULL, stmtAsyncQueryCb, NULL) != 0) {
if (taos_stmt2_exec(stmt, NULL) != 0) { if (taos_stmt2_exec(stmt, NULL) != 0) {
@ -330,9 +330,9 @@ _bind_again:
end = clock(); end = clock();
printf("exec time:%f\n", (double)(end - start) / CLOCKS_PER_SEC); printf("exec time:%f\n", (double)(end - start) / CLOCKS_PER_SEC);
sem_wait(&sem); // sem_wait(&sem);
(void)sem_destroy(&sem); //(void)sem_destroy(&sem);
if (++run_time < 2) { if (++run_time < 20) {
goto _bind_again; goto _bind_again;
} }
taos_stmt2_close(stmt); taos_stmt2_close(stmt);