Merge pull request #16653 from taosdata/refact/tsdb_new_format
refact: new file format
This commit is contained in:
commit
3d7eb5202b
|
@ -65,12 +65,14 @@ typedef struct SBlockInfo SBlockInfo;
|
||||||
typedef struct SSmaInfo SSmaInfo;
|
typedef struct SSmaInfo SSmaInfo;
|
||||||
typedef struct SBlockCol SBlockCol;
|
typedef struct SBlockCol SBlockCol;
|
||||||
typedef struct SVersionRange SVersionRange;
|
typedef struct SVersionRange SVersionRange;
|
||||||
|
typedef struct SLDataIter SLDataIter;
|
||||||
|
|
||||||
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
#define TSDB_MAX_LAST_FILE 16
|
#define TSDB_MAX_SST_FILE 16
|
||||||
#define TSDB_DEFAULT_LAST_FILE 8
|
#define TSDB_DEFAULT_SST_FILE 8
|
||||||
#define TSDB_FHDR_SIZE 512
|
#define TSDB_FHDR_SIZE 512
|
||||||
|
#define TSDB_DEFAULT_PAGE_SIZE 4096
|
||||||
|
|
||||||
#define HAS_NONE ((int8_t)0x1)
|
#define HAS_NONE ((int8_t)0x1)
|
||||||
#define HAS_NULL ((int8_t)0x2)
|
#define HAS_NULL ((int8_t)0x2)
|
||||||
|
@ -82,6 +84,14 @@ typedef struct SVersionRange SVersionRange;
|
||||||
#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN})
|
#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN})
|
||||||
#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX})
|
#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX})
|
||||||
|
|
||||||
|
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
|
||||||
|
#define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \
|
||||||
|
((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE))
|
||||||
|
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
|
||||||
|
#define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE))
|
||||||
|
#define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1)
|
||||||
|
#define LOGIC_TO_FILE_SIZE(LSIZE, PAGE) OFFSET_PGNO(LOGIC_TO_FILE_OFFSET(LSIZE, PAGE), PAGE) * (PAGE)
|
||||||
|
|
||||||
// tsdbUtil.c ==============================================================================================
|
// tsdbUtil.c ==============================================================================================
|
||||||
// TSDBROW
|
// TSDBROW
|
||||||
#define TSDBROW_TS(ROW) (((ROW)->type == 0) ? (ROW)->pTSRow->ts : (ROW)->pBlockData->aTSKEY[(ROW)->iRow])
|
#define TSDBROW_TS(ROW) (((ROW)->type == 0) ? (ROW)->pTSRow->ts : (ROW)->pBlockData->aTSKEY[(ROW)->iRow])
|
||||||
|
@ -195,7 +205,6 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
|
||||||
uint8_t **ppBuf);
|
uint8_t **ppBuf);
|
||||||
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
|
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
|
||||||
uint8_t **ppBuf);
|
uint8_t **ppBuf);
|
||||||
int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck);
|
|
||||||
// tsdbMemTable ==============================================================================================
|
// tsdbMemTable ==============================================================================================
|
||||||
// SMemTable
|
// SMemTable
|
||||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||||
|
@ -563,7 +572,7 @@ struct SDFileSet {
|
||||||
SDataFile *pDataF;
|
SDataFile *pDataF;
|
||||||
SSmaFile *pSmaF;
|
SSmaFile *pSmaF;
|
||||||
uint8_t nSstF;
|
uint8_t nSstF;
|
||||||
SSstFile *aSstF[TSDB_MAX_LAST_FILE];
|
SSstFile *aSstF[TSDB_MAX_SST_FILE];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SRowIter {
|
struct SRowIter {
|
||||||
|
@ -578,45 +587,53 @@ struct SRowMerger {
|
||||||
SArray *pArray; // SArray<SColVal>
|
SArray *pArray; // SArray<SColVal>
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char *path;
|
||||||
|
int32_t szPage;
|
||||||
|
int32_t flag;
|
||||||
|
TdFilePtr pFD;
|
||||||
|
int64_t pgno;
|
||||||
|
uint8_t *pBuf;
|
||||||
|
int64_t szFile;
|
||||||
|
} STsdbFD;
|
||||||
|
|
||||||
struct SDelFWriter {
|
struct SDelFWriter {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
SDelFile fDel;
|
SDelFile fDel;
|
||||||
TdFilePtr pWriteH;
|
STsdbFD *pWriteH;
|
||||||
|
|
||||||
uint8_t *aBuf[1];
|
uint8_t *aBuf[1];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SDataFWriter {
|
|
||||||
STsdb *pTsdb;
|
|
||||||
SDFileSet wSet;
|
|
||||||
|
|
||||||
TdFilePtr pHeadFD;
|
|
||||||
TdFilePtr pDataFD;
|
|
||||||
TdFilePtr pSmaFD;
|
|
||||||
TdFilePtr pLastFD;
|
|
||||||
|
|
||||||
SHeadFile fHead;
|
|
||||||
SDataFile fData;
|
|
||||||
SSmaFile fSma;
|
|
||||||
SSstFile fSst[TSDB_MAX_LAST_FILE];
|
|
||||||
|
|
||||||
uint8_t *aBuf[4];
|
|
||||||
};
|
|
||||||
|
|
||||||
struct STsdbReadSnap {
|
struct STsdbReadSnap {
|
||||||
SMemTable *pMem;
|
SMemTable *pMem;
|
||||||
SMemTable *pIMem;
|
SMemTable *pIMem;
|
||||||
STsdbFS fs;
|
STsdbFS fs;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SDataFWriter {
|
||||||
|
STsdb *pTsdb;
|
||||||
|
SDFileSet wSet;
|
||||||
|
|
||||||
|
STsdbFD *pHeadFD;
|
||||||
|
STsdbFD *pDataFD;
|
||||||
|
STsdbFD *pSmaFD;
|
||||||
|
STsdbFD *pSstFD;
|
||||||
|
|
||||||
|
SHeadFile fHead;
|
||||||
|
SDataFile fData;
|
||||||
|
SSmaFile fSma;
|
||||||
|
SSstFile fSst[TSDB_MAX_SST_FILE];
|
||||||
|
|
||||||
|
uint8_t *aBuf[4];
|
||||||
|
};
|
||||||
|
|
||||||
struct SDataFReader {
|
struct SDataFReader {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
SDFileSet *pSet;
|
SDFileSet *pSet;
|
||||||
TdFilePtr pHeadFD;
|
STsdbFD *pHeadFD;
|
||||||
TdFilePtr pDataFD;
|
STsdbFD *pDataFD;
|
||||||
TdFilePtr pSmaFD;
|
STsdbFD *pSmaFD;
|
||||||
TdFilePtr aLastFD[TSDB_MAX_LAST_FILE];
|
STsdbFD *aSstFD[TSDB_MAX_SST_FILE];
|
||||||
|
|
||||||
uint8_t *aBuf[3];
|
uint8_t *aBuf[3];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -630,11 +647,12 @@ typedef struct SMergeTree {
|
||||||
int8_t backward;
|
int8_t backward;
|
||||||
SRBTree rbt;
|
SRBTree rbt;
|
||||||
SArray *pIterList;
|
SArray *pIterList;
|
||||||
struct SLDataIter *pIter;
|
SLDataIter *pIter;
|
||||||
} SMergeTree;
|
} SMergeTree;
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange);
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid,
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter);
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange);
|
||||||
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
void tMergeTreeClose(SMergeTree *pMTree);
|
void tMergeTreeClose(SMergeTree *pMTree);
|
||||||
|
|
|
@ -71,7 +71,7 @@ typedef struct {
|
||||||
SDataIter *pIter;
|
SDataIter *pIter;
|
||||||
SRBTree rbt;
|
SRBTree rbt;
|
||||||
SDataIter dataIter;
|
SDataIter dataIter;
|
||||||
SDataIter aDataIter[TSDB_MAX_LAST_FILE];
|
SDataIter aDataIter[TSDB_MAX_SST_FILE];
|
||||||
int8_t toLastOnly;
|
int8_t toLastOnly;
|
||||||
};
|
};
|
||||||
struct {
|
struct {
|
||||||
|
@ -92,9 +92,6 @@ typedef struct {
|
||||||
SArray *aDelData; // SArray<SDelData>
|
SArray *aDelData; // SArray<SDelData>
|
||||||
} SCommitter;
|
} SCommitter;
|
||||||
|
|
||||||
extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *aSstBlk,
|
|
||||||
SBlockData *pBlockData); // todo
|
|
||||||
|
|
||||||
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
|
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
|
||||||
static int32_t tsdbCommitData(SCommitter *pCommitter);
|
static int32_t tsdbCommitData(SCommitter *pCommitter);
|
||||||
static int32_t tsdbCommitDel(SCommitter *pCommitter);
|
static int32_t tsdbCommitDel(SCommitter *pCommitter);
|
||||||
|
@ -445,7 +442,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
||||||
|
|
||||||
pIter->iSstBlk = 0;
|
pIter->iSstBlk = 0;
|
||||||
SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, 0);
|
SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, 0);
|
||||||
code = tsdbReadSstBlockEx(pCommitter->dReader.pReader, iSst, pSstBlk, &pIter->bData);
|
code = tsdbReadSstBlock(pCommitter->dReader.pReader, iSst, pSstBlk, &pIter->bData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pIter->iRow = 0;
|
pIter->iRow = 0;
|
||||||
|
@ -760,7 +757,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
||||||
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
||||||
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||||
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||||
pCommitter->maxLast = TSDB_DEFAULT_LAST_FILE; // TODO: make it as a config
|
pCommitter->maxLast = TSDB_DEFAULT_SST_FILE; // TODO: make it as a config
|
||||||
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
||||||
if (pCommitter->aTbDataP == NULL) {
|
if (pCommitter->aTbDataP == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -790,7 +787,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
// merger
|
// merger
|
||||||
for (int32_t iSst = 0; iSst < TSDB_MAX_LAST_FILE; iSst++) {
|
for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
|
||||||
SDataIter *pIter = &pCommitter->aDataIter[iSst];
|
SDataIter *pIter = &pCommitter->aDataIter[iSst];
|
||||||
pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
||||||
if (pIter->aSstBlk == NULL) {
|
if (pIter->aSstBlk == NULL) {
|
||||||
|
@ -832,7 +829,7 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
|
||||||
tBlockDataDestroy(&pCommitter->dReader.bData, 1);
|
tBlockDataDestroy(&pCommitter->dReader.bData, 1);
|
||||||
|
|
||||||
// merger
|
// merger
|
||||||
for (int32_t iSst = 0; iSst < TSDB_MAX_LAST_FILE; iSst++) {
|
for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
|
||||||
SDataIter *pIter = &pCommitter->aDataIter[iSst];
|
SDataIter *pIter = &pCommitter->aDataIter[iSst];
|
||||||
taosArrayDestroy(pIter->aSstBlk);
|
taosArrayDestroy(pIter->aSstBlk);
|
||||||
tBlockDataDestroy(&pIter->bData, 1);
|
tBlockDataDestroy(&pIter->bData, 1);
|
||||||
|
@ -1059,7 +1056,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
|
||||||
if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) {
|
if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) {
|
||||||
SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
|
SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
|
||||||
|
|
||||||
code = tsdbReadSstBlockEx(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData);
|
code = tsdbReadSstBlock(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
pIter->iRow = 0;
|
pIter->iRow = 0;
|
||||||
|
|
|
@ -21,6 +21,9 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) {
|
||||||
int8_t hasDel = pFS->pDelFile ? 1 : 0;
|
int8_t hasDel = pFS->pDelFile ? 1 : 0;
|
||||||
uint32_t nSet = taosArrayGetSize(pFS->aDFileSet);
|
uint32_t nSet = taosArrayGetSize(pFS->aDFileSet);
|
||||||
|
|
||||||
|
// version
|
||||||
|
n += tPutI8(p ? p + n : p, 0);
|
||||||
|
|
||||||
// SDelFile
|
// SDelFile
|
||||||
n += tPutI8(p ? p + n : p, hasDel);
|
n += tPutI8(p ? p + n : p, hasDel);
|
||||||
if (hasDel) {
|
if (hasDel) {
|
||||||
|
@ -292,7 +295,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (size != pSet->pHeadF->size) {
|
if (size != LOGIC_TO_FILE_SIZE(pSet->pHeadF->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -303,10 +306,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (size < pSet->pDataF->size) {
|
if (size < LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
} else if (size > pSet->pDataF->size) {
|
} else if (size > LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
|
code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
@ -317,10 +320,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (size < pSet->pSmaF->size) {
|
if (size < LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
} else if (size > pSet->pSmaF->size) {
|
} else if (size > LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
|
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
@ -332,7 +335,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (size != pSet->aSstF[iSst]->size) {
|
if (size != LOGIC_TO_FILE_SIZE(pSet->aSstF[iSst]->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -364,10 +367,12 @@ static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t hasDel;
|
int8_t hasDel;
|
||||||
uint32_t nSet;
|
uint32_t nSet;
|
||||||
int32_t n;
|
int32_t n = 0;
|
||||||
|
|
||||||
|
// version
|
||||||
|
n += tGetI8(pData + n, NULL);
|
||||||
|
|
||||||
// SDelFile
|
// SDelFile
|
||||||
n = 0;
|
|
||||||
n += tGetI8(pData + n, &hasDel);
|
n += tGetI8(pData + n, &hasDel);
|
||||||
if (hasDel) {
|
if (hasDel) {
|
||||||
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||||
|
|
|
@ -148,7 +148,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ftruncate
|
// ftruncate
|
||||||
if (taosFtruncateFile(pFD, size) < 0) {
|
if (taosFtruncateFile(pFD, LOGIC_TO_FILE_SIZE(size, TSDB_DEFAULT_PAGE_SIZE)) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
// SLDataIter =================================================
|
// SLDataIter =================================================
|
||||||
typedef struct SLDataIter {
|
struct SLDataIter {
|
||||||
SRBTreeNode node;
|
SRBTreeNode node;
|
||||||
SSstBlk *pSstBlk;
|
SSstBlk *pSstBlk;
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
|
@ -31,13 +31,11 @@ typedef struct SLDataIter {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
} SLDataIter;
|
};
|
||||||
|
|
||||||
static SBlockData* getCurrentBlock(SLDataIter* pIter) {
|
static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; }
|
||||||
return &pIter->bData[pIter->loadIndex];
|
|
||||||
}
|
|
||||||
|
|
||||||
static SBlockData* getNextBlock(SLDataIter* pIter) {
|
static SBlockData *getNextBlock(SLDataIter *pIter) {
|
||||||
pIter->loadIndex ^= 1;
|
pIter->loadIndex ^= 1;
|
||||||
return getCurrentBlock(pIter);
|
return getCurrentBlock(pIter);
|
||||||
}
|
}
|
||||||
|
@ -116,8 +114,6 @@ void tLDataIterClose(SLDataIter *pIter) {
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
|
|
||||||
|
|
||||||
void tLDataIterNextBlock(SLDataIter *pIter) {
|
void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
pIter->iSstBlk += step;
|
pIter->iSstBlk += step;
|
||||||
|
@ -152,7 +148,7 @@ static void findNextValidRow(SLDataIter *pIter) {
|
||||||
|
|
||||||
bool hasVal = false;
|
bool hasVal = false;
|
||||||
int32_t i = pIter->iRow;
|
int32_t i = pIter->iRow;
|
||||||
SBlockData* pBlockData = getCurrentBlock(pIter);
|
SBlockData *pBlockData = getCurrentBlock(pIter);
|
||||||
|
|
||||||
for (; i < pBlockData->nRow && i >= 0; i += step) {
|
for (; i < pBlockData->nRow && i >= 0; i += step) {
|
||||||
if (pBlockData->aUid != NULL) {
|
if (pBlockData->aUid != NULL) {
|
||||||
|
@ -221,11 +217,11 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t iBlockL = pIter->iSstBlk;
|
int32_t iBlockL = pIter->iSstBlk;
|
||||||
SBlockData* pBlockData = getCurrentBlock(pIter);
|
SBlockData *pBlockData = getCurrentBlock(pIter);
|
||||||
|
|
||||||
if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
|
if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
|
||||||
pBlockData = getNextBlock(pIter);
|
pBlockData = getNextBlock(pIter);
|
||||||
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
|
code = tsdbReadSstBlock(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -249,7 +245,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
|
||||||
|
|
||||||
if (iBlockL != pIter->iSstBlk) {
|
if (iBlockL != pIter->iSstBlk) {
|
||||||
pBlockData = getNextBlock(pIter);
|
pBlockData = getNextBlock(pIter);
|
||||||
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
|
code = tsdbReadSstBlock(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -306,7 +302,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
|
struct SLDataIter *pIterList[TSDB_DEFAULT_SST_FILE] = {0};
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
|
||||||
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
|
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1548,7 +1548,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
blockCol.offset = aBufN[0];
|
blockCol.offset = aBufN[0];
|
||||||
aBufN[0] = aBufN[0] + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM);
|
aBufN[0] = aBufN[0] + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tRealloc(&aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol));
|
code = tRealloc(&aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol));
|
||||||
|
@ -1556,15 +1556,8 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
|
||||||
hdr.szBlkCol += tPutBlockCol(aBuf[1] + hdr.szBlkCol, &blockCol);
|
hdr.szBlkCol += tPutBlockCol(aBuf[1] + hdr.szBlkCol, &blockCol);
|
||||||
}
|
}
|
||||||
|
|
||||||
aBufN[1] = 0;
|
// SBlockCol
|
||||||
if (hdr.szBlkCol > 0) {
|
aBufN[1] = hdr.szBlkCol;
|
||||||
aBufN[1] = hdr.szBlkCol + sizeof(TSCKSUM);
|
|
||||||
|
|
||||||
code = tRealloc(&aBuf[1], aBufN[1]);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
taosCalcChecksumAppend(0, aBuf[1], aBufN[1]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// uid + version + tskey
|
// uid + version + tskey
|
||||||
aBufN[2] = 0;
|
aBufN[2] = 0;
|
||||||
|
@ -1585,16 +1578,11 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
aBufN[2] += hdr.szKey;
|
aBufN[2] += hdr.szKey;
|
||||||
|
|
||||||
aBufN[2] += sizeof(TSCKSUM);
|
|
||||||
code = tRealloc(&aBuf[2], aBufN[2]);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
// hdr
|
// hdr
|
||||||
aBufN[3] = tPutDiskDataHdr(NULL, &hdr);
|
aBufN[3] = tPutDiskDataHdr(NULL, &hdr);
|
||||||
code = tRealloc(&aBuf[3], aBufN[3]);
|
code = tRealloc(&aBuf[3], aBufN[3]);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
tPutDiskDataHdr(aBuf[3], &hdr);
|
tPutDiskDataHdr(aBuf[3], &hdr);
|
||||||
taosCalcChecksumAppend(taosCalcChecksum(0, aBuf[3], aBufN[3]), aBuf[2], aBufN[2]);
|
|
||||||
|
|
||||||
// aggragate
|
// aggragate
|
||||||
if (ppOut) {
|
if (ppOut) {
|
||||||
|
@ -1626,10 +1614,6 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
|
||||||
|
|
||||||
// SDiskDataHdr
|
// SDiskDataHdr
|
||||||
n += tGetDiskDataHdr(pIn + n, &hdr);
|
n += tGetDiskDataHdr(pIn + n, &hdr);
|
||||||
if (!taosCheckChecksumWhole(pIn, n + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM))) {
|
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
|
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
|
||||||
|
|
||||||
pBlockData->suid = hdr.suid;
|
pBlockData->suid = hdr.suid;
|
||||||
|
@ -1657,7 +1641,7 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
|
||||||
code = tsdbDecmprData(pIn + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
|
code = tsdbDecmprData(pIn + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
|
||||||
sizeof(TSKEY) * hdr.nRow, &aBuf[0]);
|
sizeof(TSKEY) * hdr.nRow, &aBuf[0]);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
n = n + hdr.szKey + sizeof(TSCKSUM);
|
n += hdr.szKey;
|
||||||
|
|
||||||
// loop to decode each column data
|
// loop to decode each column data
|
||||||
if (hdr.szBlkCol == 0) goto _exit;
|
if (hdr.szBlkCol == 0) goto _exit;
|
||||||
|
@ -1679,8 +1663,8 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = tsdbDecmprColData(pIn + n + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &blockCol, hdr.cmprAlg,
|
code = tsdbDecmprColData(pIn + n + hdr.szBlkCol + blockCol.offset, &blockCol, hdr.cmprAlg, hdr.nRow, pColData,
|
||||||
hdr.nRow, pColData, &aBuf[0]);
|
&aBuf[0]);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2062,12 +2046,6 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
|
||||||
}
|
}
|
||||||
size += pBlockCol->szValue;
|
size += pBlockCol->szValue;
|
||||||
|
|
||||||
// checksum
|
|
||||||
size += sizeof(TSCKSUM);
|
|
||||||
code = tRealloc(ppOut, nOut + size);
|
|
||||||
if (code) goto _exit;
|
|
||||||
taosCalcChecksumAppend(0, *ppOut + nOut, size);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2076,12 +2054,6 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
|
||||||
uint8_t **ppBuf) {
|
uint8_t **ppBuf) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
|
|
||||||
if (!taosCheckChecksumWhole(pIn, size)) {
|
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pColData->cid == pBlockCol->cid);
|
ASSERT(pColData->cid == pBlockCol->cid);
|
||||||
ASSERT(pColData->type == pBlockCol->type);
|
ASSERT(pColData->type == pBlockCol->type);
|
||||||
pColData->smaOn = pBlockCol->smaOn;
|
pColData->smaOn = pBlockCol->smaOn;
|
||||||
|
@ -2153,37 +2125,3 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// alloc
|
|
||||||
code = tRealloc(ppOut, size);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
// seek
|
|
||||||
int64_t n = taosLSeekFile(pFD, offset, SEEK_SET);
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// read
|
|
||||||
n = taosReadFile(pFD, *ppOut, size);
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _exit;
|
|
||||||
} else if (n < size) {
|
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check
|
|
||||||
if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) {
|
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue