more vnode snapshot writer

This commit is contained in:
Hongze Cheng 2022-07-13 09:55:55 +00:00
parent dc599896fb
commit a32c147b3b
2 changed files with 115 additions and 65 deletions

View File

@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
.fLast = {.commitID = pCommitter->commitID, .size = 0},
.fSma = pRSet->fSma};
} else {
STfs *pTfs = pTsdb->pVnode->pTfs;
SDiskID did = {.level = 0, .id = 0};
// TODO: alloc a new disk
// tfsAllocDisk(pTfs, 0, &did);
// create the directory
tfsMkdirRecurAt(pTfs, pTsdb->path, did);
wSet = (SDFileSet){.diskId = did,
wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
.fid = pCommitter->commitFid,
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pCommitter->commitID, .size = 0},

View File

@ -365,36 +365,39 @@ struct STsdbSnapWriter {
int32_t minRow;
int32_t maxRow;
int8_t cmprAlg;
int64_t commitID;
// for data file
SBlockData bData;
int32_t fid;
SDataFReader* pDataFReader;
SArray* aBlockIdx;
SArray* aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock;
SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlock* pBlock;
SBlock block;
SBlockData blockData;
SBlockData bDataR;
int32_t iRow;
SDataFWriter* pDataFWriter;
SArray* aBlockIdxN;
SBlockIdx* pBlockIdxN;
SArray* aBlockIdxW; // SArray<SBlockIdx>
SBlockIdx* pBlockIdxW;
SBlockIdx blockIdx;
SMapData mBlockN;
SBlock* pBlockN;
SBlock blockN;
SBlockData nBlockData;
SMapData mBlockW; // SMapData<SBlock>
SBlock* pBlockW;
SBlock blockW;
SBlockData bDataW;
// for del file
SDelFReader* pDelFReader;
SDelFWriter* pDelFWriter;
int32_t iDelIdx;
SArray* aDelIdx;
SArray* aDelIdxR;
SArray* aDelData;
SArray* aDelIdxN;
SArray* aDelIdxW;
};
static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) {
@ -440,7 +443,7 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData,
SBlockData* pBlockData = NULL; // todo
while (iRow < nRow) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
}
@ -478,14 +481,14 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData,
if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
int32_t c;
if (pWriter->pBlockIdxN && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxN)) != 0)) {
if (pWriter->pBlockIdxW && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxW)) != 0)) {
ASSERT(c > 0);
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
}
if (pWriter->pBlockIdxN == NULL) {
if (pWriter->pBlockIdxW == NULL) {
pWriter->pBlockIdx = &pWriter->blockIdx;
pWriter->pBlockIdx->suid = id.suid;
pWriter->pBlockIdx->uid = id.uid;
@ -496,12 +499,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData,
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
if (pWriter->nBlockData.nRow > pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN,
pWriter->pBlockN, pWriter->cmprAlg);
if (pWriter->bDataW.nRow > pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
pWriter->pBlockW, pWriter->cmprAlg);
if (code) goto _err;
}
}
@ -512,7 +515,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData,
if (pWriter->pBlock->last) break;
if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break;
code = tMapDataPutItem(&pWriter->mBlockN, pWriter->pBlock, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, pWriter->pBlock, tPutBlock);
if (code) goto _err;
}
@ -533,15 +536,15 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData,
SBlockData* pBlockData = NULL;
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
if (pWriter->nBlockData.nRow >= pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN,
pWriter->pBlockN, pWriter->cmprAlg);
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
pWriter->pBlockW, pWriter->cmprAlg);
if (code) goto _err;
tBlockDataClearData(&pWriter->nBlockData);
tBlockDataClearData(&pWriter->bDataW);
}
}
}
@ -560,60 +563,77 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(&pHdr[1]);
int64_t n;
SBlockData bData = {0};
SBlockData* pBlockData = &bData;
// decode
code = tBlockDataInit(pBlockData);
if (code) goto _err;
SBlockData* pBlockData = &pWriter->bData;
n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
#if 0
int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
// open file
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
// begin
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
code = tsdbSnapWriteDataEnd(pWriter);
code = tsdbSnapWriteDataEnd(pWriter); // todo
if (code) goto _err;
pWriter->fid = fid;
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ);
// reader
// read
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); // todo: check nState is valid
if (pSet) {
// open
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
// SBlockIdx
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL);
if (code) goto _err;
} else {
ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx);
}
pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL;
tMapDataReset(&pWriter->mBlock);
pWriter->iBlock = 0;
pWriter->pBlock = NULL;
tBlockDataReset(&pWriter->bDataR);
pWriter->iRow = 0;
// writer
SDFileSet wSet = {0};
if (pSet == NULL) {
wSet = (SDFileSet){0}; // todo
// write
SDFileSet wSet;
if (pSet) {
wSet = (SDFileSet){.diskId = pSet->diskId,
.fid = fid,
.fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0},
.fData = pSet->fData,
.fLast = {.commitID = pWriter->commitID, .size = 0},
.fSma = pSet->fSma};
} else {
wSet = (SDFileSet){0}; // todo
wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
.fid = fid,
.fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pWriter->commitID, .size = 0},
.fLast = {.commitID = pWriter->commitID, .size = 0},
.fSma = {.commitID = pWriter->commitID, .size = 0}};
}
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxN);
taosArrayClear(pWriter->aBlockIdxW);
pWriter->pBlockIdxW = NULL;
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockW = NULL;
tBlockDataReset(&pWriter->bDataW);
}
code = tsdbSnapWriteTableData(pWriter, pData, nData);
if (code) goto _err;
#endif
// write data block (todo)
tsdbInfo("vgId:%d vnode snapshot tsdb write data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pTsdb->pVnode),
id.suid, id.suid, pBlockData->nRow);
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow);
return code;
_err:
@ -633,7 +653,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdx, NULL);
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
if (code) goto _err;
}
@ -653,8 +673,8 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
SDelIdx delIdx;
int8_t toBreak = 0;
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx)) {
pDelIdx = taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) {
pDelIdx = taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
}
if (pDelIdx) {
@ -695,7 +715,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxN, &delIdx) == NULL) {
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -716,8 +736,8 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit;
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
if (code) goto _err;
@ -726,7 +746,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdx, &delIdx) == NULL) {
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -768,11 +788,50 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->sver = sver;
pWriter->ever = ever;
// config
pWriter->minutes = pTsdb->keepCfg.days;
pWriter->precision = pTsdb->keepCfg.precision;
pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pWriter->commitID = pTsdb->pVnode->state.commitID;
// for data file
code = tBlockDataInit(&pWriter->bData);
if (code) goto _err;
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataR);
if (code) goto _err;
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataW);
if (code) goto _err;
// for del file
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pWriter->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*ppWriter = pWriter;
return code;