From cbe91fd4c380396152ab8e837560420e3d579684 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Sep 2022 13:48:53 +0800 Subject: [PATCH] fix(query): initialize the blockdata before load last block. --- source/dnode/vnode/src/inc/tsdb.h | 8 ++++++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 20 +++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ++++++-- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 55a82fd776..9876c35ce4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -633,6 +633,9 @@ typedef struct SSttBlockLoadInfo { int32_t currentLoadBlockIndex; int32_t loadBlocks; double elapsedTime; + STSchema *pSchema; + int16_t *colIds; + int32_t numOfCols; } SSttBlockLoadInfo; typedef struct SMergeTree { @@ -652,13 +655,14 @@ typedef struct { } SSkmInfo; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema, + int16_t* pCols, int32_t numOfCols, const char* idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); -SSttBlockLoadInfo *tCreateLastBlockLoadInfo(); +SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3c145cc7ca..775f452864 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL); bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 5f03a82bc0..ab2fc22b4a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -28,11 +28,10 @@ struct SLDataIter { uint64_t uid; STimeWindow timeWindow; SVersionRange verRange; - SSttBlockLoadInfo* pBlockLoadInfo; }; -SSttBlockLoadInfo* tCreateLastBlockLoadInfo() { +SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols) { SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo)); if (pLoadInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -57,6 +56,9 @@ SSttBlockLoadInfo* tCreateLastBlockLoadInfo() { pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); } + pLoadInfo->pSchema = pSchema; + pLoadInfo->colIds = colList; + pLoadInfo->numOfCols = numOfCols; return pLoadInfo; } @@ -111,7 +113,13 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) { pInfo->currentLoadBlockIndex ^= 1; if (pIter->pSttBlk != NULL) { // current block not loaded yet int64_t st = taosGetTimestampUs(); - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]); + + SBlockData* pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; + TABLEID id = {.suid = pIter->pSttBlk->suid, .uid = 0}; + + tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); + double el = (taosGetTimestampUs() - st)/ 1000.0; pInfo->elapsedTime += el; pInfo->loadBlocks += 1; @@ -460,7 +468,8 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, const char* idStr) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema, + int16_t* pCols, int32_t numOfCols, const char* idStr) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -475,9 +484,10 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead SSttBlockLoadInfo* pLoadInfo = NULL; if (pBlockLoadInfo == NULL) { + ASSERT(0); if (pMTree->pLoadInfo == NULL) { pMTree->destroyLoadInfo = true; - pMTree->pLoadInfo = tCreateLastBlockLoadInfo(); + pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols); } pLoadInfo = pMTree->pLoadInfo; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 9283df7455..b403ad2e86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -79,6 +79,7 @@ typedef struct SBlockLoadSuppInfo { SColumnDataAgg tsColAgg; SColumnDataAgg** plist; int16_t* colIds; // column ids for loading file block data + int32_t numOfCols; char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. } SBlockLoadSuppInfo; @@ -203,6 +204,7 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { size_t numOfCols = blockDataGetNumOfCols(pBlock); + pSupInfo->numOfCols = numOfCols; pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t)); pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES); if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) { @@ -352,7 +354,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb tMergeTreeClose(&pLReader->mergeTree); if (pLReader->pInfo == NULL) { - pLReader->pInfo = tCreateLastBlockLoadInfo(); + // here we ignore the first column, which is always be the primary timestamp column + pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1); if (pLReader->pInfo == NULL) { tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); return terrno; @@ -1995,7 +1998,8 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, pReader->idStr); + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, + pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return false; }