From 583742089e0b54a69c6d0ca0e87d8525fc474958 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 13 Jul 2022 08:14:20 +0000 Subject: [PATCH] more vnode snapshot writer --- source/dnode/vnode/src/inc/tsdb.h | 14 ++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 12 ++-- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 28 ++++---- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 50 ++++++++++---- source/dnode/vnode/src/tsdb/tsdbUtil.c | 4 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 69 ++++++++++--------- 7 files changed, 105 insertions(+), 76 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bfb319dce4..2e680511c3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -24,12 +24,12 @@ extern "C" { // tsdbDebug ================ // clang-format off -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSD FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSD ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSD WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on typedef struct TSDBROW TSDBROW; @@ -137,7 +137,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); void tBlockDataClearData(SBlockData *pBlockData); -void tBlockDataClear(SBlockData *pBlockData); +void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 605f4bf3d5..b3e0695c10 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (--state->iFileSet >= 0) { pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); } else { - // tBlockDataClear(&state->blockData); + // tBlockDataClear(&state->blockData, 1); if (state->pBlockData) { - tBlockDataClear(state->pBlockData); + tBlockDataClear(state->pBlockData, 1); state->pBlockData = NULL; } @@ -582,8 +582,8 @@ _err: state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataClear(&state->blockData); - tBlockDataClear(state->pBlockData); + // tBlockDataClear(&state->blockData, 1); + tBlockDataClear(state->pBlockData, 1); state->pBlockData = NULL; } @@ -609,8 +609,8 @@ int32_t clearNextRowFromFS(void *iter) { state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataClear(&state->blockData); - tBlockDataClear(state->pBlockData); + // tBlockDataClear(&state->blockData, 1); + tBlockDataClear(state->pBlockData, 1); state->pBlockData = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 577ef03a8f..ce14a92734 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1001,10 +1001,10 @@ _exit: static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->aBlockIdx); tMapDataClear(&pCommitter->oBlockMap); - tBlockDataClear(&pCommitter->oBlockData); + tBlockDataClear(&pCommitter->oBlockData, 1); taosArrayDestroy(pCommitter->aBlockIdxN); tMapDataClear(&pCommitter->nBlockMap); - tBlockDataClear(&pCommitter->nBlockData); + tBlockDataClear(&pCommitter->nBlockData, 1); tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 4e7a9d3b04..0ed2e12542 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl code = tBlockDataCopy(pBlockData, pBlockData2); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } } - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); } tFree(pBuf1); @@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } code = tBlockDataCopy(pBlockData, pBlockData2); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } // merge two block data code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); if (code) { - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); goto _err; } } - tBlockDataClear(pBlockData1); - tBlockDataClear(pBlockData2); + tBlockDataClear(pBlockData1, 1); + tBlockDataClear(pBlockData2, 1); } ASSERT(pBlock->nRow == pBlockData->nRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index fbe14088df..563458ad85 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -296,8 +296,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { } taosArrayDestroy(pReader->aBlockIdx); tMapDataClear(&pReader->mBlock); - tBlockDataClear(&pReader->oBlockData); - tBlockDataClear(&pReader->nBlockData); + tBlockDataClear(&pReader->oBlockData, 1); + tBlockDataClear(&pReader->nBlockData, 1); if (pReader->pDelFReader) { tsdbDelFReaderClose(&pReader->pDelFReader); @@ -555,11 +555,21 @@ _err: } static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - int64_t skey; // todo - int64_t ekey; // todo + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + TABLEID id = *(TABLEID*)(&pHdr[1]); + int64_t n; + SBlockData bData = {0}; + SBlockData* pBlockData = &bData; + // decode + code = tBlockDataInit(pBlockData); + if (code) goto _err; + n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData); + ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData); + +#if 0 int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision); ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision)); @@ -600,7 +610,10 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbSnapWriteTableData(pWriter, pData, nData); if (code) goto _err; +#endif + tsdbInfo("vgId:%d vnode snapshot tsdb write data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pTsdb->pVnode), + id.suid, id.suid, pBlockData->nRow); return code; _err: @@ -755,6 +768,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->sver = sver; pWriter->ever = ever; + pWriter->minutes = pTsdb->keepCfg.days; + pWriter->precision = pTsdb->keepCfg.precision; + pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; + pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + *ppWriter = pWriter; return code; @@ -793,24 +812,29 @@ _err: } int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - int8_t type = pData[0]; + int32_t code = 0; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; // ts data - if (type == 0) { - code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1); + if (pHdr->type == 1) { + code = tsdbSnapWriteData(pWriter, pData, nData); if (code) goto _err; + + goto _exit; } else { - code = tsdbSnapWriteDataEnd(pWriter); - if (code) goto _err; + if (pWriter->pDataFWriter) { + code = tsdbSnapWriteDataEnd(pWriter); + if (code) goto _err; + } } // del data - if (type == 1) { + if (pHdr->type == 2) { code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1); if (code) goto _err; } +_exit: return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index db408df944..7667127fc9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1032,11 +1032,11 @@ void tBlockDataReset(SBlockData *pBlockData) { taosArrayClear(pBlockData->aIdx); } -void tBlockDataClear(SBlockData *pBlockData) { +void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aTSKEY); taosArrayDestroy(pBlockData->aIdx); - taosArrayDestroyEx(pBlockData->aColData, tColDataClear); + taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL); } int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 26dbf75426..674beb51ec 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -176,7 +176,6 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->ever = ever; vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode)); - *ppWriter = pWriter; return code; @@ -189,15 +188,15 @@ _err: int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t code = 0; - goto _exit; + // TODO - if (rollback) { - code = vnodeSnapRollback(pWriter); - if (code) goto _err; - } else { - code = vnodeSnapCommit(pWriter); - if (code) goto _err; - } + // if (rollback) { + // code = vnodeSnapRollback(pWriter); + // if (code) goto _err; + // } else { + // code = vnodeSnapCommit(pWriter); + // if (code) goto _err; + // } _exit: vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback); @@ -214,34 +213,40 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SVnode *pVnode = pWriter->pVnode; - // ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); + ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); + ASSERT(pHdr->index == pWriter->index + 1); + pWriter->index = pHdr->index; - // if (pHdr->type == 0) { - // // meta - // if (pWriter->pMetaSnapWriter == NULL) { - // code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); - // if (code) goto _err; - // } - - // code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - // if (code) goto _err; - // } else { - // // tsdb - // if (pWriter->pTsdbSnapWriter == NULL) { - // code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); - // if (code) goto _err; - // } - - // code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - // if (code) goto _err; - // } - -_exit: vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, pHdr->type, nData); + + if (pHdr->type == 0) { + // meta + + // if (pWriter->pMetaSnapWriter == NULL) { + // code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); + // if (code) goto _err; + // } + + // code = metaSnapWrite(pWriter->pMetaSnapWriter, pData , nData); + // if (code) goto _err; + } else { + // tsdb + + if (pWriter->pTsdbSnapWriter == NULL) { + code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); + if (code) goto _err; + } + + code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); + if (code) goto _err; + } + +_exit: return code; _err: - vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code)); + vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), + tstrerror(code), pHdr->index, pHdr->type, nData); return code; } \ No newline at end of file