more work
This commit is contained in:
parent
75816b7e7a
commit
39759c6004
|
@ -31,6 +31,9 @@ extern "C" {
|
||||||
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
|
|
||||||
typedef struct TSDBROW TSDBROW;
|
typedef struct TSDBROW TSDBROW;
|
||||||
typedef struct TSDBKEY TSDBKEY;
|
typedef struct TSDBKEY TSDBKEY;
|
||||||
typedef struct TABLEID TABLEID;
|
typedef struct TABLEID TABLEID;
|
||||||
|
@ -97,8 +100,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync);
|
||||||
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
|
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
|
||||||
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
|
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
|
||||||
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
|
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
|
||||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset,
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx,
|
||||||
int64_t *rSize);
|
SBlock *pBlock);
|
||||||
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
|
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
|
||||||
|
|
||||||
// SDataFReader
|
// SDataFReader
|
||||||
|
@ -153,10 +156,16 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph);
|
||||||
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
||||||
|
|
||||||
// SBlock
|
// SBlock
|
||||||
|
#define BLOCK_INIT_VAL ((SBlock){})
|
||||||
|
|
||||||
int32_t tPutBlock(uint8_t *p, void *ph);
|
int32_t tPutBlock(uint8_t *p, void *ph);
|
||||||
int32_t tGetBlock(uint8_t *p, void *ph);
|
int32_t tGetBlock(uint8_t *p, void *ph);
|
||||||
int32_t tBlockCmprFn(const void *p1, const void *p2);
|
int32_t tBlockCmprFn(const void *p1, const void *p2);
|
||||||
|
|
||||||
|
// SBlockData
|
||||||
|
void tsdbBlockDataReset(SBlockData *pBlockData);
|
||||||
|
int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
|
||||||
// SDelIdx
|
// SDelIdx
|
||||||
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
||||||
int32_t tGetDelIdx(uint8_t *p, void *ph);
|
int32_t tGetDelIdx(uint8_t *p, void *ph);
|
||||||
|
@ -293,14 +302,21 @@ struct SBlockInfo {
|
||||||
int64_t smaSize;
|
int64_t smaSize;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t offset;
|
||||||
|
int64_t size;
|
||||||
|
} SSubBlock;
|
||||||
|
|
||||||
struct SBlock {
|
struct SBlock {
|
||||||
TSDBKEY minKey;
|
TSDBKEY minKey;
|
||||||
TSDBKEY maxKey;
|
TSDBKEY maxKey;
|
||||||
int64_t minVersion;
|
int64_t minVersion;
|
||||||
int64_t maxVersion;
|
int64_t maxVersion;
|
||||||
int32_t nRows;
|
int32_t nRows;
|
||||||
int8_t nBlockInfo;
|
int8_t last;
|
||||||
SBlockInfo blockInfos[];
|
int8_t hasDup;
|
||||||
|
int8_t nSubBlock;
|
||||||
|
SSubBlock sBlocks[TSDB_MAX_SUBBLOCKS];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SBlockCol {
|
struct SBlockCol {
|
||||||
|
@ -322,9 +338,7 @@ struct SAggrBlkCol {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SBlockData {
|
struct SBlockData {
|
||||||
int32_t delimiter; // For recovery usage
|
int32_t nRow;
|
||||||
int32_t numOfCols; // For recovery usage
|
|
||||||
uint64_t uid; // For recovery usage
|
|
||||||
SBlockCol cols[];
|
SBlockCol cols[];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -348,9 +348,60 @@ _err:
|
||||||
|
|
||||||
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))
|
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))
|
||||||
|
|
||||||
static int32_t tsdbMergeCommit(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
|
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
TSDBROW *pRow;
|
||||||
|
SBlock block = BLOCK_INIT_VAL;
|
||||||
|
SBlockData bData;
|
||||||
|
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
while (true) {
|
||||||
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
|
||||||
|
if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) {
|
||||||
|
if (bData.nRow == 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
goto _write_block_data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbBlockDataAppendRow(&bData, pRow, NULL /*TODO*/);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
if (bData.nRow >= pCommitter->maxRow * 4 / 5) {
|
||||||
|
goto _write_block_data;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_write_block_data:
|
||||||
|
block.last = (bData.nRow > pCommitter->minRow) ? 0 : 1;
|
||||||
|
code = tsdbWriteBlockData(pCommitter->pWriter, &bData, NULL, pBlockIdx, &block);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// reset block and bdata
|
||||||
|
block = BLOCK_INIT_VAL;
|
||||||
|
tsdbBlockDataReset(&bData);
|
||||||
|
}
|
||||||
|
} else if (pBlock->last) {
|
||||||
|
// 1. read last block data
|
||||||
|
// 2. loop to merge memory data and last block data to write to .data file or .last file
|
||||||
|
} else {
|
||||||
|
// while (true) {
|
||||||
|
// pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
|
||||||
|
// if (pRow == NULL) /* code */
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,7 +441,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
|
||||||
pRow = tsdbTbDataIterGet(pIter);
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
|
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
|
||||||
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
|
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
|
||||||
code = tsdbMergeCommit(pCommitter, pIter, pBlock);
|
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pRow = tsdbTbDataIterGet(pIter);
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
@ -400,7 +451,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
|
||||||
// mem
|
// mem
|
||||||
pRow = tsdbTbDataIterGet(pIter);
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
while (!ROW_END(pRow, pCommitter->maxKey)) {
|
while (!ROW_END(pRow, pCommitter->maxKey)) {
|
||||||
code = tsdbMergeCommit(pCommitter, pIter, NULL);
|
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pRow = tsdbTbDataIterGet(pIter);
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
@ -410,7 +461,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
|
||||||
while (iBlock < nBlock) {
|
while (iBlock < nBlock) {
|
||||||
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
|
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
|
||||||
|
|
||||||
code = tsdbMergeCommit(pCommitter, NULL, pBlock);
|
code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
iBlock++;
|
iBlock++;
|
||||||
|
|
|
@ -720,8 +720,8 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset,
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx,
|
||||||
int64_t *rSize) {
|
SBlock *pBlock) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
// TODO
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -562,5 +562,12 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
||||||
taosArrayDestroy(aSkyline2);
|
taosArrayDestroy(aSkyline2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// SBlockData ======================================================
|
||||||
|
int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
Loading…
Reference in New Issue