From aa055abd7cda13e627bef0d756cc2fb8a44a0abd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Sep 2022 10:52:55 +0800 Subject: [PATCH 1/2] refactor:do some internal refactor. --- include/libs/executor/executor.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 3 +- source/libs/executor/src/executor.c | 3 +- source/libs/qworker/src/qworker.c | 38 ++++++++++++++++---------- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8c1d957381..81882ca1ac 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8d1525e081..aad141a404 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -690,7 +690,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); + bool hasMore = false; + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 373cb451f4..0fbbc25873 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -479,7 +479,7 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); @@ -536,6 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL } } + *hasMore = (pRes != NULL); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3c102938d0..ae9dd82a58 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -107,9 +107,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); // if *taskHandle is NULL, it's killed right now + bool hasMore = false; + if (taskHandle) { qwDbgSimulateSleep(); - code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch); + + code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -122,20 +125,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ++execNum; - if (taosArrayGetSize(pResList) == 0) { - QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); - dsEndPut(sinkHandle, useconds); - - QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - - if (queryStop) { - *queryStop = true; - } - - break; - } - - for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) { + size_t numOfResBlock = taosArrayGetSize(pResList); + for (int32_t j = 0; j < numOfResBlock; ++j) { SSDataBlock *pRes = taosArrayGetP(pResList, j); ASSERT(pRes->info.rows > 0); @@ -149,6 +140,23 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); } + if (numOfResBlock == 0 || (hasMore == false)) { + if (numOfResBlock == 0) { + QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); + } else { + QW_TASK_DLOG("qExecTask done", ""); + } + + dsEndPut(sinkHandle, useconds); + QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); + + if (queryStop) { + *queryStop = true; + } + + break; + } + if (!qcontinue) { if (queryStop) { *queryStop = true; From 398c23edecb8aafa3feb30d0a1ead1253343376c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Sep 2022 18:37:02 +0800 Subject: [PATCH 2/2] enh(query): only load the required columns. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 39 ++++++++------------------ 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b76b6b6280..c6fb817f6d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -951,15 +951,22 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn return TSDB_CODE_SUCCESS; } -static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) { +static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { int64_t st = taosGetTimestampUs(); + tBlockDataReset(pBlockData); + TABLEID tid = {.suid = pReader->suid, .uid = uid}; + int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; ASSERT(pBlockInfo != NULL); SDataBlk* pBlock = getCurrentBlock(pBlockIter); - int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); + code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, code:%s %s", @@ -2455,14 +2462,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ASSERT(pBlockIter->numOfBlocks == 0); code = buildComposedDataBlock(pReader); } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { - tBlockDataReset(&pStatus->fileBlockData); - TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid}; - code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2936,14 +2936,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); // 3. load the neighbor block, and set it to be the currently accessed file data block - tBlockDataReset(&pStatus->fileBlockData); - TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid}; - int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); + int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3701,15 +3694,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - tBlockDataReset(&pStatus->fileBlockData); - TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; - int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData); + int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData, 1); terrno = code;