From 25a2d04b85c669c7d4c68f9add1bc26cb767f6a0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Jul 2022 07:37:53 +0000 Subject: [PATCH] more vnode snapshot writer --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 153 ++++++--------------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 40 ++++++ 4 files changed, 85 insertions(+), 111 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f0a6b6505d..6b5f795eb0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); -int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids); +int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a2ac995c13..3f667a7465 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -135,6 +135,7 @@ int32_t tGetColData(uint8_t *p, SColData *pColData); int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); +int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataClearData(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 8927155173..5c5e81cd49 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -383,11 +383,10 @@ struct STsdbSnapWriter { int32_t iRow; SDataFWriter* pDataFWriter; - SBlockIdx* pBlockIdxW; - SBlockIdx blockIdx; - SBlock* pBlockW; + SBlockIdx* pBlockIdxW; // NULL when no committing table SBlock blockW; SBlockData bDataW; + SBlockIdx blockIdxW; SMapData mBlockW; // SMapData SArray* aBlockIdxW; // SArray @@ -455,110 +454,40 @@ _err: return code; } -static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWrite) { +static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; - // TODO - return code; -} -#if 0 -static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - TABLEID id = {0}; // TODO + ASSERT(pWriter->pBlockIdxW != NULL); - // skip - while (pWriter->pBlockIdx && tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) { - code = tsdbSnapWriteTableDataEnd(pWriter); + // write remain data if has + if (pWriter->bDataW.nRow > 0) { + if (pWriter->bDataW.nRow >= pWriter->minRow) { + pWriter->blockW.last = 0; + } else { + pWriter->blockW.last = 1; + } + + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; - pWriter->iBlockIdx++; - if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - } else { - pWriter->pBlockIdx = NULL; - } + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; } - // new or merge - if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) { - int32_t c; + code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); + if (code) goto _err; - if (pWriter->pBlockIdxW && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxW)) != 0)) { - ASSERT(c > 0); - - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - } - - if (pWriter->pBlockIdxW == NULL) { - pWriter->pBlockIdx = &pWriter->blockIdx; - pWriter->pBlockIdx->suid = id.suid; - pWriter->pBlockIdx->uid = id.uid; - } - - // loop to write the data - TSDBROW* pRow = NULL; // todo - int32_t nRow = 0; // todo - SBlockData* pBlockData = NULL; // todo - for (int32_t iRow = 0; iRow < nRow; iRow++) { - code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL); - if (code) goto _err; - - if (pWriter->bDataW.nRow > pWriter->maxRow * 4 / 5) { - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - pWriter->pBlockW, pWriter->cmprAlg); - if (code) goto _err; - } - } - } else { - // skip - while (true) { - if (pWriter->pBlock == NULL) break; - if (pWriter->pBlock->last) break; - if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break; - - code = tMapDataPutItem(&pWriter->mBlockW, pWriter->pBlock, tPutBlock); - if (code) goto _err; - } - - if (pWriter->pBlock) { - if (pWriter->pBlock->last) { - // load the last block and merge with the data (todo) - } else { - int32_t c = tBlockCmprFn(&(SBlock){0 /*TODO*/}, pWriter->pBlock); - - if (c > 0) { - // commit until pWriter->pBlock (todo) - } else { - // load the block and merge with the data (todo) - } - } - } else { - int32_t nRow = 0; - SBlockData* pBlockData = NULL; - - for (int32_t iRow = 0; iRow < nRow; iRow++) { - code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL); - if (code) goto _err; - - if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - pWriter->pBlockW, pWriter->cmprAlg); - if (code) goto _err; - - tBlockDataClearData(&pWriter->bDataW); - } - } - } + if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } return code; _err: - tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -#endif static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { int32_t code = 0; @@ -567,11 +496,16 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { if (pWriter->pDataFReader == NULL) { // no old data - // end last table data commit if id not same + // end last table write if need if (pWriter->pBlockIdxW) { - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id); + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); + if (c < 0) { - // commit last table data and reset (todo) + // end last table data write + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + + // reset pWriter->pBlockIdxW = NULL; } else if (c > 0) { ASSERT(0); @@ -580,41 +514,40 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { // start a new table data if need if (pWriter->pBlockIdxW == NULL) { - pWriter->pBlockIdxW = &pWriter->blockIdx; + pWriter->pBlockIdxW = &pWriter->blockIdxW; pWriter->pBlockIdxW->suid = id.suid; pWriter->pBlockIdxW->uid = id.uid; - pWriter->pBlockW = &pWriter->blockW; - tBlockReset(pWriter->pBlockW); + tBlockReset(&pWriter->blockW); tBlockDataReset(&pWriter->bDataW); tMapDataReset(&pWriter->mBlockW); } - // set block schema (todo) + // set block schema + code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); + if (code) goto _err; // add rows for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow); + TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); - code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + code = tBlockDataAppendRow(&pWriter->bDataW, &row, NULL); if (code) goto _err; if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { - // write the block to file - pWriter->pBlockW->last = 0; - + pWriter->blockW.last = 0; code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - pWriter->pBlockW, pWriter->cmprAlg); + &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, pWriter->pBlockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); if (code) goto _err; // reset - tBlockReset(pWriter->pBlockW); - tBlockDataReset(&pWriter->bDataW); + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); } } } else { @@ -647,6 +580,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { + // end last file data write if need code = tsdbSnapWriteDataEnd(pWriter); // todo if (code) goto _err; @@ -697,7 +631,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 taosArrayClear(pWriter->aBlockIdxW); pWriter->pBlockIdxW = NULL; tMapDataReset(&pWriter->mBlockW); - pWriter->pBlockW = NULL; tBlockDataReset(&pWriter->bDataW); } @@ -709,7 +642,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 return code; _err: - tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index be1b848d96..6320a48e77 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1132,6 +1132,46 @@ _err: return code; } +int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) { + int32_t code = 0; + + int32_t iColData = 0; + for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) { + SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom); + + while (true) { + SColData *pColData; + if (iColData < taosArrayGetSize(pBlockData->aIdx)) { + pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + } else { + pColData = NULL; + } + + if (pColData == NULL || pColData->cid > pColDataFrom->cid) { + code = tBlockDataAddColData(pBlockData, iColData, &pColData); + if (code) goto _exit; + + tColDataInit(pColData, pColDataFrom->cid, pColData->type, pColData->smaOn); + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } + + iColData++; + break; + } else if (pColData->cid == pColDataFrom->cid) { + iColData++; + break; + } else { + iColData++; + } + } + } + +_exit: + return code; +} + int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) { int32_t code = 0;