From 39fb5f43341d83b588801f8ec43868d29800d789 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Sep 2023 18:09:47 +0800 Subject: [PATCH] fix:add vnode snaphot for tmq --- source/dnode/vnode/CMakeLists.txt | 3 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 11 + .../dnode/vnode/src/tq/tqCheckInfoSnapshot.c | 211 ++++++++++++++++++ .../tq/{tqSnapshot.c => tqHandleSnapshot.c} | 12 +- source/dnode/vnode/src/tq/tqMeta.c | 56 ++--- source/dnode/vnode/src/tq/tqOffsetSnapshot.c | 1 + source/dnode/vnode/src/vnd/vnodeSnapshot.c | 76 +++++++ 8 files changed, 335 insertions(+), 37 deletions(-) create mode 100644 source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c rename source/dnode/vnode/src/tq/{tqSnapshot.c => tqHandleSnapshot.c} (96%) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index b66d811284..84d54f3350 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -65,7 +65,8 @@ set( "src/tq/tqSink.c" "src/tq/tqCommit.c" "src/tq/tqStreamTask.c" - "src/tq/tqSnapshot.c" + "src/tq/tqHandleSnapshot.c" + "src/tq/tqCheckInfoSnapshot.c" "src/tq/tqOffsetSnapshot.c" "src/tq/tqStreamStateSnap.c" "src/tq/tqStreamTaskSnap.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 72310f6b19..ee96e602d8 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -134,7 +134,7 @@ int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); -int32_t tqMetaRestoreHandle(STQ* pTq); +//int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h 
index 536273c044..f4e3b0bd58 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -69,6 +69,8 @@ typedef struct STqSnapReader STqSnapReader; typedef struct STqSnapWriter STqSnapWriter; typedef struct STqOffsetReader STqOffsetReader; typedef struct STqOffsetWriter STqOffsetWriter; +typedef struct STqCheckInfoReader STqCheckInfoReader; +typedef struct STqCheckInfoWriter STqCheckInfoWriter; typedef struct SStreamTaskReader SStreamTaskReader; typedef struct SStreamTaskWriter SStreamTaskWriter; typedef struct SStreamStateReader SStreamStateReader; @@ -308,6 +310,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData); int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter); int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback); int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +// STqCheckInfoshotReader == +int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader); +int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader); +int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData); +// STqCheckInfoshotWriter ====================================== +int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter); +int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback); +int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData); // STqOffsetReader ======================================== int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader); int32_t tqOffsetReaderClose(STqOffsetReader** ppReader); @@ -503,6 +513,7 @@ enum { SNAP_DATA_STREAM_TASK_CHECKPOINT = 10, SNAP_DATA_STREAM_STATE = 11, SNAP_DATA_STREAM_STATE_BACKEND = 12, + SNAP_DATA_TQ_CHECKINFO = 13, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c 
b/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c
new file mode 100644
index 0000000000..18f8f0fecc
--- /dev/null
+++ b/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c
@@ -0,0 +1,288 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "meta.h"
+#include "tdbInt.h"
+#include "tq.h"
+
+// STqCheckInfoReader ========================================
+// Walks pTq->pCheckStore with a cursor and emits one snapshot record per entry.
+struct STqCheckInfoReader {
+  STQ*    pTq;
+  int64_t sver;
+  int64_t ever;
+  TBC*    pCur;  // cursor over pTq->pCheckStore
+};
+
+/**
+ * Open a check-info snapshot reader positioned on the first entry of
+ * pTq->pCheckStore.
+ *
+ * @param pTq       tq instance whose check-info store is read
+ * @param sver      start version, only recorded in the reader
+ * @param ever      end version, only recorded in the reader
+ * @param ppReader  receives the new reader; set to NULL on failure
+ * @return 0 on success, otherwise the error code of the failing step
+ */
+int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader) {
+  int32_t             code = 0;
+  STqCheckInfoReader* pReader = NULL;
+
+  // alloc
+  pReader = (STqCheckInfoReader*)taosMemoryCalloc(1, sizeof(STqCheckInfoReader));
+  if (pReader == NULL) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+  pReader->pTq = pTq;
+  pReader->sver = sver;
+  pReader->ever = ever;
+
+  // impl
+  code = tdbTbcOpen(pTq->pCheckStore, &pReader->pCur, NULL);
+  if (code) {
+    taosMemoryFree(pReader);
+    goto _err;
+  }
+
+  code = tdbTbcMoveToFirst(pReader->pCur);
+  if (code) {
+    // the cursor is open at this point, so close it before freeing the reader
+    tdbTbcClose(pReader->pCur);
+    taosMemoryFree(pReader);
+    goto _err;
+  }
+
+  tqInfo("vgId:%d, vnode checkinfo tq reader opened", TD_VID(pTq->pVnode));
+
+  *ppReader = pReader;
+  return code;
+
+_err:
+  tqError("vgId:%d, vnode checkinfo tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
+  *ppReader = NULL;
+  return code;
+}
+
+/**
+ * Close the reader's cursor, free the reader and reset *ppReader to NULL.
+ *
+ * @return always 0
+ */
+int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader) {
+  int32_t code = 0;
+
+  tdbTbcClose((*ppReader)->pCur);
+  taosMemoryFree(*ppReader);
+  *ppReader = NULL;
+
+  return code;
+}
+
+/**
+ * Read the next check-info entry and wrap it in a snapshot record.
+ *
+ * On success *ppData points to a newly allocated SSnapDataHdr followed by the
+ * encoded entry. *ppData stays NULL when the cursor has no further entry.
+ *
+ * @param pReader  reader obtained from tqCheckInfoReaderOpen
+ * @param ppData   receives the record; NULL at end of data or on failure
+ * @return 0 on success or end of data, otherwise an error code
+ */
+int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) {
+  int32_t      code = 0;
+  void*        pKey = NULL;
+  void*        pVal = NULL;
+  int32_t      kLen = 0;
+  int32_t      vLen = 0;
+  SDecoder     decoder;
+  STqCheckInfo info = {0};
+
+  *ppData = NULL;
+  if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
+    goto _exit;
+  }
+
+  // decode only to validate the entry and to get the topic name for the log
+  // NOTE(review): anything tDecodeSTqCheckInfo allocates inside info is not released here - confirm
+  tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
+  if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
+    // NOTE(review): a decode failure is reported as out-of-memory - confirm the intended code
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+
+  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
+  if (*ppData == NULL) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+
+  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
+  pHdr->type = SNAP_DATA_TQ_CHECKINFO;
+  pHdr->size = vLen;
+  memcpy(pHdr->data, pVal, vLen);
+
+  tqInfo("vgId:%d, vnode check info tq read data, topic: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
+         info.topic, vLen);
+
+  // pVal is the memcpy source above, so the tdb buffers are released only now
+  tDecoderClear(&decoder);
+  tdbFree(pKey);
+  tdbFree(pVal);
+
+_exit:
+  return code;
+
+_err:
+  tDecoderClear(&decoder);
+  tdbFree(pKey);
+  tdbFree(pVal);
+  tqError("vgId:%d, vnode check info tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
+  return code;
+}
+
+// STqCheckInfoWriter ========================================
+// Stores check-info snapshot records into the local tq meta store.
+struct STqCheckInfoWriter {
+  STQ*    pTq;
+  int64_t sver;
+  int64_t ever;
+  TXN*    txn;  // transaction begun on pTq->pMetaDB by tqCheckInfoWriterOpen
+};
+
+/**
+ * Open a check-info snapshot writer and begin a transaction on pTq->pMetaDB.
+ *
+ * @param pTq       tq instance that receives the check info
+ * @param sver      start version, only recorded in the writer
+ * @param ever      end version, only recorded in the writer
+ * @param ppWriter  receives the new writer; set to NULL on failure
+ * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY or -1 on failure
+ */
+int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter) {
+  int32_t             code = 0;
+  STqCheckInfoWriter* pWriter;
+
+  // alloc
+  pWriter = (STqCheckInfoWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
+  if (pWriter == NULL) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+  pWriter->pTq = pTq;
+  pWriter->sver = sver;
+  pWriter->ever = ever;
+
+  if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
+    code = -1;
+    taosMemoryFree(pWriter);
+    goto _err;
+  }
+
+  *ppWriter = pWriter;
+  return code;
+
+_err:
+  tqError("vgId:%d, tq check info writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
+  *ppWriter = NULL;
+  return code;
+}
+
+/**
+ * Finish the writer's transaction, free the writer and call
+ * tqMetaRestoreCheckInfo to restore the check info from the meta store.
+ *
+ * The writer is freed and *ppWriter reset to NULL on every path, including
+ * failures, so the caller must not reuse it.
+ *
+ * @param ppWriter  writer obtained from tqCheckInfoWriterOpen
+ * @param rollback  non-zero aborts the transaction instead of committing it
+ * @return 0 on success, otherwise the commit error code or -1 if the restore failed
+ */
+int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) {
+  int32_t             code = 0;
+  STqCheckInfoWriter* pWriter = *ppWriter;
+  STQ*                pTq = pWriter->pTq;
+
+  if (rollback) {
+    tdbAbort(pTq->pMetaDB, pWriter->txn);
+  } else {
+    code = tdbCommit(pTq->pMetaDB, pWriter->txn);
+    if (code == 0) {
+      code = tdbPostCommit(pTq->pMetaDB, pWriter->txn);
+    }
+  }
+
+  // release the writer before any error exit so a failed commit cannot leak it
+  taosMemoryFree(pWriter);
+  *ppWriter = NULL;
+  if (code) goto _err;
+
+  // restore from metastore
+  if (tqMetaRestoreCheckInfo(pTq) < 0) {
+    code = -1;  // report the failure instead of returning 0
+    goto _err;
+  }
+
+  return code;
+
+_err:
+  // pTq stays valid after the writer is freed; no late-initialized vgId local is needed
+  tqError("vgId:%d, tq check info writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
+  return code;
+}
+
+/**
+ * Store one SNAP_DATA_TQ_CHECKINFO record into the check-info meta store.
+ *
+ * @param pWriter  writer obtained from tqCheckInfoWriterOpen
+ * @param pData    record: an SSnapDataHdr followed by the encoded STqCheckInfo
+ * @param nData    total size of pData in bytes, header included
+ * @return 0 on success, otherwise a non-zero error code
+ */
+int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData) {
+  int32_t      code = 0;
+  STQ*         pTq = pWriter->pTq;
+  STqCheckInfo info = {0};
+  SDecoder     decoder;
+  SDecoder*    pDecoder = &decoder;
+
+  // a record shorter than its header would make the payload length wrap around
+  if (nData < sizeof(SSnapDataHdr)) {
+    code = -1;
+    tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
+    return code;
+  }
+
+  uint8_t* pBody = pData + sizeof(SSnapDataHdr);
+  uint32_t nBody = (uint32_t)(nData - sizeof(SSnapDataHdr));
+
+  // the decoder has to be bound to the payload before anything is decoded from it
+  // NOTE(review): anything tDecodeSTqCheckInfo allocates inside info is not released here - confirm
+  tDecoderInit(pDecoder, pBody, nBody);
+  code = tDecodeSTqCheckInfo(pDecoder, &info);
+  if (code) goto _err;
+  code = tqMetaSaveCheckInfo(pTq, info.topic, pBody, nBody);
+  if (code < 0) goto _err;
+  tDecoderClear(pDecoder);
+
+  return code;
+
+_err:
+  tDecoderClear(pDecoder);
+  tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
+  return code;
+}
diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqHandleSnapshot.c
similarity index 96%
rename from source/dnode/vnode/src/tq/tqSnapshot.c
rename to source/dnode/vnode/src/tq/tqHandleSnapshot.c
index 5c0649c109..17f73f6bf2 100644
--- a/source/dnode/vnode/src/tq/tqSnapshot.c
+++ b/source/dnode/vnode/src/tq/tqHandleSnapshot.c
@@ -173,20 +173,18 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
     if (code) goto _err;
   }
 
-  int vgId =
TD_VID(pWriter->pTq->pVnode); - taosMemoryFree(pWriter); *ppWriter = NULL; - // restore from metastore - if (tqMetaRestoreHandle(pTq) < 0) { - goto _err; - } +// // restore from metastore +// if (tqMetaRestoreHandle(pTq) < 0) { +// goto _err; +// } return code; _err: - tqError("vgId:%d, tq snapshot writer close failed since %s", vgId, tstrerror(code)); + tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 154ac1e8c1..bea63fccb9 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -388,34 +388,34 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } -int32_t tqMetaRestoreHandle(STQ* pTq) { - int code = 0; - TBC* pCur = NULL; - if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { - return -1; - } - - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; - - tdbTbcMoveToFirst(pCur); - - while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - STqHandle handle = {0}; - code = restoreHandle(pTq, pVal, vLen, &handle); - if (code < 0) { - tqDestroyTqHandle(&handle); - break; - } - } - - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return code; -} +//int32_t tqMetaRestoreHandle(STQ* pTq) { +// int code = 0; +// TBC* pCur = NULL; +// if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { +// return -1; +// } +// +// void* pKey = NULL; +// int kLen = 0; +// void* pVal = NULL; +// int vLen = 0; +// +// tdbTbcMoveToFirst(pCur); +// +// while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { +// STqHandle handle = {0}; +// code = restoreHandle(pTq, pVal, vLen, &handle); +// if (code < 0) { +// tqDestroyTqHandle(&handle); +// break; +// } +// } +// +// tdbFree(pKey); +// tdbFree(pVal); +// tdbTbcClose(pCur); +// return code; +//} 
int32_t tqMetaGetHandle(STQ* pTq, const char* key) { void* pVal = NULL; diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c index 6a66da30c6..85d4dc6c0f 100644 --- a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c +++ b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c @@ -159,6 +159,7 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa taosCloseFile(&pFile); return -1; } + taosCloseFile(&pFile); } else { return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index f19068ea88..3abcf79839 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -34,6 +34,8 @@ struct SVSnapReader { STqSnapReader *pTqSnapReader; int8_t tqOffsetDone; STqOffsetReader *pTqOffsetReader; + int8_t tqCheckInfoDone; + STqCheckInfoReader *pTqCheckInfoReader; // stream int8_t streamTaskDone; SStreamTaskReader *pStreamTaskReader; @@ -81,6 +83,18 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { metaSnapReaderClose(&pReader->pMetaReader); } + if (pReader->pTqSnapReader) { + tqSnapReaderClose(&pReader->pTqSnapReader); + } + + if (pReader->pTqOffsetReader) { + tqOffsetReaderClose(&pReader->pTqOffsetReader); + } + + if (pReader->pTqCheckInfoReader) { + tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader); + } + taosMemoryFree(pReader); } @@ -181,6 +195,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } // TQ ================ + vInfo("vgId:%d tq transform start", vgId); if (!pReader->tqHandleDone) { if (pReader->pTqSnapReader == NULL) { code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader); @@ -200,6 +215,25 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } } } + if (!pReader->tqCheckInfoDone) { + if (pReader->pTqCheckInfoReader == NULL) { + code = tqCheckInfoReaderOpen(pReader->pVnode->pTq, 
pReader->sver, pReader->ever, &pReader->pTqCheckInfoReader); + if (code < 0) goto _err; + } + + code = tqCheckInfoRead(pReader->pTqCheckInfoReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->tqCheckInfoDone = 1; + code = tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader); + if (code) goto _err; + } + } + } if (!pReader->tqOffsetDone) { if (pReader->pTqOffsetReader == NULL) { code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader); @@ -334,6 +368,7 @@ struct SVSnapWriter { // tq STqSnapWriter *pTqSnapWriter; STqOffsetWriter *pTqOffsetWriter; + STqCheckInfoWriter *pTqCheckInfoWriter; // stream SStreamTaskWriter *pStreamTaskWriter; SStreamStateWriter *pStreamStateWriter; @@ -411,6 +446,21 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } + if (pWriter->pTqSnapWriter) { + code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback); + if (code) goto _exit; + } + + if (pWriter->pTqCheckInfoWriter) { + code = tqCheckInfoWriterClose(&pWriter->pTqCheckInfoWriter, rollback); + if (code) goto _exit; + } + + if (pWriter->pTqOffsetWriter) { + code = tqOffsetWriterClose(&pWriter->pTqOffsetWriter, rollback); + if (code) goto _exit; + } + if (pWriter->pStreamTaskWriter) { code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); if (code) goto _exit; @@ -519,8 +569,34 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { if (code) goto _err; } break; case SNAP_DATA_TQ_HANDLE: { + // tq handle + if (pWriter->pTqSnapWriter == NULL) { + code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapWriter); + if (code) goto _err; + } + + code = tqSnapWrite(pWriter->pTqSnapWriter, pData, nData); + if (code) goto _err; + } break; + case SNAP_DATA_TQ_CHECKINFO: { + // tq checkinfo + if (pWriter->pTqCheckInfoWriter == NULL) { + code = 
tqCheckInfoWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqCheckInfoWriter); + if (code) goto _err; + } + + code = tqCheckInfoWrite(pWriter->pTqCheckInfoWriter, pData, nData); + if (code) goto _err; } break; case SNAP_DATA_TQ_OFFSET: { + // tq offset + if (pWriter->pTqOffsetWriter == NULL) { + code = tqOffsetWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqOffsetWriter); + if (code) goto _err; + } + + code = tqOffsetSnapWrite(pWriter->pTqOffsetWriter, pData, nData); + if (code) goto _err; } break; case SNAP_DATA_STREAM_TASK: case SNAP_DATA_STREAM_TASK_CHECKPOINT: {