diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 908f7c014e..c436e4ffd2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -129,10 +129,6 @@ int32_t tsMinIntervalTime = 1; int32_t tsQueryBufferSize = -1; int64_t tsQueryBufferSizeBytes = -1; -// tsdb config -// For backward compatibility -bool tsdbForceKeepFile = false; - int32_t tsDiskCfgNum = 0; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 918e3e648a..7546b0943e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -42,15 +42,15 @@ typedef struct SMemTable SMemTable; typedef struct STbDataIter STbDataIter; typedef struct SMapData SMapData; typedef struct SBlockIdx SBlockIdx; -typedef struct SBlock SBlock; -typedef struct SBlockL SBlockL; +typedef struct SDataBlk SDataBlk; +typedef struct SSstBlk SSstBlk; typedef struct SColData SColData; typedef struct SDiskDataHdr SDiskDataHdr; typedef struct SBlockData SBlockData; typedef struct SDelFile SDelFile; typedef struct SHeadFile SHeadFile; typedef struct SDataFile SDataFile; -typedef struct SLastFile SLastFile; +typedef struct SSstFile SSstFile; typedef struct SSmaFile SSmaFile; typedef struct SDFileSet SDFileSet; typedef struct SDataFWriter SDataFWriter; @@ -114,15 +114,15 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tPutBlockCol(uint8_t *p, void *ph); int32_t tGetBlockCol(uint8_t *p, void *ph); int32_t tBlockColCmprFn(const void *p1, const void *p2); -// SBlock -void tBlockReset(SBlock *pBlock); -int32_t tPutBlock(uint8_t *p, void *ph); -int32_t tGetBlock(uint8_t *p, void *ph); -int32_t tBlockCmprFn(const void *p1, const void *p2); -bool tBlockHasSma(SBlock *pBlock); -// SBlockL -int32_t tPutBlockL(uint8_t *p, void *ph); -int32_t tGetBlockL(uint8_t *p, void *ph); +// SDataBlk +void tDataBlkReset(SDataBlk *pBlock); +int32_t tPutDataBlk(uint8_t *p, void *ph); +int32_t tGetDataBlk(uint8_t *p, void *ph); +int32_t tDataBlkCmprFn(const void *p1, const void *p2); +bool tDataBlkHasSma(SDataBlk *pDataBlk); +// SSstBlk +int32_t tPutSstBlk(uint8_t *p, void *ph); +int32_t tGetSstBlk(uint8_t *p, void *ph); // SBlockIdx int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); @@ -219,7 +219,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2); int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype); int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile); int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile); -int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile); +int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile); int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile); int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile); int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile); @@ -228,7 +228,7 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet); void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]); void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]); -void tsdbLastFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SLastFile *pLastF, char fname[]); +void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]); void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]); // SDelFile void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); @@ -254,7 +254,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); -int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL); +int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); @@ -264,10 +264,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); -int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL); -int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); -int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); +int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk); +int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); +int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); +int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); @@ -427,7 +427,7 @@ struct SSmaInfo { int32_t size; }; -struct SBlock { +struct SDataBlk { TSDBKEY minKey; TSDBKEY maxKey; int64_t minVer; @@ -439,7 +439,7 @@ struct SBlock { SSmaInfo smaInfo; }; -struct SBlockL { +struct SSstBlk { int64_t suid; int64_t minUid; int64_t maxUid; @@ -478,12 +478,6 @@ struct SBlockData { SArray *aColData; // SArray }; -// ================== TSDB global config -extern bool tsdbForceKeepFile; - -#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC -#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC - struct TABLEID { tb_uid_t suid; tb_uid_t uid; @@ -547,7 +541,7 @@ struct SDataFile { int64_t size; }; -struct SLastFile { +struct SSstFile { volatile int32_t nRef; int64_t commitID; @@ -568,8 +562,8 @@ struct SDFileSet { SHeadFile *pHeadF; SDataFile *pDataF; SSmaFile *pSmaF; - uint8_t nLastF; - SLastFile *aLastF[TSDB_MAX_LAST_FILE]; + uint8_t nSstF; + SSstFile *aSstF[TSDB_MAX_LAST_FILE]; }; struct SRowIter { @@ -604,7 +598,7 @@ struct SDataFWriter { SHeadFile fHead; SDataFile fData; SSmaFile fSma; - SLastFile fLast[TSDB_MAX_LAST_FILE]; + SSstFile fSst[TSDB_MAX_LAST_FILE]; uint8_t *aBuf[4]; }; @@ -633,17 +627,17 @@ typedef struct { } SRowInfo; typedef struct SMergeTree { - int8_t backward; - SRBTree rbt; - SArray *pIterList; - struct SLDataIter *pIter; + int8_t backward; + SRBTree rbt; + SArray *pIterList; + struct SLDataIter *pIter; } SMergeTree; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange); void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter); -bool tMergeTreeNext(SMergeTree* pMTree); -TSDBROW tMergeTreeGetRow(SMergeTree* pMTree); -void tMergeTreeClose(SMergeTree* pMTree); +bool tMergeTreeNext(SMergeTree *pMTree); +TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); +void tMergeTreeClose(SMergeTree *pMTree); // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a0d52b8b3f..64caff1542 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -527,7 +527,7 @@ typedef struct SFSNextRowIter { SMapData blockMap; int32_t nBlock; int32_t iBlock; - SBlock block; + SDataBlk block; SBlockData blockData; SBlockData *pBlockData; int32_t nRow; @@ -602,13 +602,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } case SFSNEXTROW_BLOCKDATA: if (state->iBlock >= 0) { - SBlock block = {0}; + SDataBlk block = {0}; - tBlockReset(&block); + tDataBlkReset(&block); // tBlockDataReset(&state->blockData); tBlockDataReset(state->pBlockData); - tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock); + tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ tBlockDataReset(state->pBlockData); code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 3c497d143f..42ab6ce3b2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -32,12 +32,12 @@ typedef struct { STbDataIter iter; }; // memory data iter struct { - int32_t iLast; - SArray *aBlockL; - int32_t iBlockL; + int32_t iSst; + SArray *aSstBlk; + int32_t iSstBlk; SBlockData bData; int32_t iRow; - }; // last file data iter + }; // sst file data iter }; } SDataIter; @@ -64,7 +64,7 @@ typedef struct { SArray *aBlockIdx; // SArray int32_t iBlockIdx; SBlockIdx *pBlockIdx; - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData SBlockData bData; } dReader; struct { @@ -77,8 +77,8 @@ typedef struct { struct { SDataFWriter *pWriter; SArray *aBlockIdx; // SArray - SArray *aBlockL; // SArray - SMapData mBlock; // SMapData + SArray *aSstBlk; // SArray + SMapData mBlock; // SMapData SBlockData bData; SBlockData bDatal; } dWriter; @@ -92,8 +92,8 @@ typedef struct { SArray *aDelData; // SArray } SCommitter; -extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, - SBlockData *pBlockData); // todo +extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *aSstBlk, + SBlockData *pBlockData); // todo static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbCommitData(SCommitter *pCommitter); @@ -431,21 +431,21 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { pCommitter->toLastOnly = 0; SDataFReader *pReader = pCommitter->dReader.pReader; if (pReader) { - if (pReader->pSet->nLastF >= pCommitter->maxLast) { + if (pReader->pSet->nSstF >= pCommitter->maxLast) { int8_t iIter = 0; - for (int32_t iLast = 0; iLast < pReader->pSet->nLastF; iLast++) { + for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) { pIter = &pCommitter->aDataIter[iIter]; pIter->type = LAST_DATA_ITER; - pIter->iLast = iLast; + pIter->iSst = iSst; - code = tsdbReadBlockL(pCommitter->dReader.pReader, iLast, pIter->aBlockL); + code = tsdbReadSstBlk(pCommitter->dReader.pReader, iSst, pIter->aSstBlk); if (code) goto _err; - if (taosArrayGetSize(pIter->aBlockL) == 0) continue; + if (taosArrayGetSize(pIter->aSstBlk) == 0) continue; - pIter->iBlockL = 0; - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); - code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, iLast, pBlockL, &pIter->bData); + pIter->iSstBlk = 0; + SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, 0); + code = tsdbReadSstBlockEx(pCommitter->dReader.pReader, iSst, pSstBlk, &pIter->bData); if (code) goto _err; pIter->iRow = 0; @@ -457,9 +457,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { iIter++; } } else { - for (int32_t iLast = 0; iLast < pReader->pSet->nLastF; iLast++) { - SLastFile *pLastFile = pReader->pSet->aLastF[iLast]; - if (pLastFile->size > pLastFile->offset) { + for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) { + SSstFile *pSstFile = pReader->pSet->aSstF[iSst]; + if (pSstFile->size > pSstFile->offset) { pCommitter->toLastOnly = 1; break; } @@ -515,34 +515,34 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { SHeadFile fHead = {.commitID = pCommitter->commitID}; SDataFile fData = {.commitID = pCommitter->commitID}; SSmaFile fSma = {.commitID = pCommitter->commitID}; - SLastFile fLast = {.commitID = pCommitter->commitID}; + SSstFile fSst = {.commitID = pCommitter->commitID}; SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; if (pRSet) { - ASSERT(pRSet->nLastF <= pCommitter->maxLast); + ASSERT(pRSet->nSstF <= pCommitter->maxLast); fData = *pRSet->pDataF; fSma = *pRSet->pSmaF; wSet.diskId = pRSet->diskId; - if (pRSet->nLastF < pCommitter->maxLast) { - for (int32_t iLast = 0; iLast < pRSet->nLastF; iLast++) { - wSet.aLastF[iLast] = pRSet->aLastF[iLast]; + if (pRSet->nSstF < pCommitter->maxLast) { + for (int32_t iSst = 0; iSst < pRSet->nSstF; iSst++) { + wSet.aSstF[iSst] = pRSet->aSstF[iSst]; } - wSet.nLastF = pRSet->nLastF + 1; + wSet.nSstF = pRSet->nSstF + 1; } else { - wSet.nLastF = 1; + wSet.nSstF = 1; } } else { SDiskID did = {0}; tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); wSet.diskId = did; - wSet.nLastF = 1; + wSet.nSstF = 1; } - wSet.aLastF[wSet.nLastF - 1] = &fLast; + wSet.aSstF[wSet.nSstF - 1] = &fSst; code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); if (code) goto _err; taosArrayClear(pCommitter->dWriter.aBlockIdx); - taosArrayClear(pCommitter->dWriter.aBlockL); + taosArrayClear(pCommitter->dWriter.aSstBlk); tMapDataReset(&pCommitter->dWriter.mBlock); tBlockDataReset(&pCommitter->dWriter.bData); tBlockDataReset(&pCommitter->dWriter.bDatal); @@ -562,11 +562,11 @@ _err: static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { int32_t code = 0; SBlockData *pBlockData = &pCommitter->dWriter.bData; - SBlock block; + SDataBlk block; ASSERT(pBlockData->nRow > 0); - tBlockReset(&block); + tDataBlkReset(&block); // info block.nRow += pBlockData->nRow; @@ -597,8 +597,8 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { ((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0); if (code) goto _err; - // put SBlock - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); + // put SDataBlk + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk); if (code) goto _err; // clear @@ -613,7 +613,7 @@ _err: static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { int32_t code = 0; - SBlockL blockL; + SSstBlk blockL; SBlockData *pBlockData = &pCommitter->dWriter.bDatal; ASSERT(pBlockData->nRow > 0); @@ -638,8 +638,8 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1); if (code) goto _err; - // push SBlockL - if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { + // push SSstBlk + if (taosArrayPush(pCommitter->dWriter.aSstBlk, &blockL) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -661,8 +661,8 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); if (code) goto _err; - // write aBlockL - code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL); + // write aSstBlk + code = tsdbWriteSstBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSstBlk); if (code) goto _err; // update file header @@ -790,10 +790,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { if (code) goto _exit; // merger - for (int32_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - SDataIter *pIter = &pCommitter->aDataIter[iLast]; - pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pIter->aBlockL == NULL) { + for (int32_t iSst = 0; iSst < TSDB_MAX_LAST_FILE; iSst++) { + SDataIter *pIter = &pCommitter->aDataIter[iSst]; + pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + if (pIter->aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -809,8 +809,8 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { goto _exit; } - pCommitter->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pCommitter->dWriter.aBlockL == NULL) { + pCommitter->dWriter.aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + if (pCommitter->dWriter.aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -832,15 +832,15 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { tBlockDataDestroy(&pCommitter->dReader.bData, 1); // merger - for (int32_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - SDataIter *pIter = &pCommitter->aDataIter[iLast]; - taosArrayDestroy(pIter->aBlockL); + for (int32_t iSst = 0; iSst < TSDB_MAX_LAST_FILE; iSst++) { + SDataIter *pIter = &pCommitter->aDataIter[iSst]; + taosArrayDestroy(pIter->aSstBlk); tBlockDataDestroy(&pIter->bData, 1); } // writer taosArrayDestroy(pCommitter->dWriter.aBlockIdx); - taosArrayDestroy(pCommitter->dWriter.aBlockL); + taosArrayDestroy(pCommitter->dWriter.aSstBlk); tMapDataClear(&pCommitter->dWriter.mBlock); tBlockDataDestroy(&pCommitter->dWriter.bData, 1); tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); @@ -1055,11 +1055,11 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); } else { - pIter->iBlockL++; - if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + pIter->iSstBlk++; + if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) { + SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk); - code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, pIter->iLast, pBlockL, &pIter->bData); + code = tsdbReadSstBlockEx(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData); if (code) goto _exit; pIter->iRow = 0; @@ -1098,7 +1098,7 @@ _exit: return code; } -static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) { +static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { int32_t code = 0; SBlockData *pBlockData = &pCommitter->dWriter.bData; SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); @@ -1122,7 +1122,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) { pRowInfo = NULL; } else { TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pBlock->minKey) >= 0) pRowInfo = NULL; + if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL; } } @@ -1144,14 +1144,14 @@ _err: return code; } -static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { +static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { int32_t code = 0; SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; SBlockData *pBDataR = &pCommitter->dReader.bData; SBlockData *pBDataW = &pCommitter->dWriter.bData; - code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBDataR); + code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR); if (code) goto _err; tBlockDataClear(pBDataW); @@ -1188,7 +1188,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { pRowInfo = NULL; } else { TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pBlock->maxKey) > 0) pRowInfo = NULL; + if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL; } } } else { @@ -1237,57 +1237,57 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { int32_t iBlock = 0; - SBlock block; - SBlock *pBlock = █ + SDataBlk block; + SDataBlk *pDataBlk = █ SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid); - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - while (pBlock && pRowInfo) { - SBlock tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; - int32_t c = tBlockCmprFn(pBlock, &tBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); + while (pDataBlk && pRowInfo) { + SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; + int32_t c = tDataBlkCmprFn(pDataBlk, &tBlock); if (c < 0) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); if (code) goto _err; iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); } else { - pBlock = NULL; + pDataBlk = NULL; } } else if (c > 0) { - code = tsdbCommitAheadBlock(pCommitter, pBlock); + code = tsdbCommitAheadBlock(pCommitter, pDataBlk); if (code) goto _err; pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; } else { - code = tsdbCommitMergeBlock(pCommitter, pBlock); + code = tsdbCommitMergeBlock(pCommitter, pDataBlk); if (code) goto _err; iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); } else { - pBlock = NULL; + pDataBlk = NULL; } pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; } } - while (pBlock) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + while (pDataBlk) { + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); if (code) goto _err; iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); } else { - pBlock = NULL; + pDataBlk = NULL; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 1a142fde1a..37bd1ccbf6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -110,7 +110,7 @@ _err: // taosRemoveFile(fname); // } -// // last +// // sst // if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) { // if (pFrom->pLastF->size > pTo->pLastF->size) { // code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE); @@ -140,7 +140,7 @@ _err: // tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname); // taosRemoveFile(fname); -// // last +// // sst // tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname); // taosRemoveFile(fname); @@ -255,8 +255,8 @@ void tsdbFSDestroy(STsdbFS *pFS) { taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pSmaF); - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - taosMemoryFree(pSet->aLastF[iLast]); + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + taosMemoryFree(pSet->aSstF[iSst]); } } @@ -311,13 +311,13 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { if (code) goto _err; } - // last =========== - tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname); + // sst =========== + tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[0], fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (size != pSet->aLastF[0]->size) { + if (size != pSet->aSstF[0]->size) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } @@ -508,13 +508,15 @@ int32_t tsdbFSClose(STsdb *pTsdb) { ASSERT(pSet->pDataF->nRef == 1); taosMemoryFree(pSet->pDataF); - // last - ASSERT(pSet->aLastF[0]->nRef == 1); - taosMemoryFree(pSet->aLastF[0]); - // sma ASSERT(pSet->pSmaF->nRef == 1); taosMemoryFree(pSet->pSmaF); + + // sst + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + ASSERT(pSet->aSstF[iSst]->nRef == 1); + taosMemoryFree(pSet->aSstF[iSst]); + } } taosArrayDestroy(pTsdb->fs.aDFileSet); @@ -570,14 +572,14 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { } *fSet.pSmaF = *pSet->pSmaF; - // last - for (fSet.nLastF = 0; fSet.nLastF < pSet->nLastF; fSet.nLastF++) { - fSet.aLastF[fSet.nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (fSet.aLastF[fSet.nLastF] == NULL) { + // sst + for (fSet.nSstF = 0; fSet.nSstF < pSet->nSstF; fSet.nSstF++) { + fSet.aSstF[fSet.nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (fSet.aSstF[fSet.nSstF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *fSet.aLastF[fSet.nLastF] = *pSet->aLastF[fSet.nLastF]; + *fSet.aSstF[fSet.nSstF] = *pSet->aSstF[fSet.nSstF]; } if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { @@ -630,28 +632,28 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { *pDFileSet->pHeadF = *pSet->pHeadF; *pDFileSet->pDataF = *pSet->pDataF; *pDFileSet->pSmaF = *pSet->pSmaF; - // last - if (pSet->nLastF > pDFileSet->nLastF) { - ASSERT(pSet->nLastF == pDFileSet->nLastF + 1); + // sst + if (pSet->nSstF > pDFileSet->nSstF) { + ASSERT(pSet->nSstF == pDFileSet->nSstF + 1); - pDFileSet->aLastF[pDFileSet->nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (pDFileSet->aLastF[pDFileSet->nLastF] == NULL) { + pDFileSet->aSstF[pDFileSet->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (pDFileSet->aSstF[pDFileSet->nSstF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *pDFileSet->aLastF[pDFileSet->nLastF] = *pSet->aLastF[pSet->nLastF - 1]; - pDFileSet->nLastF++; - } else if (pSet->nLastF < pDFileSet->nLastF) { - ASSERT(pSet->nLastF == 1); - for (int32_t iLast = 1; iLast < pDFileSet->nLastF; iLast++) { - taosMemoryFree(pDFileSet->aLastF[iLast]); + *pDFileSet->aSstF[pDFileSet->nSstF] = *pSet->aSstF[pSet->nSstF - 1]; + pDFileSet->nSstF++; + } else if (pSet->nSstF < pDFileSet->nSstF) { + ASSERT(pSet->nSstF == 1); + for (int32_t iSst = 1; iSst < pDFileSet->nSstF; iSst++) { + taosMemoryFree(pDFileSet->aSstF[iSst]); } - *pDFileSet->aLastF[0] = *pSet->aLastF[0]; - pDFileSet->nLastF = 1; + *pDFileSet->aSstF[0] = *pSet->aSstF[0]; + pDFileSet->nSstF = 1; } else { - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - *pDFileSet->aLastF[iLast] = *pSet->aLastF[iLast]; + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + *pDFileSet->aSstF[iSst] = *pSet->aSstF[iSst]; } } @@ -659,8 +661,8 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } } - ASSERT(pSet->nLastF == 1); - SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1}; + ASSERT(pSet->nSstF == 1); + SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSstF = 1}; // head fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); @@ -686,13 +688,13 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } *fSet.pSmaF = *pSet->pSmaF; - // last - fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (fSet.aLastF[0] == NULL) { + // sst + fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (fSet.aSstF[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *fSet.aLastF[0] = *pSet->aLastF[0]; + *fSet.aSstF[0] = *pSet->aSstF[0]; if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -860,76 +862,76 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { pSetOld->pSmaF->size = pSetNew->pSmaF->size; } - // last + // sst if (sameDisk) { - if (pSetNew->nLastF > pSetOld->nLastF) { - ASSERT(pSetNew->nLastF = pSetOld->nLastF + 1); - pSetOld->aLastF[pSetOld->nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (pSetOld->aLastF[pSetOld->nLastF] == NULL) { + if (pSetNew->nSstF > pSetOld->nSstF) { + ASSERT(pSetNew->nSstF = pSetOld->nSstF + 1); + pSetOld->aSstF[pSetOld->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (pSetOld->aSstF[pSetOld->nSstF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aLastF[pSetOld->nLastF] = *pSetNew->aLastF[pSetOld->nLastF]; - pSetOld->aLastF[pSetOld->nLastF]->nRef = 1; - pSetOld->nLastF++; - } else if (pSetNew->nLastF < pSetOld->nLastF) { - ASSERT(pSetNew->nLastF == 1); - for (int32_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { - SLastFile *pLastFile = pSetOld->aLastF[iLast]; - nRef = atomic_sub_fetch_32(&pLastFile->nRef, 1); + *pSetOld->aSstF[pSetOld->nSstF] = *pSetNew->aSstF[pSetOld->nSstF]; + pSetOld->aSstF[pSetOld->nSstF]->nRef = 1; + pSetOld->nSstF++; + } else if (pSetNew->nSstF < pSetOld->nSstF) { + ASSERT(pSetNew->nSstF == 1); + for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { + SSstFile *pSstFile = pSetOld->aSstF[iSst]; + nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); if (nRef == 0) { - tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pLastFile, fname); + tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); taosRemoveFile(fname); - taosMemoryFree(pLastFile); + taosMemoryFree(pSstFile); } - pSetOld->aLastF[iLast] = NULL; + pSetOld->aSstF[iSst] = NULL; } - pSetOld->nLastF = 1; - pSetOld->aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (pSetOld->aLastF[0] == NULL) { + pSetOld->nSstF = 1; + pSetOld->aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (pSetOld->aSstF[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aLastF[0] = *pSetNew->aLastF[0]; - pSetOld->aLastF[0]->nRef = 1; + *pSetOld->aSstF[0] = *pSetNew->aSstF[0]; + pSetOld->aSstF[0]->nRef = 1; } else { - for (int32_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { - SLastFile *pLastFile = pSetOld->aLastF[iLast]; - nRef = atomic_sub_fetch_32(&pLastFile->nRef, 1); + for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { + SSstFile *pSstFile = pSetOld->aSstF[iSst]; + nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); if (nRef == 0) { - tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pLastFile, fname); + tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); taosRemoveFile(fname); - taosMemoryFree(pLastFile); + taosMemoryFree(pSstFile); } - pSetOld->aLastF[iLast] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (pSetOld->aLastF[iLast] == NULL) { + pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (pSetOld->aSstF[iSst] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aLastF[iLast] = *pSetNew->aLastF[iLast]; - pSetOld->aLastF[iLast]->nRef = 1; + *pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst]; + pSetOld->aSstF[iSst]->nRef = 1; } } } else { - ASSERT(pSetOld->nLastF == pSetNew->nLastF); - for (int32_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { - SLastFile *pLastFile = pSetOld->aLastF[iLast]; - nRef = atomic_sub_fetch_32(&pLastFile->nRef, 1); + ASSERT(pSetOld->nSstF == pSetNew->nSstF); + for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { + SSstFile *pSstFile = pSetOld->aSstF[iSst]; + nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); if (nRef == 0) { - tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pLastFile, fname); + tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); taosRemoveFile(fname); - taosMemoryFree(pLastFile); + taosMemoryFree(pSstFile); } - pSetOld->aLastF[iLast] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (pSetOld->aLastF[iLast] == NULL) { + pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (pSetOld->aSstF[iSst] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aLastF[iLast] = *pSetNew->aLastF[iLast]; - pSetOld->aLastF[iLast]->nRef = 1; + *pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst]; + pSetOld->aSstF[iSst]->nRef = 1; } } @@ -963,12 +965,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { taosMemoryFree(pSetOld->pSmaF); } - for (int8_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { - nRef = atomic_sub_fetch_32(&pSetOld->aLastF[iLast]->nRef, 1); + for (int8_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { + nRef = atomic_sub_fetch_32(&pSetOld->aSstF[iSst]->nRef, 1); if (nRef == 0) { - tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aLastF[iLast], fname); + tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSstF[iSst], fname); taosRemoveFile(fname); - taosMemoryFree(pSetOld->aLastF[iLast]); + taosMemoryFree(pSetOld->aSstF[iSst]); } } @@ -976,7 +978,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { continue; _add_new: - fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nLastF = 1}; + fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSstF = 1}; // head fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); @@ -1005,15 +1007,15 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { *fSet.pSmaF = *pSetNew->pSmaF; fSet.pSmaF->nRef = 1; - // last - ASSERT(pSetNew->nLastF == 1); - fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (fSet.aLastF[0] == NULL) { + // sst + ASSERT(pSetNew->nSstF == 1); + fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); + if (fSet.aSstF[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *fSet.aLastF[0] = *pSetNew->aLastF[0]; - fSet.aLastF[0]->nRef = 1; + *fSet.aSstF[0] = *pSetNew->aSstF[0]; + fSet.aSstF[0]->nRef = 1; if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1061,8 +1063,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); ASSERT(nRef > 0); - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - nRef = atomic_fetch_add_32(&pSet->aLastF[iLast]->nRef, 1); + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + nRef = atomic_fetch_add_32(&pSet->aSstF[iSst]->nRef, 1); ASSERT(nRef > 0); } @@ -1120,14 +1122,14 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { taosMemoryFree(pSet->pSmaF); } - // last - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - nRef = atomic_sub_fetch_32(&pSet->aLastF[iLast]->nRef, 1); + // sst + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + nRef = atomic_sub_fetch_32(&pSet->aSstF[iSst]->nRef, 1); ASSERT(nRef >= 0); if (nRef == 0) { - tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname); + tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); taosRemoveFile(fname); - taosMemoryFree(pSet->aLastF[iLast]); + taosMemoryFree(pSet->aSstF[iSst]); /* code */ } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 4814d793f7..632a2c827b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -53,22 +53,22 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) { return n; } -int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile) { +int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile) { int32_t n = 0; - n += tPutI64v(p ? p + n : p, pLastFile->commitID); - n += tPutI64v(p ? p + n : p, pLastFile->size); - n += tPutI64v(p ? p + n : p, pLastFile->offset); + n += tPutI64v(p ? p + n : p, pSstFile->commitID); + n += tPutI64v(p ? p + n : p, pSstFile->size); + n += tPutI64v(p ? p + n : p, pSstFile->offset); return n; } -static int32_t tGetLastFile(uint8_t *p, SLastFile *pLastFile) { +static int32_t tGetSstFile(uint8_t *p, SSstFile *pSstFile) { int32_t n = 0; - n += tGetI64v(p + n, &pLastFile->commitID); - n += tGetI64v(p + n, &pLastFile->size); - n += tGetI64v(p + n, &pLastFile->offset); + n += tGetI64v(p + n, &pSstFile->commitID); + n += tGetI64v(p + n, &pSstFile->size); + n += tGetI64v(p + n, &pSstFile->offset); return n; } @@ -102,9 +102,9 @@ void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data"); } -void tsdbLastFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SLastFile *pLastF, char fname[]) { +void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]) { snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did), - TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pLastF->commitID, ".last"); + TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSstF->commitID, ".sst"); } void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) { @@ -194,10 +194,10 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) { n += tPutDataFile(p ? p + n : p, pSet->pDataF); n += tPutSmaFile(p ? p + n : p, pSet->pSmaF); - // last - n += tPutU8(p ? p + n : p, pSet->nLastF); - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - n += tPutLastFile(p ? p + n : p, pSet->aLastF[iLast]); + // sst + n += tPutU8(p ? p + n : p, pSet->nSstF); + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + n += tPutSstFile(p ? p + n : p, pSet->aSstF[iSst]); } return n; @@ -234,15 +234,15 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { pSet->pSmaF->nRef = 1; n += tGetSmaFile(p + n, pSet->pSmaF); - // last - n += tGetU8(p + n, &pSet->nLastF); - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - pSet->aLastF[iLast] = (SLastFile *)taosMemoryCalloc(1, sizeof(SLastFile)); - if (pSet->aLastF[iLast] == NULL) { + // sst + n += tGetU8(p + n, &pSet->nSstF); + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + pSet->aSstF[iSst] = (SSstFile *)taosMemoryCalloc(1, sizeof(SSstFile)); + if (pSet->aSstF[iSst] == NULL) { return -1; } - pSet->aLastF[iLast]->nRef = 1; - n += tGetLastFile(p + n, pSet->aLastF[iLast]); + pSet->aSstF[iSst]->nRef = 1; + n += tGetSstFile(p + n, pSet->aSstF[iSst]); } return n; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 256898a597..ce36d74467 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -18,12 +18,12 @@ // SLDataIter ================================================= typedef struct SLDataIter { SRBTreeNode node; - SBlockL *pBlockL; + SSstBlk *pSstBlk; SDataFReader *pReader; - int32_t iLast; + int32_t iSst; int8_t backward; - SArray *aBlockL; - int32_t iBlockL; + SArray *aSstBlk; + int32_t iSstBlk; SBlockData bData; int32_t iRow; SRowInfo rInfo; @@ -32,7 +32,7 @@ typedef struct SLDataIter { SVersionRange verRange; } SLDataIter; -int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid, +int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) { int32_t code = 0; *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); @@ -45,10 +45,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->timeWindow = *pTimeWindow; (*pIter)->verRange = *pRange; (*pIter)->pReader = pReader; - (*pIter)->iLast = iLast; + (*pIter)->iSst = iSst; (*pIter)->backward = backward; - (*pIter)->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if ((*pIter)->aBlockL == NULL) { + (*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + if ((*pIter)->aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -58,18 +58,18 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t goto _exit; } - code = tsdbReadBlockL(pReader, iLast, (*pIter)->aBlockL); + code = tsdbReadSstBlk(pReader, iSst, (*pIter)->aSstBlk); if (code) { goto _exit; } - size_t size = taosArrayGetSize((*pIter)->aBlockL); + size_t size = taosArrayGetSize((*pIter)->aSstBlk); // find the start block int32_t index = -1; if (!backward) { // asc for (int32_t i = 0; i < size; ++i) { - SBlockL *p = taosArrayGet((*pIter)->aBlockL, i); + SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i); if (p->minUid <= uid && p->maxUid >= uid) { index = i; break; @@ -77,7 +77,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } } else { // desc for (int32_t i = size - 1; i >= 0; --i) { - SBlockL *p = taosArrayGet((*pIter)->aBlockL, i); + SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i); if (p->minUid <= uid && p->maxUid >= uid) { index = i; break; @@ -85,9 +85,9 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } } - (*pIter)->iBlockL = index; + (*pIter)->iSstBlk = index; if (index != -1) { - (*pIter)->pBlockL = taosArrayGet((*pIter)->aBlockL, (*pIter)->iBlockL); + (*pIter)->pSstBlk = taosArrayGet((*pIter)->aSstBlk, (*pIter)->iSstBlk); } _exit: @@ -96,20 +96,20 @@ _exit: void tLDataIterClose(SLDataIter *pIter) { tBlockDataDestroy(&pIter->bData, 1); - taosArrayDestroy(pIter->aBlockL); + taosArrayDestroy(pIter->aSstBlk); taosMemoryFree(pIter); } -extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); +extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData); void tLDataIterNextBlock(SLDataIter *pIter) { - int32_t step = pIter->backward? -1:1; - pIter->iBlockL += step; + int32_t step = pIter->backward ? -1 : 1; + pIter->iSstBlk += step; int32_t index = -1; - size_t size = taosArrayGetSize(pIter->aBlockL); - for(int32_t i = pIter->iBlockL; i < size && i >= 0; i += step) { - SBlockL *p = taosArrayGet(pIter->aBlockL, i); + size_t size = taosArrayGetSize(pIter->aSstBlk); + for (int32_t i = pIter->iSstBlk; i < size && i >= 0; i += step) { + SSstBlk *p = taosArrayGet(pIter->aSstBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { break; } @@ -125,16 +125,16 @@ void tLDataIterNextBlock(SLDataIter *pIter) { } if (index == -1) { - pIter->pBlockL = NULL; - } else { - pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + pIter->pSstBlk = NULL; + } else { + pIter->pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk); } } -static void findNextValidRow(SLDataIter* pIter) { - int32_t step = pIter->backward? -1:1; +static void findNextValidRow(SLDataIter *pIter) { + int32_t step = pIter->backward ? -1 : 1; - bool hasVal = false; + bool hasVal = false; int32_t i = pIter->iRow; for (; i < pIter->bData.nRow && i >= 0; i += step) { if (pIter->bData.aUid != NULL) { @@ -154,7 +154,7 @@ static void findNextValidRow(SLDataIter* pIter) { } int64_t ts = pIter->bData.aTSKEY[i]; - if (!pIter->backward) { // asc + if (!pIter->backward) { // asc if (ts > pIter->timeWindow.ekey) { // no more data break; } else if (ts < pIter->timeWindow.skey) { @@ -190,45 +190,45 @@ static void findNextValidRow(SLDataIter* pIter) { break; } - pIter->iRow = (hasVal)? i:-1; + pIter->iRow = (hasVal) ? i : -1; } bool tLDataIterNextRow(SLDataIter *pIter) { int32_t code = 0; - int32_t step = pIter->backward? -1:1; + int32_t step = pIter->backward ? -1 : 1; // no qualified last file block in current file, no need to fetch row - if (pIter->pBlockL == NULL) { + if (pIter->pSstBlk == NULL) { return false; } - int32_t iBlockL = pIter->iBlockL; + int32_t iBlockL = pIter->iSstBlk; - if (pIter->bData.nRow == 0 && pIter->pBlockL != NULL) { // current block not loaded yet - code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); + if (pIter->bData.nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet + code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData); if (code != TSDB_CODE_SUCCESS) { goto _exit; } - pIter->iRow = (pIter->backward)? pIter->bData.nRow:-1; + pIter->iRow = (pIter->backward) ? pIter->bData.nRow : -1; } pIter->iRow += step; - while(1) { + while (1) { findNextValidRow(pIter); if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) { tLDataIterNextBlock(pIter); - if (pIter->pBlockL == NULL) { // no more data + if (pIter->pSstBlk == NULL) { // no more data goto _exit; } } else { break; } - if (iBlockL != pIter->iBlockL) { - code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); + if (iBlockL != pIter->iSstBlk) { + code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData); if (code) { goto _exit; } @@ -241,16 +241,14 @@ bool tLDataIterNextRow(SLDataIter *pIter) { pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); _exit: - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS) { terrno = code; } - return (code == TSDB_CODE_SUCCESS) && (pIter->pBlockL != NULL); + return (code == TSDB_CODE_SUCCESS) && (pIter->pSstBlk != NULL); } -SRowInfo *tLDataIterGet(SLDataIter *pIter) { - return &pIter->rInfo; -} +SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } // SMergeTree ================================================= static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { @@ -287,7 +285,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFRead int32_t code = TSDB_CODE_OUT_OF_MEMORY; struct SLDataIter* pIterList[TSDB_DEFAULT_LAST_FILE] = {0}; - for(int32_t i = 0; i < pFReader->pSet->nLastF; ++i) { // open all last file + for(int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange); if (code != TSDB_CODE_SUCCESS) { goto _end; @@ -311,7 +309,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFRead void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } -bool tMergeTreeNext(SMergeTree* pMTree) { +bool tMergeTreeNext(SMergeTree *pMTree) { int32_t code = TSDB_CODE_SUCCESS; if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; @@ -344,14 +342,12 @@ bool tMergeTreeNext(SMergeTree* pMTree) { return pMTree->pIter != NULL; } -TSDBROW tMergeTreeGetRow(SMergeTree* pMTree) { - return pMTree->pIter->rInfo.row; -} +TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) { return pMTree->pIter->rInfo.row; } -void tMergeTreeClose(SMergeTree* pMTree) { +void tMergeTreeClose(SMergeTree *pMTree) { size_t size = taosArrayGetSize(pMTree->pIterList); - for(int32_t i = 0; i < size; ++i) { - SLDataIter* pIter = taosArrayGetP(pMTree->pIterList, i); + for (int32_t i = 0; i < size; ++i) { + SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, i); tLDataIterClose(pIter); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e77123c880..d76787915a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -109,7 +109,7 @@ typedef struct SDataBlockIter { int32_t index; SArray* blockList; // SArray int32_t order; - SBlock block; // current SBlock data + SDataBlk block; // current SDataBlk data SHashObj* pTableMap; } SDataBlockIter; @@ -176,10 +176,10 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); -static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, - STsdbReader* pReader, bool* freeTSRow); -static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, - STSRow** pTSRow); +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, + STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); +static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, + STsdbReader* pReader, STSRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); @@ -590,8 +590,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN sizeInDisk += pScanInfo->mapData.nData; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { - SBlock block = {0}; - tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock); + SDataBlk block = {0}; + tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk); // 1. time range check if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { @@ -617,7 +617,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } } - pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nLastF; + pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSstF; int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; double el = (taosGetTimestampUs() - st) / 1000.0; @@ -665,7 +665,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { return pBlockInfo; } -static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } +static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SReaderStatus* pStatus = &pReader->status; @@ -673,7 +673,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn SBlockData* pBlockData = &pStatus->fileBlockData; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - SBlock* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); @@ -758,8 +758,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; ASSERT(pBlockInfo != NULL); - SBlock* pBlock = getCurrentBlock(pBlockIter); - int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); + int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, code:%s %s", @@ -836,7 +836,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { if (pBlockInfo != NULL) { STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); - tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock); + tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk); } #if 0 @@ -887,12 +887,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; - SBlock block = {0}; + SDataBlk block = {0}; for (int32_t k = 0; k < num; ++k) { SBlockOrderWrapper wrapper = {0}; int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k); - tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock); + tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk); wrapper.uid = pTableScanInfo->uid; wrapper.offset = block.aSubBlock[0].offset; @@ -981,15 +981,15 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { /** * This is an two rectangles overlap cases. */ -static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) { +static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) { return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) || (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) || (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) || (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer); } -static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, - int32_t* nextIndex, int32_t order) { +static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, + int32_t* nextIndex, int32_t order) { bool asc = ASCENDING_TRAVERSE(order); if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { return NULL; @@ -1002,10 +1002,10 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab int32_t step = asc ? 1 : -1; *nextIndex = pFBlockInfo->tbBlockIdx + step; - SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock)); - int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk)); + int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); - tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk); return pBlock; } @@ -1048,7 +1048,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t return TSDB_CODE_SUCCESS; } -static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) { +static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) { // it is the last block in current file, no chance to overlap with neighbor blocks. if (ASCENDING_TRAVERSE(order)) { return pBlock->maxKey.ts == pNeighbor->minKey.ts; @@ -1057,19 +1057,19 @@ static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t } } -static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) { +static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) { bool ascScan = ASCENDING_TRAVERSE(order); return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) || (!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts)); } -static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { +static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) { return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer); } -static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) { +static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) { size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) { @@ -1103,7 +1103,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons return false; } -static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) { +static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) { if (pBlockScanInfo->delSkyline == NULL) { return false; } @@ -1138,10 +1138,10 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl // 3. current timestamp should not be overlap with each other // 4. output buffer should be large enough to hold all rows in current block // 5. delete info should not overlap with current block data -static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, +static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock, STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) { - int32_t neighborIndex = 0; - SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); + int32_t neighborIndex = 0; + SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); // overlap with neighbor bool overlapWithNeighbor = false; @@ -1158,9 +1158,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc // todo bool overlapWithlastBlock = false; #if 0 - if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0 && (pLastBlockReader->currentBlockIndex != -1)) { - SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); - overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); + if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) { + SSstBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex); + overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey); } #endif @@ -1374,7 +1374,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; SRowMerger merge = {0}; @@ -1942,9 +1942,7 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { return key.ts; } -static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { - return pLastBlockReader->mergeTree.pIter != NULL; -} +static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { @@ -2032,7 +2030,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pDumpInfo->rowIndex += step; - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; @@ -2040,7 +2038,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } - bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); // no data in last block and block, no need to proceed. @@ -2052,7 +2049,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; } @@ -2203,7 +2200,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return code; } - if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nLastF > 0) { + if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSstF > 0) { code = doLoadFileBlock(pReader, pIndexList, pBlockNum); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); @@ -2341,8 +2338,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } static int32_t doBuildDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - SBlock* pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SDataBlk* pBlock = NULL; SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; @@ -2439,7 +2436,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SBlock* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); SReaderStatus* pStatus = &pReader->status; @@ -2810,7 +2807,7 @@ typedef enum { CHECK_FILEBLOCK_QUIT = 0x2, } CHECK_FILEBLOCK_STATE; -static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock, +static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock, SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key, CHECK_FILEBLOCK_STATE* state) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2819,8 +2816,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - int32_t nextIndex = -1; - SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); + int32_t nextIndex = -1; + SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); if (pNeighborBlock == NULL) { // do nothing return 0; } @@ -2884,7 +2881,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc CHECK_FILEBLOCK_STATE st; SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); + SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st); if (st == CHECK_FILEBLOCK_QUIT) { break; @@ -2964,7 +2961,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, } int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, - STSRow** pTSRow) { + STSRow** pTSRow) { SRowMerger merge = {0}; TSDBKEY k = TSDBROW_KEY(pRow); @@ -3038,7 +3035,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, + freeTSRow); } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { @@ -3467,12 +3465,12 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); - int64_t stime = taosGetTimestampUs(); + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); + int64_t stime = taosGetTimestampUs(); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; - if (tBlockHasSma(pBlock)) { + if (tDataBlkHasSma(pBlock)) { code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg); if (code != TSDB_CODE_SUCCESS) { tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code), @@ -3649,7 +3647,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa while (true) { if (hasNext) { - SBlock* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); int32_t numOfRows = pBlock->nRow; pTableBlockInfo->totalRows += numOfRows; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 664c7d1572..dbaf9b234c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -433,11 +433,11 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS goto _err; } - // last - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname); - pReader->aLastFD[iLast] = taosOpenFile(fname, TD_FILE_READ); - if (pReader->aLastFD[iLast] == NULL) { + // sst + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); + pReader->aLastFD[iSst] = taosOpenFile(fname, TD_FILE_READ); + if (pReader->aLastFD[iSst] == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -474,9 +474,9 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { goto _err; } - // last - for (int32_t iLast = 0; iLast < (*ppReader)->pSet->nLastF; iLast++) { - if (taosCloseFile(&(*ppReader)->aLastFD[iLast]) < 0) { + // sst + for (int32_t iSst = 0; iSst < (*ppReader)->pSet->nSstF; iSst++) { + if (taosCloseFile(&(*ppReader)->aLastFD[iSst]) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -559,14 +559,14 @@ _err: return code; } -int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { +int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { int32_t code = 0; - int64_t offset = pReader->pSet->aLastF[iLast]->offset; - int64_t size = pReader->pSet->aLastF[iLast]->size - offset; + int64_t offset = pReader->pSet->aSstF[iSst]->offset; + int64_t size = pReader->pSet->aSstF[iSst]->size - offset; int64_t n; uint32_t delimiter; - taosArrayClear(aBlockL); + taosArrayClear(aSstBlk); if (size == 0) { goto _exit; } @@ -576,13 +576,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { if (code) goto _err; // seek - if (taosLSeekFile(pReader->aLastFD[iLast], offset, SEEK_SET) < 0) { + if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } // read - n = taosReadFile(pReader->aLastFD[iLast], pReader->aBuf[0], size); + n = taosReadFile(pReader->aLastFD[iSst], pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -603,10 +603,10 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { ASSERT(delimiter == TSDB_FILE_DLMT); while (n < size - sizeof(TSCKSUM)) { - SBlockL blockl; - n += tGetBlockL(pReader->aBuf[0] + n, &blockl); + SSstBlk blockl; + n += tGetSstBlk(pReader->aBuf[0] + n, &blockl); - if (taosArrayPush(aBlockL, &blockl) == NULL) { + if (taosArrayPush(aSstBlk, &blockl) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -677,9 +677,9 @@ _err: return code; } -int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) { +int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) { int32_t code = 0; - SSmaInfo *pSmaInfo = &pBlock->smaInfo; + SSmaInfo *pSmaInfo = &pDataBlk->smaInfo; ASSERT(pSmaInfo->size > 0); @@ -843,13 +843,13 @@ _err: return code; } -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData) { +int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t code = 0; - code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, pBlockData); + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData); if (code) goto _err; - if (pBlock->nSubBlock > 1) { + if (pDataBlk->nSubBlock > 1) { SBlockData bData1; SBlockData bData2; @@ -863,8 +863,8 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl tBlockDataInitEx(&bData1, pBlockData); tBlockDataInitEx(&bData2, pBlockData); - for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[iSubBlock], 0, &bData1); + for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1); if (code) { tBlockDataDestroy(&bData1, 1); tBlockDataDestroy(&bData2, 1); @@ -897,10 +897,10 @@ _err: return code; } -int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) { +int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { int32_t code = 0; - code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData); + code = tsdbReadBlockDataImpl(pReader, &pSstBlk->bInfo, 1, pBlockData); if (code) goto _err; return code; @@ -910,15 +910,15 @@ _err: return code; } -int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) { +int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { int32_t code = 0; // read - code = tsdbReadAndCheck(pReader->aLastFD[iLast], pBlockL->bInfo.offset, &pReader->aBuf[0], pBlockL->bInfo.szBlock, 0); + code = tsdbReadAndCheck(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, &pReader->aBuf[0], pSstBlk->bInfo.szBlock, 0); if (code) goto _exit; // decmpr - code = tDecmprBlockData(pReader->aBuf[0], pBlockL->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); + code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); if (code) goto _exit; _exit: @@ -947,14 +947,14 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS .pHeadF = &pWriter->fHead, .pDataF = &pWriter->fData, .pSmaF = &pWriter->fSma, - .nLastF = pSet->nLastF // + .nSstF = pSet->nSstF // }; pWriter->fHead = *pSet->pHeadF; pWriter->fData = *pSet->pDataF; pWriter->fSma = *pSet->pSmaF; - for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { - pWriter->wSet.aLastF[iLast] = &pWriter->fLast[iLast]; - pWriter->fLast[iLast] = *pSet->aLastF[iLast]; + for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) { + pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst]; + pWriter->fSst[iSst] = *pSet->aSstF[iSst]; } // head @@ -1036,10 +1036,10 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ASSERT(n == pWriter->fSma.size); } - // last - ASSERT(pWriter->fLast[pSet->nLastF - 1].size == 0); + // sst + ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0); flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast[pSet->nLastF - 1], fname); + tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname); pWriter->pLastFD = taosOpenFile(fname, flag); if (pWriter->pLastFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); @@ -1050,7 +1050,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pWriter->fLast[pWriter->wSet.nLastF - 1].size += TSDB_FHDR_SIZE; + pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE; *ppWriter = pWriter; return code; @@ -1179,9 +1179,9 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { goto _err; } - // last ============== + // sst ============== memset(hdr, 0, TSDB_FHDR_SIZE); - tPutLastFile(hdr, &pWriter->fLast[pWriter->wSet.nLastF - 1]); + tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]); taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); @@ -1301,22 +1301,22 @@ _err: return code; } -int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { - int32_t code = 0; - SLastFile *pLastFile = &pWriter->fLast[pWriter->wSet.nLastF - 1]; - int64_t size; - int64_t n; +int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { + int32_t code = 0; + SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; + int64_t size; + int64_t n; // check - if (taosArrayGetSize(aBlockL) == 0) { - pLastFile->offset = pLastFile->size; + if (taosArrayGetSize(aSstBlk) == 0) { + pSstFile->offset = pSstFile->size; goto _exit; } // size size = sizeof(uint32_t); // TSDB_FILE_DLMT - for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { - size += tPutBlockL(NULL, taosArrayGet(aBlockL, iBlockL)); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { + size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL)); } size += sizeof(TSCKSUM); @@ -1327,8 +1327,8 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { // encode n = 0; n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { - n += tPutBlockL(pWriter->aBuf[0] + n, taosArrayGet(aBlockL, iBlockL)); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { + n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL)); } taosCalcChecksumAppend(0, pWriter->aBuf[0], size); @@ -1342,12 +1342,12 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { } // update - pLastFile->offset = pLastFile->size; - pLastFile->size += size; + pSstFile->offset = pSstFile->size; + pSstFile->size += size; _exit: tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), - pLastFile->offset, size); + pSstFile->offset, size); return code; _err: @@ -1355,28 +1355,28 @@ _err: return code; } -static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { +static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SDataBlk *pDataBlk) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; if (iRow == 0) { - if (tsdbKeyCmprFn(&pBlock->minKey, &key) > 0) { - pBlock->minKey = key; + if (tsdbKeyCmprFn(&pDataBlk->minKey, &key) > 0) { + pDataBlk->minKey = key; } } else { if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { - pBlock->hasDup = 1; + pDataBlk->hasDup = 1; } } - if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pBlock->maxKey, &key) < 0) { - pBlock->maxKey = key; + if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pDataBlk->maxKey, &key) < 0) { + pDataBlk->maxKey = key; } - pBlock->minVer = TMIN(pBlock->minVer, key.version); - pBlock->maxVer = TMAX(pBlock->maxVer, key.version); + pDataBlk->minVer = TMIN(pDataBlk->minVer, key.version); + pDataBlk->maxVer = TMAX(pDataBlk->maxVer, key.version); } - pBlock->nRow += pBlockData->nRow; + pDataBlk->nRow += pBlockData->nRow; } static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) { @@ -1431,7 +1431,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ASSERT(pBlockData->nRow > 0); - pBlkInfo->offset = toLast ? pWriter->fLast[pWriter->wSet.nLastF - 1].size : pWriter->fData.size; + pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size; pBlkInfo->szBlock = 0; pBlkInfo->szKey = 0; @@ -1475,7 +1475,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock // update info if (toLast) { - pWriter->fLast[pWriter->wSet.nLastF - 1].size += pBlkInfo->szBlock; + pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock; } else { pWriter->fData.size += pBlkInfo->szBlock; } @@ -1554,9 +1554,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { taosCloseFile(&pOutFD); taosCloseFile(&PInFD); - // last - tsdbLastFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aLastF[0], fNameFrom); - tsdbLastFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aLastF[0], fNameTo); + // sst + tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom); + tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo); pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { @@ -1570,7 +1570,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aLastF[0]->size); + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 9fd2c4f6cd..ee29538a81 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -60,7 +60,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (expLevel < 0) { taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->aLastF[0]); + taosMemoryFree(pSet->aSstF[0]); taosMemoryFree(pSet->pSmaF); taosArrayRemove(fs.aDFileSet, iSet); iSet--; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index fa7d370583..8d19a2ffb8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -27,13 +27,13 @@ struct STsdbSnapReader { int32_t fid; SDataFReader* pDataFReader; SArray* aBlockIdx; // SArray - SArray* aBlockL; // SArray + SArray* aSstBlk; // SArray SBlockIdx* pBlockIdx; - SBlockL* pBlockL; + SSstBlk* pSstBlk; int32_t iBlockIdx; int32_t iBlockL; - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData int32_t iBlock; SBlockData oBlockData; SBlockData nBlockData; @@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); if (code) goto _err; - code = tsdbReadBlockL(pReader->pDataFReader, 0, pReader->aBlockL); + code = tsdbReadSstBlk(pReader->pDataFReader, 0, pReader->aSstBlk); if (code) goto _err; // init @@ -82,13 +82,13 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->iBlockL = 0; while (true) { - if (pReader->iBlockL >= taosArrayGetSize(pReader->aBlockL)) { - pReader->pBlockL = NULL; + if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) { + pReader->pSstBlk = NULL; break; } - pReader->pBlockL = (SBlockL*)taosArrayGet(pReader->aBlockL, pReader->iBlockL); - if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) { + pReader->pSstBlk = (SSstBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); + if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { // TODO break; } @@ -101,8 +101,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { } while (true) { - if (pReader->pBlockIdx && pReader->pBlockL) { - TABLEID id = {.suid = pReader->pBlockL->suid, .uid = pReader->pBlockL->minUid}; + if (pReader->pBlockIdx && pReader->pSstBlk) { + TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid}; ASSERT(0); @@ -115,8 +115,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { // } } else if (pReader->pBlockIdx) { while (pReader->iBlock < pReader->mBlock.nItem) { - SBlock block; - tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetBlock); + SDataBlk block; + tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk); if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) { // load data (todo) @@ -142,18 +142,18 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { } if (*ppData) goto _exit; - } else if (pReader->pBlockL) { - while (pReader->pBlockL) { - if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) { + } else if (pReader->pSstBlk) { + while (pReader->pSstBlk) { + if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { // load data (todo) } // next pReader->iBlockL++; - if (pReader->iBlockL < taosArrayGetSize(pReader->aBlockL)) { - pReader->pBlockL = (SBlockL*)taosArrayGetSize(pReader->aBlockL); + if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) { + pReader->pSstBlk = (SSstBlk*)taosArrayGetSize(pReader->aSstBlk); } else { - pReader->pBlockL = NULL; + pReader->pSstBlk = NULL; } if (*ppData) goto _exit; @@ -298,8 +298,8 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pReader->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pReader->aBlockL == NULL) { + pReader->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + if (pReader->aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -338,7 +338,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { if (pReader->pDataFReader) { tsdbDataFReaderClose(&pReader->pDataFReader); } - taosArrayDestroy(pReader->aBlockL); + taosArrayDestroy(pReader->aSstBlk); taosArrayDestroy(pReader->aBlockIdx); tMapDataClear(&pReader->mBlock); tBlockDataDestroy(&pReader->oBlockData, 1); @@ -426,24 +426,24 @@ struct STsdbSnapWriter { SArray* aBlockIdx; // SArray int32_t iBlockIdx; SBlockIdx* pBlockIdx; - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData int32_t iBlock; SBlockData* pBlockData; int32_t iRow; SBlockData bDataR; - SArray* aBlockL; // SArray + SArray* aSstBlk; // SArray int32_t iBlockL; SBlockData lDataR; SDataFWriter* pDataFWriter; SBlockIdx* pBlockIdxW; // NULL when no committing table - SBlock blockW; + SDataBlk blockW; SBlockData bDataW; SBlockIdx blockIdxW; - SMapData mBlockW; // SMapData + SMapData mBlockW; // SMapData SArray* aBlockIdxW; // SArray - SArray* aBlockLW; // SArray + SArray* aBlockLW; // SArray // for del file SDelFReader* pDelFReader; @@ -475,10 +475,10 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; - tBlockReset(&pWriter->blockW); + tDataBlkReset(&pWriter->blockW); tBlockDataClear(&pWriter->bDataW); } @@ -499,15 +499,15 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); // if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; } while (true) { if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - SBlock block; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + SDataBlk block; + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); // if (block.last) { // code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); @@ -520,13 +520,13 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // if (code) goto _err; // } - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); if (code) goto _err; pWriter->iBlock++; } - // SBlock + // SDataBlk // code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); // if (code) goto _err; @@ -553,10 +553,10 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p if (code) goto _err; // SBlockData - SBlock block; + SDataBlk block; tMapDataReset(&pWriter->mBlockW); for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { - tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); + tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk); // if (block.last) { // code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); @@ -570,11 +570,11 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p // if (code) goto _err; // } - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); if (code) goto _err; } - // SBlock + // SDataBlk SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx); if (code) goto _err; @@ -642,10 +642,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { while (true) { if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - SBlock block; - int32_t c; + SDataBlk block; + int32_t c; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); // if (block.last) { // pWriter->pBlockData = &pWriter->bDataR; @@ -668,14 +668,14 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); // if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; - tBlockReset(&pWriter->blockW); + tDataBlkReset(&pWriter->blockW); tBlockDataClear(&pWriter->bDataW); } - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); if (code) goto _err; pWriter->iBlock++; @@ -719,10 +719,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); // if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; - tBlockReset(&pWriter->blockW); + tDataBlkReset(&pWriter->blockW); tBlockDataClear(&pWriter->bDataW); } @@ -803,7 +803,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { pWriter->pBlockIdxW->suid = id.suid; pWriter->pBlockIdxW->uid = id.uid; - tBlockReset(&pWriter->blockW); + tDataBlkReset(&pWriter->blockW); tBlockDataReset(&pWriter->bDataW); tMapDataReset(&pWriter->mBlockW); } @@ -845,7 +845,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { // write remain stuff if (taosArrayGetSize(pWriter->aBlockLW) > 0) { - code = tsdbWriteBlockL(pWriter->pDataFWriter, pWriter->aBlockIdxW); + code = tsdbWriteSstBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); if (code) goto _err; } @@ -911,12 +911,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (code) goto _err; - code = tsdbReadBlockL(pWriter->pDataFReader, 0, pWriter->aBlockL); + code = tsdbReadSstBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); if (code) goto _err; } else { ASSERT(pWriter->pDataFReader == NULL); taosArrayClear(pWriter->aBlockIdx); - taosArrayClear(pWriter->aBlockL); + taosArrayClear(pWriter->aSstBlk); } pWriter->iBlockIdx = 0; pWriter->pBlockIdx = NULL; @@ -931,25 +931,25 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 // write SHeadFile fHead; SDataFile fData; - SLastFile fLast; + SSstFile fLast; SSmaFile fSma; - SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aLastF[0] = &fLast, .pSmaF = &fSma}; + SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSstF[0] = &fLast, .pSmaF = &fSma}; if (pSet) { wSet.diskId = pSet->diskId; wSet.fid = fid; - wSet.nLastF = 1; + wSet.nSstF = 1; fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; fData = *pSet->pDataF; - fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0}; + fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0}; fSma = *pSet->pSmaF; } else { wSet.diskId = (SDiskID){.level = 0, .id = 0}; wSet.fid = fid; - wSet.nLastF = 1; + wSet.nSstF = 1; fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; fData = (SDataFile){.commitID = pWriter->commitID, .size = 0}; - fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; + fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0}; } @@ -1147,8 +1147,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tBlockDataCreate(&pWriter->bDataR); if (code) goto _err; - pWriter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pWriter->aBlockL == NULL) { + pWriter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + if (pWriter->aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1161,7 +1161,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tBlockDataCreate(&pWriter->bDataW); if (code) goto _err; - pWriter->aBlockLW = taosArrayInit(0, sizeof(SBlockL)); + pWriter->aBlockLW = taosArrayInit(0, sizeof(SSstBlk)); if (pWriter->aBlockLW == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index f1e09c7f29..6937a27fe4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -214,7 +214,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { int32_t tCmprBlockL(void const *lhs, void const *rhs) { SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; - SBlockL *rBlockL = (SBlockL *)rhs; + SSstBlk *rBlockL = (SSstBlk *)rhs; if (lBlockIdx->suid < rBlockL->suid) { return -1; @@ -231,69 +231,69 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs) { return 0; } -// SBlock ====================================================== -void tBlockReset(SBlock *pBlock) { - *pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; +// SDataBlk ====================================================== +void tDataBlkReset(SDataBlk *pDataBlk) { + *pDataBlk = (SDataBlk){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; } -int32_t tPutBlock(uint8_t *p, void *ph) { - int32_t n = 0; - SBlock *pBlock = (SBlock *)ph; +int32_t tPutDataBlk(uint8_t *p, void *ph) { + int32_t n = 0; + SDataBlk *pDataBlk = (SDataBlk *)ph; - n += tPutI64v(p ? p + n : p, pBlock->minKey.version); - n += tPutI64v(p ? p + n : p, pBlock->minKey.ts); - n += tPutI64v(p ? p + n : p, pBlock->maxKey.version); - n += tPutI64v(p ? p + n : p, pBlock->maxKey.ts); - n += tPutI64v(p ? p + n : p, pBlock->minVer); - n += tPutI64v(p ? p + n : p, pBlock->maxVer); - n += tPutI32v(p ? p + n : p, pBlock->nRow); - n += tPutI8(p ? p + n : p, pBlock->hasDup); - n += tPutI8(p ? p + n : p, pBlock->nSubBlock); - for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szKey); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->minVer); + n += tPutI64v(p ? p + n : p, pDataBlk->maxVer); + n += tPutI32v(p ? p + n : p, pDataBlk->nRow); + n += tPutI8(p ? p + n : p, pDataBlk->hasDup); + n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey); } - if (pBlock->nSubBlock == 1 && !pBlock->hasDup) { - n += tPutI64v(p ? p + n : p, pBlock->smaInfo.offset); - n += tPutI32v(p ? p + n : p, pBlock->smaInfo.size); + if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { + n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset); + n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size); } return n; } -int32_t tGetBlock(uint8_t *p, void *ph) { - int32_t n = 0; - SBlock *pBlock = (SBlock *)ph; +int32_t tGetDataBlk(uint8_t *p, void *ph) { + int32_t n = 0; + SDataBlk *pDataBlk = (SDataBlk *)ph; - n += tGetI64v(p + n, &pBlock->minKey.version); - n += tGetI64v(p + n, &pBlock->minKey.ts); - n += tGetI64v(p + n, &pBlock->maxKey.version); - n += tGetI64v(p + n, &pBlock->maxKey.ts); - n += tGetI64v(p + n, &pBlock->minVer); - n += tGetI64v(p + n, &pBlock->maxVer); - n += tGetI32v(p + n, &pBlock->nRow); - n += tGetI8(p + n, &pBlock->hasDup); - n += tGetI8(p + n, &pBlock->nSubBlock); - for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szKey); + n += tGetI64v(p + n, &pDataBlk->minKey.version); + n += tGetI64v(p + n, &pDataBlk->minKey.ts); + n += tGetI64v(p + n, &pDataBlk->maxKey.version); + n += tGetI64v(p + n, &pDataBlk->maxKey.ts); + n += tGetI64v(p + n, &pDataBlk->minVer); + n += tGetI64v(p + n, &pDataBlk->maxVer); + n += tGetI32v(p + n, &pDataBlk->nRow); + n += tGetI8(p + n, &pDataBlk->hasDup); + n += tGetI8(p + n, &pDataBlk->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + n += tGetI64v(p + n, &pDataBlk->aSubBlock[iSubBlock].offset); + n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szBlock); + n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szKey); } - if (pBlock->nSubBlock == 1 && !pBlock->hasDup) { - n += tGetI64v(p + n, &pBlock->smaInfo.offset); - n += tGetI32v(p + n, &pBlock->smaInfo.size); + if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { + n += tGetI64v(p + n, &pDataBlk->smaInfo.offset); + n += tGetI32v(p + n, &pDataBlk->smaInfo.size); } else { - pBlock->smaInfo.offset = 0; - pBlock->smaInfo.size = 0; + pDataBlk->smaInfo.offset = 0; + pDataBlk->smaInfo.size = 0; } return n; } -int32_t tBlockCmprFn(const void *p1, const void *p2) { - SBlock *pBlock1 = (SBlock *)p1; - SBlock *pBlock2 = (SBlock *)p2; +int32_t tDataBlkCmprFn(const void *p1, const void *p2) { + SDataBlk *pBlock1 = (SDataBlk *)p1; + SDataBlk *pBlock2 = (SDataBlk *)p2; if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) { return -1; @@ -304,48 +304,48 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) { return 0; } -bool tBlockHasSma(SBlock *pBlock) { - if (pBlock->nSubBlock > 1) return false; - if (pBlock->hasDup) return false; +bool tDataBlkHasSma(SDataBlk *pDataBlk) { + if (pDataBlk->nSubBlock > 1) return false; + if (pDataBlk->hasDup) return false; - return pBlock->smaInfo.size > 0; + return pDataBlk->smaInfo.size > 0; } -// SBlockL ====================================================== -int32_t tPutBlockL(uint8_t *p, void *ph) { +// SSstBlk ====================================================== +int32_t tPutSstBlk(uint8_t *p, void *ph) { int32_t n = 0; - SBlockL *pBlockL = (SBlockL *)ph; + SSstBlk *pSstBlk = (SSstBlk *)ph; - n += tPutI64(p ? p + n : p, pBlockL->suid); - n += tPutI64(p ? p + n : p, pBlockL->minUid); - n += tPutI64(p ? p + n : p, pBlockL->maxUid); - n += tPutI64v(p ? p + n : p, pBlockL->minKey); - n += tPutI64v(p ? p + n : p, pBlockL->maxKey); - n += tPutI64v(p ? p + n : p, pBlockL->minVer); - n += tPutI64v(p ? p + n : p, pBlockL->maxVer); - n += tPutI32v(p ? p + n : p, pBlockL->nRow); - n += tPutI64v(p ? p + n : p, pBlockL->bInfo.offset); - n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szBlock); - n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szKey); + n += tPutI64(p ? p + n : p, pSstBlk->suid); + n += tPutI64(p ? p + n : p, pSstBlk->minUid); + n += tPutI64(p ? p + n : p, pSstBlk->maxUid); + n += tPutI64v(p ? p + n : p, pSstBlk->minKey); + n += tPutI64v(p ? p + n : p, pSstBlk->maxKey); + n += tPutI64v(p ? p + n : p, pSstBlk->minVer); + n += tPutI64v(p ? p + n : p, pSstBlk->maxVer); + n += tPutI32v(p ? p + n : p, pSstBlk->nRow); + n += tPutI64v(p ? p + n : p, pSstBlk->bInfo.offset); + n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szBlock); + n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szKey); return n; } -int32_t tGetBlockL(uint8_t *p, void *ph) { +int32_t tGetSstBlk(uint8_t *p, void *ph) { int32_t n = 0; - SBlockL *pBlockL = (SBlockL *)ph; + SSstBlk *pSstBlk = (SSstBlk *)ph; - n += tGetI64(p + n, &pBlockL->suid); - n += tGetI64(p + n, &pBlockL->minUid); - n += tGetI64(p + n, &pBlockL->maxUid); - n += tGetI64v(p + n, &pBlockL->minKey); - n += tGetI64v(p + n, &pBlockL->maxKey); - n += tGetI64v(p + n, &pBlockL->minVer); - n += tGetI64v(p + n, &pBlockL->maxVer); - n += tGetI32v(p + n, &pBlockL->nRow); - n += tGetI64v(p + n, &pBlockL->bInfo.offset); - n += tGetI32v(p + n, &pBlockL->bInfo.szBlock); - n += tGetI32v(p + n, &pBlockL->bInfo.szKey); + n += tGetI64(p + n, &pSstBlk->suid); + n += tGetI64(p + n, &pSstBlk->minUid); + n += tGetI64(p + n, &pSstBlk->maxUid); + n += tGetI64v(p + n, &pSstBlk->minKey); + n += tGetI64v(p + n, &pSstBlk->maxKey); + n += tGetI64v(p + n, &pSstBlk->minVer); + n += tGetI64v(p + n, &pSstBlk->maxVer); + n += tGetI32v(p + n, &pSstBlk->nRow); + n += tGetI64v(p + n, &pSstBlk->bInfo.offset); + n += tGetI32v(p + n, &pSstBlk->bInfo.szBlock); + n += tGetI32v(p + n, &pSstBlk->bInfo.szKey); return n; }