finish tsdb snapshot

This commit is contained in:
Hongze Cheng 2022-07-10 08:47:28 +00:00
parent 53a13db814
commit 8c45b028c1
1 changed files with 108 additions and 48 deletions

View File

@ -312,6 +312,9 @@ struct STsdbSnapWriter {
// config
int32_t minutes;
int8_t precision;
int32_t minRow;
int32_t maxRow;
int8_t cmprAlg;
// for data file
int32_t fid;
@ -321,14 +324,18 @@ struct STsdbSnapWriter {
SBlockIdx* pBlockIdx;
SMapData mBlock;
int32_t iBlock;
SBlock* pBlock;
SBlock block;
SBlockData blockData;
int32_t iRow;
SDataFWriter* pDataFWriter;
SArray* aBlockIdxN;
SBlockIdx* pBlockIdxN;
SBlockIdx blockIdx;
SMapData mBlockN;
SBlock block;
SBlock* pBlockN;
SBlock blockN;
SBlockData nBlockData;
// for del file
@ -394,19 +401,114 @@ _err:
return code;
}
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWrite) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
TABLEID id = {0}; // TODO
// skip
while (pWriter->pBlockIdx && tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
pWriter->iBlockIdx++;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
} else {
pWriter->pBlockIdx = NULL;
}
}
// new or merge
if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
int32_t c;
if (pWriter->pBlockIdxN && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxN)) != 0)) {
ASSERT(c > 0);
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
}
if (pWriter->pBlockIdxN == NULL) {
pWriter->pBlockIdx = &pWriter->blockIdx;
pWriter->pBlockIdx->suid = id.suid;
pWriter->pBlockIdx->uid = id.uid;
}
// loop to write the data
TSDBROW* pRow = NULL; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
if (pWriter->nBlockData.nRow > pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN,
pWriter->pBlockN, pWriter->cmprAlg);
if (code) goto _err;
}
}
} else {
// skip
while (true) {
if (pWriter->pBlock == NULL) break;
if (pWriter->pBlock->last) break;
if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break;
code = tMapDataPutItem(&pWriter->mBlockN, pWriter->pBlock, tPutBlock);
if (code) goto _err;
}
if (pWriter->pBlock) {
if (pWriter->pBlock->last) {
// load the last block and merge with the data (todo)
} else {
int32_t c = tBlockCmprFn(&(SBlock){/*TODO*/}, pWriter->pBlock);
if (c > 0) {
// commit until pWriter->pBlock (todo)
} else {
// load the block and merge with the data (todo)
}
}
} else {
int32_t nRow = 0;
SBlockData* pBlockData = NULL;
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
if (pWriter->nBlockData.nRow >= pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN,
pWriter->pBlockN, pWriter->cmprAlg);
if (code) goto _err;
tBlockDataClearData(&pWriter->nBlockData);
}
}
}
}
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
int64_t suid = 0; // todo
int64_t uid = 0; // todo
int64_t skey; // todo
int64_t ekey; // todo
int64_t skey; // todo
int64_t ekey; // todo
int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
@ -449,48 +551,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbSnapWriteTableData(pWriter, pData, nData);
if (code) goto _err;
// // process
// TABLEID id = {0}; // TODO
// TSKEY minKey = 0; // TODO
// TSKEY maxKey = 0; // TODO
// while (true) {
// if (pWriter->pBlockIdx) {
// int32_t c = tTABLEIDCmprFn(&id, pWriter->pBlockIdx);
// if (c == 0) {
// } else if (c < 0) {
// // keep merge
// } else {
// // code = tsdbSnapWriteTableDataEnd(pWriter);
// if (code) goto _err;
// pWriter->iBlockIdx++;
// if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
// pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
// } else {
// pWriter->pBlockIdx = NULL;
// }
// if (pWriter->pBlockIdx) {
// code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
// if (code) goto _err;
// }
// }
// } else {
// int32_t c = tTABLEIDCmprFn(&id, &pWriter->blockIdx);
// if (c == 0) {
// // merge commit the block data
// } else if (c > 0) {
// // code = tsdbSnapWriteTableDataEnd(pWriter);
// if (code) goto _err;
// } else {
// ASSERT(0);
// }
// }
// }
return code;
_err: