diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c2f0d58279..916311bbee 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -32,6 +32,12 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ + if (CODE) { \ + LINO = __LINE__; \ + goto LABEL; \ + } + typedef struct TSDBROW TSDBROW; typedef struct TABLEID TABLEID; typedef struct TSDBKEY TSDBKEY; @@ -150,7 +156,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs); int32_t tBlockDataCreate(SBlockData *pBlockData); void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); -int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema); +int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); @@ -272,6 +278,7 @@ int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); +int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); @@ -633,6 +640,9 @@ typedef struct SSttBlockLoadInfo { int32_t currentLoadBlockIndex; int32_t loadBlocks; double elapsedTime; + STSchema *pSchema; + int16_t *colIds; + int32_t numOfCols; } SSttBlockLoadInfo; typedef struct SMergeTree { @@ -652,13 +662,14 @@ typedef struct { } SSkmInfo; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pBlockLoadInfo, STSchema *pSchema, + int16_t *pCols, int32_t numOfCols, const char *idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); -SSttBlockLoadInfo *tCreateLastBlockLoadInfo(); +SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 32272b541c..775f452864 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL); bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; @@ -612,7 +612,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ tBlockDataReset(state->pBlockData); - code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema); + TABLEID tid = {.suid = state->suid, .uid = state->uid}; + code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); if (code) goto _err; code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a619b9f2e4..5403395623 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -437,7 +437,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { pIter->iSttBlk = 0; SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0); - code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); + code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); if (code) goto _err; pIter->iRow = 0; @@ -1049,7 +1049,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) { SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); - code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); + code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); if (code) goto _exit; pIter->iRow = 0; @@ -1305,7 +1305,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { if (!pBDatal->suid && !pBDatal->uid) { ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.uid == id.uid); - code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); + TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid}; + code = tBlockDataInit(pBDatal, &tid, pCommitter->skmTable.pTSchema, NULL, 0); if (code) goto _exit; } @@ -1428,9 +1429,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { // impl code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); if (code) goto _err; - code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); + code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); if (code) goto _err; - code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); + code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); if (code) goto _err; /* merge with data in .data file */ diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 5f03a82bc0..126d2d729b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -28,11 +28,10 @@ struct SLDataIter { uint64_t uid; STimeWindow timeWindow; SVersionRange verRange; - SSttBlockLoadInfo* pBlockLoadInfo; }; -SSttBlockLoadInfo* tCreateLastBlockLoadInfo() { +SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols) { SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo)); if (pLoadInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -55,6 +54,9 @@ SSttBlockLoadInfo* tCreateLastBlockLoadInfo() { } pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); + pLoadInfo[i].pSchema = pSchema; + pLoadInfo[i].colIds = colList; + pLoadInfo[i].numOfCols = numOfCols; } return pLoadInfo; @@ -111,7 +113,19 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) { pInfo->currentLoadBlockIndex ^= 1; if (pIter->pSttBlk != NULL) { // current block not loaded yet int64_t st = taosGetTimestampUs(); - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]); + + SBlockData* pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; + + TABLEID id = {0}; + if (pIter->pSttBlk->suid != 0) { + id.suid = pIter->pSttBlk->suid; + } else { + id.uid = pIter->uid; + } + + tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); + double el = (taosGetTimestampUs() - st)/ 1000.0; pInfo->elapsedTime += el; pInfo->loadBlocks += 1; @@ -460,7 +474,8 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, const char* idStr) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema, + int16_t* pCols, int32_t numOfCols, const char* idStr) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -475,9 +490,10 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead SSttBlockLoadInfo* pLoadInfo = NULL; if (pBlockLoadInfo == NULL) { + ASSERT(0); if (pMTree->pLoadInfo == NULL) { pMTree->destroyLoadInfo = true; - pMTree->pLoadInfo = tCreateLastBlockLoadInfo(); + pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols); } pLoadInfo = pMTree->pLoadInfo; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3d2c89ba02..b76b6b6280 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -79,6 +79,7 @@ typedef struct SBlockLoadSuppInfo { SColumnDataAgg tsColAgg; SColumnDataAgg** plist; int16_t* colIds; // column ids for loading file block data + int32_t numOfCols; char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. } SBlockLoadSuppInfo; @@ -203,6 +204,7 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { size_t numOfCols = blockDataGetNumOfCols(pBlock); + pSupInfo->numOfCols = numOfCols; pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t)); pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES); if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) { @@ -352,7 +354,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb tMergeTreeClose(&pLReader->mergeTree); if (pLReader->pInfo == NULL) { - pLReader->pInfo = tCreateLastBlockLoadInfo(); + // here we ignore the first column, which is always be the primary timestamp column + pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1); if (pLReader->pInfo == NULL) { tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); return terrno; @@ -1741,7 +1744,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsLast = getCurrentKeyInLastBlock(pLastBlockReader); } - int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; + int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); @@ -1995,7 +1998,8 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, pReader->idStr); + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, + pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -2009,12 +2013,12 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } -bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { - if (pBlockData->nRow > 0) { - ASSERT(pBlockData->nRow == pDumpInfo->totalRows); +bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { + if (pBlockData->nRow > 0) { + ASSERT(pBlockData->nRow == pDumpInfo->totalRows); } - return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); + return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); } int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, @@ -2452,7 +2456,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { code = buildComposedDataBlock(pReader); } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { tBlockDataReset(&pStatus->fileBlockData); - code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); + TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid}; + code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2932,7 +2937,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn // 3. load the neighbor block, and set it to be the currently accessed file data block tBlockDataReset(&pStatus->fileBlockData); - int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema); + TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid}; + int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3404,10 +3410,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl // we need only one row pPrevReader->capacity = 1; pPrevReader->status.pTableMap = pReader->status.pTableMap; + pPrevReader->pSchema = pReader->pSchema; + pPrevReader->pMemSchema = pReader->pMemSchema; pPrevReader->pReadSnap = pReader->pReadSnap; pNextReader->capacity = 1; pNextReader->status.pTableMap = pReader->status.pTableMap; + pNextReader->pSchema = pReader->pSchema; + pNextReader->pMemSchema = pReader->pMemSchema; pNextReader->pReadSnap = pReader->pReadSnap; code = doOpenReaderImpl(pPrevReader); @@ -3441,11 +3451,19 @@ void tsdbReaderClose(STsdbReader* pReader) { { if (pReader->innerReader[0] != NULL) { - pReader->innerReader[0]->status.pTableMap = NULL; - pReader->innerReader[0]->pReadSnap = NULL; + STsdbReader* p = pReader->innerReader[0]; - pReader->innerReader[1]->status.pTableMap = NULL; - pReader->innerReader[1]->pReadSnap = NULL; + p->status.pTableMap = NULL; + p->pReadSnap = NULL; + p->pSchema = NULL; + p->pMemSchema = NULL; + + p = pReader->innerReader[1]; + + p->status.pTableMap = NULL; + p->pReadSnap = NULL; + p->pSchema = NULL; + p->pMemSchema = NULL; tsdbReaderClose(pReader->innerReader[0]); tsdbReaderClose(pReader->innerReader[1]); @@ -3684,7 +3702,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); tBlockDataReset(&pStatus->fileBlockData); - int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema); + TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; + int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 5fe0b408b1..fc577e3962 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -926,12 +926,13 @@ _err: return code; } -static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) { +static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData, + int32_t iStt) { int32_t code = 0; tBlockDataClear(pBlockData); - STsdbFD *pFD = pReader->pDataFD; + STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt]; // uid + version + tskey code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey); @@ -1070,9 +1071,12 @@ _err: int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t code = 0; - code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData); + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1); if (code) goto _err; + ASSERT(pDataBlk->nSubBlock == 1); + +#if 0 if (pDataBlk->nSubBlock > 1) { SBlockData bData1; SBlockData bData2; @@ -1113,6 +1117,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData tBlockDataDestroy(&bData1, 1); tBlockDataDestroy(&bData2, 1); } +#endif return code; @@ -1123,23 +1128,38 @@ _err: int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { int32_t code = 0; + int32_t lino = 0; + + code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { + int32_t code = 0; + int32_t lino = 0; // alloc code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // read code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // decmpr code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); - return code; - -_err: - tsdbError("vgId:%d tsdb read stt block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 99e88a442c..a928dc3484 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -140,7 +140,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { if (pSttBlk->minVer > pReader->ever) continue; if (pSttBlk->maxVer < pReader->sver) continue; - code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData); + code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData); if (code) goto _err; for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { @@ -223,7 +223,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) { if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue; - code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData); + code = tsdbReadSttBlockEx(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData); if (code) goto _err; pIter->iRow = -1; @@ -319,7 +319,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); if (code) goto _err; - code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema); + code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0); if (code) goto _err; while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { @@ -715,7 +715,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI if (code) goto _err; tMapDataReset(&pWriter->dWriter.mDataBlk); - code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema); + code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0); if (code) goto _err; return code; @@ -1000,7 +1000,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); if (code) goto _err; - code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema); + TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid}; + code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8026441cbc..4999e7a49a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -948,24 +948,47 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) { pBlockData->aColData = NULL; } -int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema) { +int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) { int32_t code = 0; - ASSERT(suid || uid); + ASSERT(pId->suid || pId->uid); - pBlockData->suid = suid; - pBlockData->uid = uid; + pBlockData->suid = pId->suid; + pBlockData->uid = pId->uid; pBlockData->nRow = 0; taosArrayClear(pBlockData->aIdx); - for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + if (aCid) { + int32_t iColumn = 1; STColumn *pTColumn = &pTSchema->columns[iColumn]; + for (int32_t iCid = 0; iCid < nCid; iCid++) { + while (pTColumn && pTColumn->colId < aCid[iCid]) { + iColumn++; + pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; + } - SColData *pColData; - code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); - if (code) goto _exit; + if (pTColumn == NULL) { + break; + } else if (pTColumn->colId == aCid[iCid]) { + SColData *pColData; + code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + if (code) goto _exit; + tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); - tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); + iColumn++; + pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; + } + } + } else { + for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + STColumn *pTColumn = &pTSchema->columns[iColumn]; + + SColData *pColData; + code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); + if (code) goto _exit; + + tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); + } } _exit: