diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 52b030c72b..a177527dab 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -262,7 +262,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); +int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx); int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); @@ -272,7 +272,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); +int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk); int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 8af764c4bc..05c95df798 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } tMapDataReset(&state->blockMap); - code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap); + code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap); if (code) goto _err; state->nBlock = state->blockMap.nItem; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 5e01a7f4c9..52da3a47d2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -377,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); + code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _exit; ASSERT(pCommitter->dReader.mBlock.nItem > 0); @@ -493,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->dReader.iBlockIdx = 0; if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); - code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); + code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _err; } else { pCommitter->dReader.pBlockIdx = NULL; @@ -688,7 +688,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); + code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); if (code) goto _err; if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { @@ -1451,7 +1451,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { // end if (pCommitter->dWriter.mBlock.nItem > 0) { SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); + code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); if (code) goto _err; if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index facfdb9a62..ea488d8f1c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,7 +16,7 @@ #include "osDef.h" #include "tsdb.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -81,11 +81,11 @@ typedef struct SBlockLoadSuppInfo { } SBlockLoadSuppInfo; typedef struct SLastBlockReader { - STimeWindow window; - SVersionRange verRange; - int32_t order; - uint64_t uid; - SMergeTree mergeTree; + STimeWindow window; + SVersionRange verRange; + int32_t order; + uint64_t uid; + SMergeTree mergeTree; SSttBlockLoadInfo* pInfo; } SLastBlockReader; @@ -229,10 +229,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { int64_t skey = pTsdbReader->window.skey; - info.lastKey = (skey > INT64_MIN)? (skey - 1):skey; + info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; } else { int64_t ekey = pTsdbReader->window.ekey; - info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey; + info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; } taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); @@ -596,7 +596,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); tMapDataReset(&pScanInfo->mapData); - tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); + tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); sizeInDisk += pScanInfo->mapData.nData; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { @@ -1385,7 +1385,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; SRowMerger merge = {0}; @@ -1886,7 +1886,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan initMemDataIterator(pScanInfo, pReader); pLBlockReader->uid = pScanInfo->uid; - int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1; STimeWindow w = pLBlockReader->window; if (ASCENDING_TRAVERSE(pLBlockReader->order)) { w.skey = pScanInfo->lastKey + step; @@ -3559,7 +3559,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); - int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1; + int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1; resetDataBlockScanInfo(pReader->status.pTableMap, ts); int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 781cb957f4..7415e985e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -418,21 +418,21 @@ _err: return code; } -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { +int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx) { int32_t code = 0; SHeadFile *pHeadFile = &pWriter->fHead; int64_t size; int64_t n; - ASSERT(mBlock->nItem > 0); + ASSERT(mDataBlk->nItem > 0); // alloc - size = tPutMapData(NULL, mBlock); + size = tPutMapData(NULL, mDataBlk); code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; // build - n = tPutMapData(pWriter->aBuf[0], mBlock); + n = tPutMapData(pWriter->aBuf[0], mDataBlk); // write code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size); @@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 " size:%" PRId64 " nItem:%d", TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, - pBlockIdx->offset, pBlockIdx->size, mBlock->nItem); + pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem); return code; _err: @@ -872,7 +872,7 @@ _err: return code; } -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { +int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) { int32_t code = 0; int64_t offset = pBlockIdx->offset; int64_t size = pBlockIdx->size; @@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl if (code) goto _err; // decode - int64_t n = tGetMapData(pReader->aBuf[0], mBlock); + int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk); if (n < 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 1949b62432..081be8290d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -91,7 +91,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) { pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); - code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); if (code) goto _err; for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { @@ -211,7 +211,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) { if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break; pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); - code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); if (code) goto _err; pIter->iBlock = -1; } @@ -725,7 +725,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { } // SDataBlk - // code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); + // code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); // if (code) goto _err; // SBlockIdx @@ -747,7 +747,7 @@ _err: static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { int32_t code = 0; - code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); + code = tsdbReadDataBlk(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); if (code) goto _err; // SBlockData @@ -774,7 +774,7 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p // SDataBlk SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx); + code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx); if (code) goto _err; // SBlockIdx @@ -987,7 +987,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { } if (pWriter->pBlockIdx) { - code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); + code = tsdbReadDataBlk(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); if (code) goto _err; } else { tMapDataReset(&pWriter->mBlock); @@ -1074,18 +1074,55 @@ _err: } #endif -static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter); +static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { + int32_t code = 0; + + pWriter->dReader.iBlockIdx++; + if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { + pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx); + + code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk); + if (code) goto _exit; + + pWriter->dReader.iDataBlk = -1; + tBlockDataReset(&pWriter->dReader.bData); + pWriter->dReader.iRow = 0; + } else { + pWriter->dReader.pBlockIdx = NULL; + } + +_exit: + return code; +} + +static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { + int32_t code = 0; + + while (true) { + if (pWriter->dReader.pBlockIdx == NULL) break; + if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break; + + SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx; + code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx); + if (code) goto _exit; + + if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tsdbSnapNextTableData(pWriter); + if (code) goto _exit; + } + +_exit: + return code; +} + static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; - // close last file if need - if (pWriter->dWriter.pWriter) { - ASSERT(fid > pWriter->fid); - code = tsdbSnapWriteCloseFile(pWriter); - if (code) goto _err; - } - ASSERT(pWriter->dWriter.pWriter == NULL); // open new @@ -1139,49 +1176,18 @@ _err: static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { int32_t code = 0; - // TODO - return code; -} -static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { - int32_t code = 0; + ASSERT(pWriter->dWriter.pWriter); - pWriter->dReader.iBlockIdx++; - if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { - pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx); + // (todo) - code = tsdbReadBlock(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk); - if (code) goto _exit; + // copy remain table data + TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; + code = tsdbSnapWriteCopyData(pWriter, &id); + if (code) goto _exit; - pWriter->dReader.iDataBlk = -1; - tBlockDataReset(&pWriter->dReader.bData); - pWriter->dReader.iRow = 0; - } else { - pWriter->dReader.pBlockIdx = NULL; - } - -_exit: - return code; -} - -static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { - int32_t code = 0; - - while (true) { - if (pWriter->dReader.pBlockIdx == NULL) break; - if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break; - - SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx; - code = tsdbWriteBlock(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx); - if (code) goto _exit; - - if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - code = tsdbSnapNextTableData(pWriter); - if (code) goto _exit; + if (pWriter->dWriter.sData.nRow > 0) { + // TODO: write the last block } _exit: @@ -1299,10 +1305,17 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 // Loop to handle each row for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - // open file if need TSKEY ts = pBlockData->aTSKEY[iRow]; int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision); + if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) { + if (pWriter->dWriter.pWriter) { + ASSERT(fid > pWriter->fid); + + code = tsdbSnapWriteCloseFile(pWriter); + if (code) goto _err; + } + code = tsdbSnapWriteOpenFile(pWriter, fid); if (code) goto _err; } @@ -1311,8 +1324,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 if (code) goto _err; } - // tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", - // TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow); return code; _err: