more work
This commit is contained in:
parent
c9a3a7a6c6
commit
555469c775
|
@ -42,7 +42,6 @@ typedef struct SMemTable SMemTable;
|
||||||
typedef struct STbDataIter STbDataIter;
|
typedef struct STbDataIter STbDataIter;
|
||||||
typedef struct SMergeInfo SMergeInfo;
|
typedef struct SMergeInfo SMergeInfo;
|
||||||
typedef struct STable STable;
|
typedef struct STable STable;
|
||||||
typedef struct SOffset SOffset;
|
|
||||||
typedef struct SMapData SMapData;
|
typedef struct SMapData SMapData;
|
||||||
typedef struct SColData SColData;
|
typedef struct SColData SColData;
|
||||||
typedef struct SColDataBlock SColDataBlock;
|
typedef struct SColDataBlock SColDataBlock;
|
||||||
|
@ -93,10 +92,11 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
|
||||||
// SDataFWriter
|
// SDataFWriter
|
||||||
typedef struct SDataFWriter SDataFWriter;
|
typedef struct SDataFWriter SDataFWriter;
|
||||||
|
|
||||||
int32_t tsdbDataFWriterOpen(SDataFWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet);
|
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
|
||||||
int32_t tsdbDataFWriterClose(SDataFWriter *pWriter);
|
int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync);
|
||||||
|
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
|
||||||
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
|
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
|
||||||
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize);
|
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
|
||||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset,
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset,
|
||||||
int64_t *rSize);
|
int64_t *rSize);
|
||||||
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
|
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
|
||||||
|
@ -104,7 +104,7 @@ int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *
|
||||||
// SDataFReader
|
// SDataFReader
|
||||||
typedef struct SDataFReader SDataFReader;
|
typedef struct SDataFReader SDataFReader;
|
||||||
|
|
||||||
int32_t tsdbDataFReaderOpen(SDataFReader *pReader, STsdb *pTsdb, SDFileSet *pSet);
|
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
||||||
int32_t tsdbDataFReaderClose(SDataFReader *pReader);
|
int32_t tsdbDataFReaderClose(SDataFReader *pReader);
|
||||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
|
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
|
||||||
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
|
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
|
||||||
|
@ -165,12 +165,6 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile);
|
||||||
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile);
|
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile);
|
||||||
|
|
||||||
// structs
|
// structs
|
||||||
struct SOffset {
|
|
||||||
int32_t nOffset;
|
|
||||||
uint8_t flag;
|
|
||||||
uint8_t *pOffset;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int minFid;
|
int minFid;
|
||||||
int midFid;
|
int midFid;
|
||||||
|
|
|
@ -17,33 +17,23 @@
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
uint8_t *pBuf1;
|
|
||||||
uint8_t *pBuf2;
|
|
||||||
uint8_t *pBuf3;
|
|
||||||
uint8_t *pBuf4;
|
|
||||||
uint8_t *pBuf5;
|
|
||||||
/* commit data */
|
/* commit data */
|
||||||
int32_t minutes;
|
int32_t minutes;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int32_t minRow;
|
int32_t minRow;
|
||||||
int32_t maxRow;
|
int32_t maxRow;
|
||||||
// commit file data
|
// --------------
|
||||||
TSKEY nextKey;
|
TSKEY nextKey;
|
||||||
int32_t commitFid;
|
int32_t commitFid;
|
||||||
TSKEY minKey;
|
TSKEY minKey;
|
||||||
TSKEY maxKey;
|
TSKEY maxKey;
|
||||||
|
// commit file data
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader
|
SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader
|
||||||
|
SMapData oBlock; // SMapData<SBlock>, read from reader
|
||||||
SDataFWriter *pWriter;
|
SDataFWriter *pWriter;
|
||||||
SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer
|
SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer
|
||||||
// commit table data
|
SMapData nBlock; // SMapData<SBlock>
|
||||||
STbDataIter iter;
|
|
||||||
STbDataIter *pIter;
|
|
||||||
SBlockIdx *pBlockIdx;
|
|
||||||
SMapData oBlock;
|
|
||||||
SMapData nBlock;
|
|
||||||
SColDataBlock oColDataBlock;
|
|
||||||
SColDataBlock nColDataBlock;
|
|
||||||
/* commit del */
|
/* commit del */
|
||||||
SDelFReader *pDelFReader;
|
SDelFReader *pDelFReader;
|
||||||
SMapData oDelIdxMap; // SMapData<SDelIdx>, old
|
SMapData oDelIdxMap; // SMapData<SDelIdx>, old
|
||||||
|
@ -124,21 +114,6 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
memset(pCommitter, 0, sizeof(*pCommitter));
|
|
||||||
ASSERT(pTsdb->mem && pTsdb->imem == NULL);
|
|
||||||
// lock();
|
|
||||||
pTsdb->imem = pTsdb->mem;
|
|
||||||
pTsdb->mem = NULL;
|
|
||||||
// unlock();
|
|
||||||
|
|
||||||
pCommitter->pTsdb = pTsdb;
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
|
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
STsdb *pTsdb = pCommitter->pTsdb;
|
||||||
|
@ -371,15 +346,192 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
STbDataIter iter;
|
||||||
|
TSDBROW row;
|
||||||
|
SBlockIdx blockIdx;
|
||||||
|
|
||||||
|
// check: if no memory data and no disk data, exit
|
||||||
|
if (pTbData) {
|
||||||
|
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &iter);
|
||||||
|
if ((!tsdbTbDataIterGet(&iter, &row) || row.pTSRow->ts > pCommitter->maxKey) && pBlockIdx == NULL) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// start
|
||||||
|
tMapDataReset(&pCommitter->oBlock);
|
||||||
|
tMapDataReset(&pCommitter->nBlock);
|
||||||
|
if (pBlockIdx) {
|
||||||
|
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlock, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// impl
|
||||||
|
|
||||||
|
// end
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter);
|
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
|
int32_t code = 0;
|
||||||
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter);
|
STsdb *pTsdb = pCommitter->pTsdb;
|
||||||
|
SDFileSet *pRSet = NULL; // TODO
|
||||||
|
SDFileSet *pWSet = NULL; // TODO
|
||||||
|
|
||||||
|
// memory
|
||||||
|
pCommitter->nextKey = TSKEY_MAX;
|
||||||
|
tMapDataReset(&pCommitter->oBlockIdx);
|
||||||
|
tMapDataReset(&pCommitter->oBlock);
|
||||||
|
tMapDataReset(&pCommitter->nBlockIdx);
|
||||||
|
tMapDataReset(&pCommitter->nBlock);
|
||||||
|
|
||||||
|
// load old
|
||||||
|
if (pRSet) {
|
||||||
|
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create new
|
||||||
|
code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t c;
|
||||||
|
STsdb *pTsdb = pCommitter->pTsdb;
|
||||||
|
SMemTable *pMemTable = pTsdb->imem;
|
||||||
|
int32_t iTbData = 0;
|
||||||
|
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
|
||||||
|
int32_t iBlockIdx = 0;
|
||||||
|
int32_t nBlockIdx = pCommitter->oBlockIdx.nItem;
|
||||||
|
STbData *pTbData;
|
||||||
|
SBlockIdx *pBlockIdx = NULL;
|
||||||
|
SBlockIdx blockIdx;
|
||||||
|
|
||||||
|
ASSERT(nTbData > 0);
|
||||||
|
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
|
||||||
|
if (iBlockIdx < nBlockIdx) {
|
||||||
|
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
pBlockIdx = &blockIdx;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (pTbData == NULL && pBlockIdx == NULL) break;
|
||||||
|
|
||||||
|
if (pTbData && pBlockIdx) {
|
||||||
|
c = tTABLEIDCmprFn(pTbData, pBlockIdx);
|
||||||
|
|
||||||
|
if (c == 0) {
|
||||||
|
goto _commit_mem_and_disk_data;
|
||||||
|
} else if (c < 0) {
|
||||||
|
goto _commit_mem_data;
|
||||||
|
} else {
|
||||||
|
goto _commit_disk_data;
|
||||||
|
}
|
||||||
|
} else if (pTbData) {
|
||||||
|
goto _commit_mem_data;
|
||||||
|
} else {
|
||||||
|
goto _commit_disk_data;
|
||||||
|
}
|
||||||
|
|
||||||
|
_commit_mem_data:
|
||||||
|
code = tsdbCommitTableData(pCommitter, pTbData, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
iTbData++;
|
||||||
|
if (iTbData < nTbData) {
|
||||||
|
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
|
||||||
|
} else {
|
||||||
|
pTbData = NULL;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
|
||||||
|
_commit_disk_data:
|
||||||
|
code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
iBlockIdx++;
|
||||||
|
if (iBlockIdx < nBlockIdx) {
|
||||||
|
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
pBlockIdx = &blockIdx;
|
||||||
|
} else {
|
||||||
|
pBlockIdx = NULL;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
|
||||||
|
_commit_mem_and_disk_data:
|
||||||
|
code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
iTbData++;
|
||||||
|
iBlockIdx++;
|
||||||
|
if (iTbData < nTbData) {
|
||||||
|
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
|
||||||
|
} else {
|
||||||
|
pTbData = NULL;
|
||||||
|
}
|
||||||
|
if (iBlockIdx < nBlockIdx) {
|
||||||
|
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx);
|
||||||
|
if (code) goto _err;
|
||||||
|
pBlockIdx = &blockIdx;
|
||||||
|
} else {
|
||||||
|
pBlockIdx = NULL;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// write blockIdx
|
||||||
|
code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdx, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// update file header
|
||||||
|
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// close and sync
|
||||||
|
code = tsdbDataFWriterClose(pCommitter->pWriter, 1);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
if (pCommitter->pReader) {
|
||||||
|
code = tsdbDataFReaderClose(pCommitter->pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -408,227 +560,19 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
// ----------------------------------------------------------------------------
|
||||||
int32_t code = 0;
|
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
|
||||||
SDFileSet *pRSet = NULL; // TODO
|
|
||||||
SDFileSet *pWSet = NULL; // TODO
|
|
||||||
|
|
||||||
// memory
|
|
||||||
tMapDataReset(&pCommitter->oBlockIdx);
|
|
||||||
tMapDataReset(&pCommitter->nBlockIdx);
|
|
||||||
|
|
||||||
// load old
|
|
||||||
if (pRSet) {
|
|
||||||
code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// create new
|
|
||||||
code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableData(SCommitter *pCommitter);
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
|
||||||
SMemTable *pMemTable = pTsdb->imem;
|
|
||||||
int32_t iTbData = 0;
|
|
||||||
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
|
|
||||||
int32_t iBlockIdx = 0;
|
|
||||||
int32_t nBlockIdx = pCommitter->oBlockIdx.nItem;
|
|
||||||
STbData *pTbData;
|
|
||||||
SBlockIdx *pBlockIdx;
|
|
||||||
SBlockIdx blockIdx;
|
|
||||||
int32_t c;
|
|
||||||
|
|
||||||
while (iTbData < nTbData || iBlockIdx < nBlockIdx) {
|
|
||||||
pTbData = NULL;
|
|
||||||
pBlockIdx = NULL;
|
|
||||||
if (iTbData < nTbData) {
|
|
||||||
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
|
|
||||||
}
|
|
||||||
if (iBlockIdx < nBlockIdx) {
|
|
||||||
tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, NULL /* TODO */);
|
|
||||||
pBlockIdx = &blockIdx;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTbData && pBlockIdx) {
|
|
||||||
c = tTABLEIDCmprFn(pTbData, pBlockIdx);
|
|
||||||
|
|
||||||
if (c == 0) {
|
|
||||||
iTbData++;
|
|
||||||
iBlockIdx++;
|
|
||||||
} else if (c < 0) {
|
|
||||||
iTbData++;
|
|
||||||
pBlockIdx = NULL;
|
|
||||||
} else {
|
|
||||||
iBlockIdx++;
|
|
||||||
pTbData = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pTbData) {
|
|
||||||
iBlockIdx++;
|
|
||||||
}
|
|
||||||
if (pBlockIdx) {
|
|
||||||
iTbData++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTbData &&
|
|
||||||
!tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pCommitter->iter)) {
|
|
||||||
pTbData = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTbData == NULL && pBlockIdx == NULL) continue;
|
|
||||||
|
|
||||||
pCommitter->pTbData = pTbData;
|
|
||||||
pCommitter->pBlockIdx = pBlockIdx;
|
|
||||||
|
|
||||||
code = tsdbCommitTableData(pCommitter);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->nBlockIdx, NULL);
|
memset(pCommitter, 0, sizeof(*pCommitter));
|
||||||
if (code) goto _err;
|
ASSERT(pTsdb->mem && pTsdb->imem == NULL);
|
||||||
|
// lock();
|
||||||
|
pTsdb->imem = pTsdb->mem;
|
||||||
|
pTsdb->mem = NULL;
|
||||||
|
// unlock();
|
||||||
|
|
||||||
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
|
pCommitter->pTsdb = pTsdb;
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tsdbDFileSetWriterClose(pCommitter->pWriter, 1);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
if (pCommitter->pReader) {
|
|
||||||
code = tsdbDFileSetReaderClose(pCommitter->pReader);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter);
|
|
||||||
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter);
|
|
||||||
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter);
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableData(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// start
|
|
||||||
code = tsdbCommitTableDataStart(pCommitter);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// impl
|
|
||||||
code = tsdbCommitTableDataImpl(pCommitter);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// end
|
|
||||||
code = tsdbCommitTableDataEnd(pCommitter);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// old
|
|
||||||
tMapDataReset(&pCommitter->oBlock);
|
|
||||||
if (pCommitter->pBlockIdx) {
|
|
||||||
code = tsdbReadBlock(pCommitter->pReader, &pCommitter->oBlock, NULL);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// new
|
|
||||||
tMapDataReset(&pCommitter->nBlock);
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
|
||||||
STbDataIter *pIter = NULL;
|
|
||||||
int32_t iBlock = 0;
|
|
||||||
int32_t nBlock = pCommitter->nBlock.nItem;
|
|
||||||
SBlock *pBlock;
|
|
||||||
SBlock block;
|
|
||||||
TSDBROW *pRow;
|
|
||||||
TSDBROW row;
|
|
||||||
int32_t iRow = 0;
|
|
||||||
STSchema *pTSchema = NULL;
|
|
||||||
|
|
||||||
if (pCommitter->pTbData) {
|
|
||||||
code = tsdbTbDataIterCreate(pCommitter->pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pIter);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (iBlock < nBlock) {
|
|
||||||
pBlock = █
|
|
||||||
} else {
|
|
||||||
pBlock = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbTbDataIterGet(pIter, pRow);
|
|
||||||
|
|
||||||
// loop to merge memory data and disk data
|
|
||||||
for (; pBlock == NULL || (pRow && pRow->pTSRow->ts <= pCommitter->maxKey);) {
|
|
||||||
if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) {
|
|
||||||
// only has block data, then move to new index file
|
|
||||||
} else if (0) {
|
|
||||||
// only commit memory data
|
|
||||||
} else {
|
|
||||||
// merge memory and block data
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbTbDataIterDestroy(pIter);
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
tsdbTbDataIterDestroy(pIter);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -702,3 +646,9 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter) {
|
||||||
// TODO
|
// TODO
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -18,64 +18,6 @@
|
||||||
#define TSDB_FHDR_SIZE 512
|
#define TSDB_FHDR_SIZE 512
|
||||||
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
||||||
|
|
||||||
// SDFileSetWritter ====================================================
|
|
||||||
struct SDFileSetWritter {
|
|
||||||
STsdb *pTsdb;
|
|
||||||
int32_t szBuf1;
|
|
||||||
uint8_t *pBuf1;
|
|
||||||
int32_t szBuf2;
|
|
||||||
uint8_t *pBuf2;
|
|
||||||
};
|
|
||||||
|
|
||||||
// SDFileSetReader ====================================================
|
|
||||||
struct SDFileSetReader {
|
|
||||||
STsdb *pTsdb;
|
|
||||||
int32_t szBuf1;
|
|
||||||
uint8_t *pBuf1;
|
|
||||||
int32_t szBuf2;
|
|
||||||
uint8_t *pBuf2;
|
|
||||||
};
|
|
||||||
|
|
||||||
int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
memset(pReader, 0, sizeof(*pReader));
|
|
||||||
pReader->pTsdb = pTsdb;
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d failed to open SDFileSetReader since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
taosMemoryFreeClear(pReader->pBuf1);
|
|
||||||
taosMemoryFreeClear(pReader->pBuf2);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// SDelFWriter ====================================================
|
// SDelFWriter ====================================================
|
||||||
struct SDelFWriter {
|
struct SDelFWriter {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
|
@ -440,3 +382,96 @@ _err:
|
||||||
tsdbError("vgId:%d read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SDataFReader ====================================================
|
||||||
|
struct SDataFReader {
|
||||||
|
STsdb *pTsdb;
|
||||||
|
SDFileSet *pSet;
|
||||||
|
TdFilePtr pReadH;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbDataFReaderClose(SDataFReader *pReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlock *pBlock, SColDataBlock *pBlockData, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// SDataFWriter ====================================================
|
||||||
|
struct SDataFWriter {
|
||||||
|
STsdb *pTsdb;
|
||||||
|
SDFileSet *pSet;
|
||||||
|
TdFilePtr pWriteH;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset,
|
||||||
|
int64_t *rSize) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -15,121 +15,10 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
// SOffset =======================================================================
|
|
||||||
#define TSDB_OFFSET_I32 ((uint8_t)0)
|
#define TSDB_OFFSET_I32 ((uint8_t)0)
|
||||||
#define TSDB_OFFSET_I16 ((uint8_t)1)
|
#define TSDB_OFFSET_I16 ((uint8_t)1)
|
||||||
#define TSDB_OFFSET_I8 ((uint8_t)2)
|
#define TSDB_OFFSET_I8 ((uint8_t)2)
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbOffsetSize(SOffset *pOfst) {
|
|
||||||
switch (pOfst->flag) {
|
|
||||||
case TSDB_OFFSET_I32:
|
|
||||||
return sizeof(int32_t);
|
|
||||||
case TSDB_OFFSET_I16:
|
|
||||||
return sizeof(int16_t);
|
|
||||||
case TSDB_OFFSET_I8:
|
|
||||||
return sizeof(int8_t);
|
|
||||||
default:
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbGetOffset(SOffset *pOfst, int32_t idx) {
|
|
||||||
int32_t offset = -1;
|
|
||||||
|
|
||||||
if (idx >= 0 && idx < pOfst->nOffset) {
|
|
||||||
switch (pOfst->flag) {
|
|
||||||
case TSDB_OFFSET_I32:
|
|
||||||
offset = ((int32_t *)pOfst->pOffset)[idx];
|
|
||||||
break;
|
|
||||||
case TSDB_OFFSET_I16:
|
|
||||||
offset = ((int16_t *)pOfst->pOffset)[idx];
|
|
||||||
break;
|
|
||||||
case TSDB_OFFSET_I8:
|
|
||||||
offset = ((int8_t *)pOfst->pOffset)[idx];
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(offset >= 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbAddOffset(SOffset *pOfst, int32_t offset) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t nOffset = pOfst->nOffset;
|
|
||||||
|
|
||||||
ASSERT(pOfst->flag == TSDB_OFFSET_I32);
|
|
||||||
ASSERT(offset >= 0);
|
|
||||||
|
|
||||||
pOfst->nOffset++;
|
|
||||||
|
|
||||||
// alloc
|
|
||||||
code = tsdbRealloc(&pOfst->pOffset, sizeof(int32_t) * pOfst->nOffset);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
// put
|
|
||||||
((int32_t *)pOfst->pOffset)[nOffset] = offset;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tPutOffset(uint8_t *p, SOffset *pOfst) {
|
|
||||||
int32_t n = 0;
|
|
||||||
int32_t maxOffset;
|
|
||||||
|
|
||||||
ASSERT(pOfst->flag == TSDB_OFFSET_I32);
|
|
||||||
ASSERT(pOfst->nOffset > 0);
|
|
||||||
|
|
||||||
maxOffset = tsdbGetOffset(pOfst, pOfst->nOffset - 1);
|
|
||||||
|
|
||||||
n += tPutI32v(p ? p + n : p, pOfst->nOffset);
|
|
||||||
if (maxOffset <= INT8_MAX) {
|
|
||||||
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8);
|
|
||||||
for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) {
|
|
||||||
n += tPutI8(p ? p + n : p, (int8_t)tsdbGetOffset(pOfst, iOffset));
|
|
||||||
}
|
|
||||||
} else if (maxOffset <= INT16_MAX) {
|
|
||||||
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16);
|
|
||||||
for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) {
|
|
||||||
n += tPutI16(p ? p + n : p, (int16_t)tsdbGetOffset(pOfst, iOffset));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32);
|
|
||||||
for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) {
|
|
||||||
n += tPutI32(p ? p + n : p, (int32_t)tsdbGetOffset(pOfst, iOffset));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return n;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tGetOffset(uint8_t *p, SOffset *pOfst) {
|
|
||||||
int32_t n = 0;
|
|
||||||
|
|
||||||
n += tGetI32v(p + n, &pOfst->nOffset);
|
|
||||||
n += tGetU8(p + n, &pOfst->flag);
|
|
||||||
pOfst->pOffset = p + n;
|
|
||||||
switch (pOfst->flag) {
|
|
||||||
case TSDB_OFFSET_I32:
|
|
||||||
n = n + pOfst->nOffset + sizeof(int32_t);
|
|
||||||
break;
|
|
||||||
case TSDB_OFFSET_I16:
|
|
||||||
n = n + pOfst->nOffset + sizeof(int16_t);
|
|
||||||
break;
|
|
||||||
case TSDB_OFFSET_I8:
|
|
||||||
n = n + pOfst->nOffset + sizeof(int8_t);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return n;
|
|
||||||
}
|
|
||||||
|
|
||||||
// SMapData =======================================================================
|
// SMapData =======================================================================
|
||||||
void tMapDataReset(SMapData *pMapData) {
|
void tMapDataReset(SMapData *pMapData) {
|
||||||
pMapData->flag = TSDB_OFFSET_I32;
|
pMapData->flag = TSDB_OFFSET_I32;
|
||||||
|
@ -299,6 +188,7 @@ void tsdbFree(uint8_t *pBuf) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TABLEID =======================================================================
|
||||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
|
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
|
||||||
TABLEID *pId1 = (TABLEID *)p1;
|
TABLEID *pId1 = (TABLEID *)p1;
|
||||||
TABLEID *pId2 = (TABLEID *)p2;
|
TABLEID *pId2 = (TABLEID *)p2;
|
||||||
|
@ -318,6 +208,7 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TSDBKEY =======================================================================
|
||||||
int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||||
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
||||||
TSDBKEY *pKey2 = (TSDBKEY *)p2;
|
TSDBKEY *pKey2 = (TSDBKEY *)p2;
|
||||||
|
|
Loading…
Reference in New Issue