From aa055abd7cda13e627bef0d756cc2fb8a44a0abd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Sep 2022 10:52:55 +0800 Subject: [PATCH 1/7] refactor:do some internal refactor. --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 3 +- source/libs/executor/src/executor.c | 3 +- source/libs/qworker/src/qworker.c | 38 ++++++++++++++++---------- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8c1d957381..81882ca1ac 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8d1525e081..aad141a404 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -690,7 +690,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); + bool hasMore = false; + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 373cb451f4..0fbbc25873 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -479,7 +479,7 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); @@ -536,6 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL } } + *hasMore = (pRes != NULL); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3c102938d0..ae9dd82a58 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -107,9 +107,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); // if *taskHandle is NULL, it's killed right now + bool hasMore = false; + if (taskHandle) { qwDbgSimulateSleep(); - code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch); + + code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -122,20 +125,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ++execNum; - if (taosArrayGetSize(pResList) == 0) { - QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); - dsEndPut(sinkHandle, useconds); - - QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - - if (queryStop) { - *queryStop = true; - } - - break; - } - - for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) { + size_t numOfResBlock = taosArrayGetSize(pResList); + for (int32_t j = 0; j < numOfResBlock; ++j) { SSDataBlock *pRes = taosArrayGetP(pResList, j); ASSERT(pRes->info.rows > 0); @@ -149,6 +140,23 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); } + if (numOfResBlock == 0 || (hasMore == false)) { + if (numOfResBlock == 0) { + QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); + } else { + QW_TASK_DLOG("qExecTask done", ""); + } + + dsEndPut(sinkHandle, useconds); + QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); + + if (queryStop) { + *queryStop = true; + } + + break; + } + if (!qcontinue) { if (queryStop) { *queryStop = true; From 398c23edecb8aafa3feb30d0a1ead1253343376c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Sep 2022 18:37:02 +0800 Subject: [PATCH 2/7] enh(query): only load the required columns. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 39 ++++++++------------------ 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b76b6b6280..c6fb817f6d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -951,15 +951,22 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn return TSDB_CODE_SUCCESS; } -static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) { +static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { int64_t st = taosGetTimestampUs(); + tBlockDataReset(pBlockData); + TABLEID tid = {.suid = pReader->suid, .uid = uid}; + int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; ASSERT(pBlockInfo != NULL); SDataBlk* pBlock = getCurrentBlock(pBlockIter); - int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); + code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, code:%s %s", @@ -2455,14 +2462,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ASSERT(pBlockIter->numOfBlocks == 0); code = buildComposedDataBlock(pReader); } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { - tBlockDataReset(&pStatus->fileBlockData); - TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid}; - code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2936,14 +2936,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); // 3. load the neighbor block, and set it to be the currently accessed file data block - tBlockDataReset(&pStatus->fileBlockData); - TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid}; - int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); + int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3701,15 +3694,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - tBlockDataReset(&pStatus->fileBlockData); - TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; - int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData); + int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData, 1); terrno = code; From c565c4cb636cd3eef7003e9db178bce5f0435244 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Sep 2022 18:50:51 +0800 Subject: [PATCH 3/7] fix(query): remove the primary timestamp column in the required column list. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c6fb817f6d..f9b65b6527 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -956,7 +956,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI tBlockDataReset(pBlockData); TABLEID tid = {.suid = pReader->suid, .uid = uid}; - int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols); + int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols-1); if (code != TSDB_CODE_SUCCESS) { return code; } From 14412f28118e1b6626b984f8ca9ba9282dba5f16 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Sep 2022 11:16:52 +0800 Subject: [PATCH 4/7] refactor(query): do some internal refactor. --- source/dnode/vnode/src/inc/tsdb.h | 4 ++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 5 ++++- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 20 +++++--------------- source/dnode/vnode/src/tsdb/tsdbRead.c | 3 +-- 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 916311bbee..75b2f74096 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -662,8 +662,8 @@ typedef struct { } SSkmInfo; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pBlockLoadInfo, STSchema *pSchema, - int16_t *pCols, int32_t numOfCols, const char *idStr); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, + bool destroyLoadInfo, const char *idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 775f452864..3a921349e6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -420,6 +420,7 @@ typedef enum { typedef struct { SFSLASTNEXTROWSTATES state; // [input] STsdb *pTsdb; // [input] + STSchema *pTSchema;// [input] tb_uid_t suid; tb_uid_t uid; int32_t nFileSet; @@ -455,9 +456,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); if (code) goto _err; + SSttBlockLoadInfo* pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0); tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo,true, NULL); bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; @@ -892,6 +894,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; + pIter->fsLastState.pTSchema = pTSchema; pIter->fsLastState.suid = suid; pIter->fsLastState.uid = uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 126d2d729b..7e6a0d04ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -474,8 +474,8 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema, - int16_t* pCols, int32_t numOfCols, const char* idStr) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, + bool destroyLoadInfo, const char *idStr) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -488,22 +488,12 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); int32_t code = TSDB_CODE_SUCCESS; - SSttBlockLoadInfo* pLoadInfo = NULL; - if (pBlockLoadInfo == NULL) { - ASSERT(0); - if (pMTree->pLoadInfo == NULL) { - pMTree->destroyLoadInfo = true; - pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols); - } - - pLoadInfo = pMTree->pLoadInfo; - } else { - pLoadInfo = pBlockLoadInfo; - } + pMTree->pLoadInfo = pBlockLoadInfo; + pMTree->destroyLoadInfo = destroyLoadInfo; for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file struct SLDataIter* pIter = NULL; - code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]); + code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i]); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f9b65b6527..2ea6a57b9e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2005,8 +2005,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, - pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr); + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return false; } From 027a0c11bb0560a8ff0891d79f2b25faf5e7408d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Sep 2022 14:18:32 +0800 Subject: [PATCH 5/7] refactor(query): do some internal refactor. --- source/client/src/clientEnv.c | 5 +++-- source/client/src/clientImpl.c | 27 +++++++++++++-------------- source/client/src/clientMain.c | 8 ++++++-- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6a4ef3d821..9ba529783b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -85,11 +85,12 @@ static void deregisterRequest(SRequestObj *pRequest) { atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 - "us, planner:%" PRId64 "us, exec:%" PRId64 "us", + "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64, duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd, - pRequest->metric.resultReady - pRequest->metric.planEnd); + pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId); + atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index be907ea1e2..dd541b3ac1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -870,8 +870,9 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; - pRequest->code = code; + STscObj* pTscObj = pRequest->pTscObj; + pRequest->code = code; pRequest->metric.resultReady = taosGetTimestampUs(); if (pResult) { @@ -879,31 +880,28 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult)); } - if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || - TDMT_VND_CREATE_TABLE == pRequest->type) { + int32_t type = pRequest->type; + if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) { if (pResult) { pRequest->body.resInfo.numOfRows = pResult->numOfRows; - if (TDMT_VND_SUBMIT == pRequest->type) { - STscObj* pTscObj = pRequest->pTscObj; + + // record the insert rows + if (TDMT_VND_SUBMIT == type) { SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary; atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); } } schedulerFreeJob(&pRequest->body.queryJob, 0); - - pRequest->metric.execEnd = taosGetTimestampUs(); } taosMemoryFree(pResult); + tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); - tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, - tstrerror(code), pRequest->requestId); - - STscObj* pTscObj = pRequest->pTscObj; if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { - tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, - pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, + pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; schedulerFreeJob(&pRequest->body.queryJob, 0); doAsyncQuery(pRequest, true); @@ -915,7 +913,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { removeMeta(pTscObj, pRequest->targetTableList); } - handleQueryExecRsp(pRequest); + pRequest->metric.execEnd = taosGetTimestampUs(); + code = handleQueryExecRsp(pRequest); // return to client pRequest->body.queryFp(pRequest->body.param, pRequest, code); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 650b16f855..dd2046121e 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,6 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); + qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -723,13 +724,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { destorySqlParseWrapper(pWrapper); - tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self, - pRequest->requestId); + double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd)/1000.0; + tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64, + pRequest->self, el, pRequest->requestId); + launchAsyncQuery(pRequest, pQuery, pResultMeta); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; + if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); From e36eed852f9b8d2ac0fc265f000447c52c2b4c92 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Sep 2022 14:39:24 +0800 Subject: [PATCH 6/7] refactor:do some internal refactor. --- source/client/src/clientEnv.c | 9 ++++----- source/libs/scheduler/src/schJob.c | 5 ++--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 9ba529783b..2962aaa3b6 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -70,11 +70,10 @@ static void deregisterRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); - int64_t nowUs = taosGetTimestampUs(); - int64_t duration = nowUs - pRequest->metric.start; - tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 - " ms, current:%d, app current:%d", - pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); + int64_t duration = taosGetTimestampUs() - pRequest->metric.start; + tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, " + "current:%d, app current:%d", + pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 5e47c0a0ed..69495c8b7a 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -47,8 +47,7 @@ void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { return; _return: - - SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); + SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode)); } bool schJobDone(SSchJob *pJob) { @@ -491,7 +490,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { int32_t code = atomic_load_32(&pJob->errCode); if (code) { - SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); + SCH_JOB_DLOG("job failed with error %s", tstrerror(code)); } schPostJobRes(pJob, 0); From 1ee075dce066fa9131b8a9451a5d5e99f34d0a03 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Sep 2022 16:32:50 +0800 Subject: [PATCH 7/7] fix(query):set correct code. --- source/client/src/clientImpl.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index dd541b3ac1..7852a01bfb 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -815,7 +815,7 @@ int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) { int32_t handleQueryExecRsp(SRequestObj* pRequest) { if (NULL == pRequest->body.resInfo.execRes.res) { - return TSDB_CODE_SUCCESS; + return pRequest->code; } SCatalog* pCatalog = NULL; @@ -868,6 +868,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { return code; } +//todo refacto the error code mgmt void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; STscObj* pTscObj = pRequest->pTscObj; @@ -914,7 +915,10 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { } pRequest->metric.execEnd = taosGetTimestampUs(); - code = handleQueryExecRsp(pRequest); + int32_t code1 = handleQueryExecRsp(pRequest); + if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) { + pRequest->code = code1; + } // return to client pRequest->body.queryFp(pRequest->body.param, pRequest, code);