more vnode snapshot writer

This commit is contained in:
Hongze Cheng 2022-07-13 12:02:59 +00:00
parent 6c628ef6e9
commit e6e9926d38
4 changed files with 38 additions and 27 deletions

View File

@ -115,7 +115,6 @@ int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2); int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock); bool tBlockHasSma(SBlock *pBlock);
// SBlockIdx // SBlockIdx
void tBlockIdxReset(SBlockIdx *pBlockIdx);
int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs); int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
@ -361,10 +360,6 @@ struct TSDBROW {
struct SBlockIdx { struct SBlockIdx {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t offset; int64_t offset;
int64_t size; int64_t size;
}; };

View File

@ -500,9 +500,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (code) goto _err; if (code) goto _err;
/* if (state->pBlockIdx) { */ /* if (state->pBlockIdx) { */
/* tBlockIdxReset(state->blockIdx); */
/* } */ /* } */
/* tBlockIdxReset(state->blockIdx); */
/* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, /* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx,
* &state->blockIdx); * &state->blockIdx);
*/ */

View File

@ -383,14 +383,15 @@ struct STsdbSnapWriter {
int32_t iRow; int32_t iRow;
SDataFWriter* pDataFWriter; SDataFWriter* pDataFWriter;
SArray* aBlockIdxW; // SArray<SBlockIdx>
SBlockIdx* pBlockIdxW; SBlockIdx* pBlockIdxW;
SBlockIdx blockIdx; SBlockIdx blockIdx;
SMapData mBlockW; // SMapData<SBlock>
SBlock* pBlockW; SBlock* pBlockW;
SBlock blockW; SBlock blockW;
SBlockData bDataW; SBlockData bDataW;
SMapData mBlockW; // SMapData<SBlock>
SArray* aBlockIdxW; // SArray<SBlockIdx>
// for del file // for del file
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
SDelFWriter* pDelFWriter; SDelFWriter* pDelFWriter;
@ -460,6 +461,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWrite) {
return code; return code;
} }
#if 0
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
TABLEID id = {0}; // TODO TABLEID id = {0}; // TODO
@ -556,6 +558,38 @@ _err:
tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
#endif
static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
if (pWriter->pDataFReader == NULL) {
// no old data
// end last table data commit if id not same
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id);
if (c < 0) {
// commit last table data and reset (todo)
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
}
}
// start a new table data if need
if (pWriter->pBlockIdxW == NULL) {
pWriter->pBlockIdxW = &pWriter->blockIdx;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
}
} else {
}
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
@ -630,7 +664,8 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
tBlockDataReset(&pWriter->bDataW); tBlockDataReset(&pWriter->bDataW);
} }
// write data block (todo) code = tsdbSnapWriteDataImpl(pWriter, id);
if (code) goto _err;
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow); TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow);

View File

@ -189,25 +189,12 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
} }
// SBlockIdx ====================================================== // SBlockIdx ======================================================
void tBlockIdxReset(SBlockIdx *pBlockIdx) {
pBlockIdx->minKey = TSKEY_MAX;
pBlockIdx->maxKey = TSKEY_MIN;
pBlockIdx->minVersion = VERSION_MAX;
pBlockIdx->maxVersion = VERSION_MIN;
pBlockIdx->offset = -1;
pBlockIdx->size = -1;
}
int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t tPutBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlockIdx *pBlockIdx = (SBlockIdx *)ph; SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
n += tPutI64(p ? p + n : p, pBlockIdx->suid); n += tPutI64(p ? p + n : p, pBlockIdx->suid);
n += tPutI64(p ? p + n : p, pBlockIdx->uid); n += tPutI64(p ? p + n : p, pBlockIdx->uid);
n += tPutI64(p ? p + n : p, pBlockIdx->minKey);
n += tPutI64(p ? p + n : p, pBlockIdx->maxKey);
n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->offset); n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
n += tPutI64v(p ? p + n : p, pBlockIdx->size); n += tPutI64v(p ? p + n : p, pBlockIdx->size);
@ -220,10 +207,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
n += tGetI64(p + n, &pBlockIdx->suid); n += tGetI64(p + n, &pBlockIdx->suid);
n += tGetI64(p + n, &pBlockIdx->uid); n += tGetI64(p + n, &pBlockIdx->uid);
n += tGetI64(p + n, &pBlockIdx->minKey);
n += tGetI64(p + n, &pBlockIdx->maxKey);
n += tGetI64v(p + n, &pBlockIdx->minVersion);
n += tGetI64v(p + n, &pBlockIdx->maxVersion);
n += tGetI64v(p + n, &pBlockIdx->offset); n += tGetI64v(p + n, &pBlockIdx->offset);
n += tGetI64v(p + n, &pBlockIdx->size); n += tGetI64v(p + n, &pBlockIdx->size);