From 027a0c11bb0560a8ff0891d79f2b25faf5e7408d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Sep 2022 14:18:32 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- source/client/src/clientEnv.c | 5 +++-- source/client/src/clientImpl.c | 27 +++++++++++++-------------- source/client/src/clientMain.c | 8 ++++++-- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6a4ef3d821..9ba529783b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -85,11 +85,12 @@ static void deregisterRequest(SRequestObj *pRequest) { atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 - "us, planner:%" PRId64 "us, exec:%" PRId64 "us", + "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64, duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd, - pRequest->metric.resultReady - pRequest->metric.planEnd); + pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId); + atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index be907ea1e2..dd541b3ac1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -870,8 +870,9 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; - pRequest->code = code; + STscObj* pTscObj = pRequest->pTscObj; + pRequest->code = code; pRequest->metric.resultReady = taosGetTimestampUs(); if (pResult) { @@ -879,31 +880,28 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult)); } - if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || - TDMT_VND_CREATE_TABLE == pRequest->type) { + int32_t type = pRequest->type; + if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) { if (pResult) { pRequest->body.resInfo.numOfRows = pResult->numOfRows; - if (TDMT_VND_SUBMIT == pRequest->type) { - STscObj* pTscObj = pRequest->pTscObj; + + // record the insert rows + if (TDMT_VND_SUBMIT == type) { SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); } } schedulerFreeJob(&pRequest->body.queryJob, 0); - - pRequest->metric.execEnd = taosGetTimestampUs(); } taosMemoryFree(pResult); + tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); - tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, - tstrerror(code), pRequest->requestId); - - STscObj* pTscObj = pRequest->pTscObj; if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { - tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, - pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, + pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; schedulerFreeJob(&pRequest->body.queryJob, 0); doAsyncQuery(pRequest, true); @@ -915,7 +913,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { removeMeta(pTscObj, pRequest->targetTableList); } - handleQueryExecRsp(pRequest); + pRequest->metric.execEnd = taosGetTimestampUs(); + code = handleQueryExecRsp(pRequest); // return to client pRequest->body.queryFp(pRequest->body.param, pRequest, code); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 650b16f855..dd2046121e 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,6 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); + qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -723,13 +724,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { destorySqlParseWrapper(pWrapper); - tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self, - pRequest->requestId); + double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd)/1000.0; + tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64, + pRequest->self, el, pRequest->requestId); + launchAsyncQuery(pRequest, pQuery, pResultMeta); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; + if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);