refactor(query): do some internal refactor.
This commit is contained in:
parent
14412f2811
commit
027a0c11bb
|
@ -85,11 +85,12 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
|
||||
"us, planner:%" PRId64 "us, exec:%" PRId64 "us",
|
||||
"us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64,
|
||||
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
|
||||
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
|
||||
pRequest->metric.planEnd - pRequest->metric.semanticEnd,
|
||||
pRequest->metric.resultReady - pRequest->metric.planEnd);
|
||||
pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
|
||||
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||
}
|
||||
|
||||
|
|
|
@ -870,8 +870,9 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
|||
|
||||
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||
SRequestObj* pRequest = (SRequestObj*)param;
|
||||
pRequest->code = code;
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
pRequest->code = code;
|
||||
pRequest->metric.resultReady = taosGetTimestampUs();
|
||||
|
||||
if (pResult) {
|
||||
|
@ -879,31 +880,28 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
||||
}
|
||||
|
||||
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
|
||||
TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||
int32_t type = pRequest->type;
|
||||
if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
|
||||
if (pResult) {
|
||||
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
|
||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
// record the insert rows
|
||||
if (TDMT_VND_SUBMIT == type) {
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||
}
|
||||
}
|
||||
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
|
||||
pRequest->metric.execEnd = taosGetTimestampUs();
|
||||
}
|
||||
|
||||
taosMemoryFree(pResult);
|
||||
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
|
||||
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
||||
tstrerror(code), pRequest->requestId);
|
||||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
doAsyncQuery(pRequest, true);
|
||||
|
@ -915,7 +913,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
removeMeta(pTscObj, pRequest->targetTableList);
|
||||
}
|
||||
|
||||
handleQueryExecRsp(pRequest);
|
||||
pRequest->metric.execEnd = taosGetTimestampUs();
|
||||
code = handleQueryExecRsp(pRequest);
|
||||
|
||||
// return to client
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
|
|
|
@ -700,6 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
|||
SQuery *pQuery = pRequest->pQuery;
|
||||
|
||||
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
||||
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
|
||||
|
@ -723,13 +724,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
|||
|
||||
destorySqlParseWrapper(pWrapper);
|
||||
|
||||
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
||||
pRequest->requestId);
|
||||
double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd)/1000.0;
|
||||
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64,
|
||||
pRequest->self, el, pRequest->requestId);
|
||||
|
||||
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||
} else {
|
||||
destorySqlParseWrapper(pWrapper);
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
|
||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
|
|
Loading…
Reference in New Issue