stmt query
This commit is contained in:
parent
426200a71b
commit
5c1be75b9b
|
@ -71,7 +71,6 @@ typedef struct SStmtBindInfo {
|
||||||
|
|
||||||
typedef struct SStmtExecInfo {
|
typedef struct SStmtExecInfo {
|
||||||
int32_t affectedRows;
|
int32_t affectedRows;
|
||||||
bool emptyRes;
|
|
||||||
SRequestObj* pRequest;
|
SRequestObj* pRequest;
|
||||||
SHashObj* pVgHash;
|
SHashObj* pVgHash;
|
||||||
SHashObj* pBlockHash;
|
SHashObj* pBlockHash;
|
||||||
|
@ -87,7 +86,6 @@ typedef struct SStmtSQLInfo {
|
||||||
char* sqlStr;
|
char* sqlStr;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
SArray* nodeList;
|
SArray* nodeList;
|
||||||
SQueryPlan* pQueryPlan;
|
|
||||||
SStmtQueryResInfo queryRes;
|
SStmtQueryResInfo queryRes;
|
||||||
bool autoCreateTbl;
|
bool autoCreateTbl;
|
||||||
} SStmtSQLInfo;
|
} SStmtSQLInfo;
|
||||||
|
|
|
@ -226,6 +226,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_SELECT_STMT:
|
case QUERY_NODE_SELECT_STMT:
|
||||||
pStmt->sql.type = STMT_TYPE_QUERY;
|
pStmt->sql.type = STMT_TYPE_QUERY;
|
||||||
|
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
|
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
|
||||||
|
@ -279,7 +280,6 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pStmt->exec.autoCreateTbl = false;
|
pStmt->exec.autoCreateTbl = false;
|
||||||
pStmt->exec.emptyRes = false;
|
|
||||||
|
|
||||||
if (keepTable) {
|
if (keepTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -298,7 +298,6 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
||||||
taosMemoryFree(pStmt->sql.queryRes.userFields);
|
taosMemoryFree(pStmt->sql.queryRes.userFields);
|
||||||
taosMemoryFree(pStmt->sql.sqlStr);
|
taosMemoryFree(pStmt->sql.sqlStr);
|
||||||
qDestroyQuery(pStmt->sql.pQuery);
|
qDestroyQuery(pStmt->sql.pQuery);
|
||||||
qDestroyQueryPlan(pStmt->sql.pQueryPlan);
|
|
||||||
taosArrayDestroy(pStmt->sql.nodeList);
|
taosArrayDestroy(pStmt->sql.nodeList);
|
||||||
|
|
||||||
void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
|
void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
|
||||||
|
@ -599,6 +598,8 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
|
||||||
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
|
||||||
|
|
||||||
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
|
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
|
||||||
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
|
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
|
||||||
pStmt->bInfo.needParse = false;
|
pStmt->bInfo.needParse = false;
|
||||||
|
@ -615,23 +616,29 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
||||||
|
|
||||||
if (pStmt->bInfo.needParse) {
|
if (pStmt->bInfo.needParse) {
|
||||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||||
|
} else if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||||
|
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
|
|
||||||
|
|
||||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||||
if (NULL == pStmt->sql.pQueryPlan) {
|
STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx, pStmt->exec.pRequest->requestId));
|
||||||
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
|
|
||||||
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
|
SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
|
||||||
pStmt->exec.pRequest->body.pDag = NULL;
|
.acctId = pStmt->taos->acctId,
|
||||||
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
|
.db = pStmt->exec.pRequest->pDb,
|
||||||
} else {
|
.topicQuery = false,
|
||||||
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
|
.pSql = pStmt->sql.sqlStr,
|
||||||
}
|
.sqlLen = pStmt->sql.sqlLen,
|
||||||
|
.pMsg = pStmt->exec.pRequest->msgBuf,
|
||||||
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||||
|
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
|
||||||
|
.pStmtCb = NULL,
|
||||||
|
.pUser = pStmt->taos->user};
|
||||||
|
STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));
|
||||||
|
|
||||||
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId, &pStmt->exec.emptyRes));
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||||
if (NULL == pDataBlock) {
|
if (NULL == pDataBlock) {
|
||||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||||
|
@ -736,11 +743,7 @@ int stmtExec(TAOS_STMT *stmt) {
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
||||||
|
|
||||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||||
if (pStmt->exec.emptyRes) {
|
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, NULL);
|
||||||
pStmt->exec.pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
|
||||||
} else {
|
|
||||||
scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList, NULL);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
|
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
|
||||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL));
|
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL));
|
||||||
|
@ -839,16 +842,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||||
if (NULL == pStmt->sql.pQueryPlan) {
|
*nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
|
||||||
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
|
|
||||||
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
|
|
||||||
pStmt->exec.pRequest->body.pDag = NULL;
|
|
||||||
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
|
|
||||||
} else {
|
|
||||||
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
|
|
||||||
}
|
|
||||||
|
|
||||||
*nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);
|
|
||||||
} else {
|
} else {
|
||||||
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
|
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue