stmt2/async: move post query actions into cb function

This commit is contained in:
Minglei Jin 2024-09-04 15:55:15 +08:00
parent 270190b4ee
commit 2aa14bb8c3
1 changed files with 28 additions and 14 deletions

View File

@ -349,8 +349,10 @@ static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClea
}
} else {
if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
// if (!pStmt->options.asyncExecFn) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
//}
}
size_t keyLen = 0;
@ -814,7 +816,9 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
}
pStmt->sql.siInfo.tableColsReady = true;
(void)tsem_init(&pStmt->asyncQuerySem, 0, 1);
if (pStmt->options.asyncExecFn) {
(void)tsem_init(&pStmt->asyncQuerySem, 0, 1);
}
STMT_LOG_SEQ(STMT_INIT);
@ -1275,6 +1279,10 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (pStmt->options.asyncExecFn) {
(void)tsem_wait(&pStmt->asyncQuerySem);
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERR_RET(qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx));
@ -1584,6 +1592,12 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
fp(pStmt->options.userdata, res, code);
while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
taosUsleep(1);
}
(void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
++pStmt->sql.runTimes;
(void)tsem_post(&pStmt->asyncQuerySem);
}
@ -1600,8 +1614,6 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
(void)tsem_wait(&pStmt->asyncQuerySem);
if (STMT_TYPE_QUERY != pStmt->sql.type) {
if (pStmt->sql.stbInterlaceMode) {
int64_t startTs = taosGetTimestampUs();
@ -1647,6 +1659,15 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
*affected_rows = pStmt->exec.affectedRows;
}
pStmt->affectedRows += pStmt->exec.affectedRows;
while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
taosUsleep(1);
}
STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
++pStmt->sql.runTimes;
} else {
SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) {
@ -1666,16 +1687,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
}
_return:
while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
taosUsleep(1);
}
STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
++pStmt->sql.runTimes;
int64_t startUs2 = taosGetTimestampUs();
pStmt->stat.execUseUs += startUs2 - startUs;
pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;
STMT_RET(code);
}
@ -1705,7 +1717,9 @@ int stmtClose2(TAOS_STMT2* stmt) {
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
(void)tsem_destroy(&pStmt->asyncQuerySem);
if (pStmt->options.asyncExecFn) {
(void)tsem_destroy(&pStmt->asyncQuerySem);
}
taosMemoryFree(stmt);
return TSDB_CODE_SUCCESS;