From b084dd8ee6ebea5ba33a145e2b150520ddaa5bb1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 1 Aug 2022 19:12:37 +0800 Subject: [PATCH] feat(tq): transport snapshot --- include/libs/wal/wal.h | 1 + source/dnode/vnode/CMakeLists.txt | 2 + source/dnode/vnode/src/inc/meta.h | 3 +- source/dnode/vnode/src/inc/tq.h | 10 +- source/dnode/vnode/src/inc/vnodeInt.h | 64 ++++-- source/dnode/vnode/src/meta/metaOpen.c | 6 +- source/dnode/vnode/src/meta/metaStream.c | 16 ++ source/dnode/vnode/src/tq/tq.c | 1 + source/dnode/vnode/src/tq/tqCommit.c | 2 +- source/dnode/vnode/src/tq/tqMeta.c | 53 ++--- source/dnode/vnode/src/tq/tqOffset.c | 54 ++--- source/dnode/vnode/src/tq/tqOffsetSnapshot.c | 155 ++++++++++++++ source/dnode/vnode/src/tq/tqSnapshot.c | 209 +++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeSnapshot.c | 75 ++++++- source/libs/wal/src/walRef.c | 5 + source/libs/wal/src/walWrite.c | 2 +- 16 files changed, 583 insertions(+), 75 deletions(-) create mode 100644 source/dnode/vnode/src/meta/metaStream.c create mode 100644 source/dnode/vnode/src/tq/tqOffsetSnapshot.c create mode 100644 source/dnode/vnode/src/tq/tqSnapshot.c diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 980e01a65c..de31a970df 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -203,6 +203,7 @@ SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); int32_t walRefVer(SWalRef *, int64_t ver); +int32_t walPreRefVer(SWalRef *pRef, int64_t ver); void walUnrefVer(SWalRef *); // helper function for raft diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 3d8d46a0fb..cb9f3d9809 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -58,6 +58,8 @@ target_sources( "src/tq/tqPush.c" "src/tq/tqSink.c" "src/tq/tqCommit.c" + "src/tq/tqSnapshot.c" + "src/tq/tqOffsetSnapshot.c" ) target_include_directories( vnode diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 12c3f7f2e8..a72546fe86 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -80,7 +80,8 @@ struct SMeta { TTB* pSmaIdx; - TTB* pTaskIdx; + // stream + TTB* pStreamDb; SMetaIdx* pIdx; }; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 262300a3e7..44b9d1f69c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -133,6 +133,9 @@ typedef struct { static STqMgmt tqMgmt = {0}; +int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); +int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); + // tqRead int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); @@ -146,6 +149,7 @@ int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); +int32_t tqMetaRestoreHandle(STQ* pTq); typedef struct { int32_t size; @@ -156,11 +160,15 @@ void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetSnapshot(STqOffsetStore* pStore); +int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +// tqOffset +char* tqOffsetBuildFName(const char* path, int32_t ver); +int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); + static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; pOffsetVal->uid = uid; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cb24da2217..38f16b6231 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -49,22 +49,30 @@ extern "C" { #endif -typedef struct SVnodeInfo SVnodeInfo; -typedef struct SMeta SMeta; -typedef struct SSma SSma; -typedef struct STsdb STsdb; -typedef struct STQ STQ; -typedef struct SVState SVState; -typedef struct SVBufPool SVBufPool; -typedef struct SQWorker SQHandle; -typedef struct STsdbKeepCfg STsdbKeepCfg; -typedef struct SMetaSnapReader SMetaSnapReader; -typedef struct SMetaSnapWriter SMetaSnapWriter; -typedef struct STsdbSnapReader STsdbSnapReader; -typedef struct STsdbSnapWriter STsdbSnapWriter; -typedef struct SRsmaSnapReader SRsmaSnapReader; -typedef struct SRsmaSnapWriter SRsmaSnapWriter; -typedef struct SSnapDataHdr SSnapDataHdr; +typedef struct SVnodeInfo SVnodeInfo; +typedef struct SMeta SMeta; +typedef struct SSma SSma; +typedef struct STsdb STsdb; +typedef struct STQ STQ; +typedef struct SVState SVState; +typedef struct SVBufPool SVBufPool; +typedef struct SQWorker SQHandle; +typedef struct STsdbKeepCfg STsdbKeepCfg; +typedef struct SMetaSnapReader SMetaSnapReader; +typedef struct SMetaSnapWriter SMetaSnapWriter; +typedef struct STsdbSnapReader STsdbSnapReader; +typedef struct STsdbSnapWriter STsdbSnapWriter; +typedef struct STqSnapReader STqSnapReader; +typedef struct STqSnapWriter STqSnapWriter; +typedef struct STqOffsetReader STqOffsetReader; +typedef struct STqOffsetWriter STqOffsetWriter; +typedef struct SStreamTaskReader SStreamTaskReader; +typedef struct SStreamTaskWriter SStreamTaskWriter; +typedef struct SStreamStateReader SStreamStateReader; +typedef struct SStreamStateWriter SStreamStateWriter; +typedef struct SRsmaSnapReader SRsmaSnapReader; +typedef struct SRsmaSnapWriter SRsmaSnapWriter; +typedef struct SSnapDataHdr SSnapDataHdr; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" @@ -205,6 +213,26 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); +// STqSnapshotReader == +int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader); +int32_t tqSnapReaderClose(STqSnapReader** ppReader); +int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData); +// STqSnapshotWriter ====================================== +int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter); +int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback); +int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +// STqOffsetReader ======================================== +int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader); +int32_t tqOffsetReaderClose(STqOffsetReader** ppReader); +int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData); +// STqOffsetWriter ======================================== +int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter); +int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback); +int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData); +// SStreamTaskWriter ====================================== +// SStreamTaskReader ====================================== +// SStreamStateWriter ===================================== +// SStreamStateReader ===================================== // SRsmaSnapReader ======================================== int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader); int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader); @@ -331,6 +359,10 @@ enum { SNAP_DATA_RSMA1 = 3, SNAP_DATA_RSMA2 = 4, SNAP_DATA_QTASK = 5, + SNAP_DATA_TQ_HANDLE = 6, + SNAP_DATA_TQ_OFFSET = 7, + SNAP_DATA_STREAM_TASK = 8, + SNAP_DATA_STREAM_STATE = 9, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index a5150edb7b..c2d3e1e476 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -131,7 +131,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { goto _err; } - ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx); + ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb); if (ret < 0) { metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; @@ -150,7 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { _err: if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx); + if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); @@ -170,7 +170,7 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx); + if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); diff --git a/source/dnode/vnode/src/meta/metaStream.c b/source/dnode/vnode/src/meta/metaStream.c new file mode 100644 index 0000000000..b7b84da231 --- /dev/null +++ b/source/dnode/vnode/src/meta/metaStream.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a05013d996..3e321fa2a1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -548,6 +548,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); if (pRef == NULL) { ASSERT(0); + return -1; } int64_t ver = pRef->refVer; pHandle->pRef = pRef; diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c index 639da22b1c..dabd97a345 100644 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ b/source/dnode/vnode/src/tq/tqCommit.c @@ -15,4 +15,4 @@ #include "tq.h" -int tqCommit(STQ* pTq) { return tqOffsetSnapshot(pTq->pOffsetStore); } +int tqCommit(STQ* pTq) { return tqOffsetCommitFile(pTq->pOffsetStore); } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index e866112250..b8e021f795 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -15,7 +15,7 @@ #include "tdbInt.h" #include "tq.h" -static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { +int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1; if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1; @@ -29,7 +29,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { return pEncoder->pos; } -static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { +int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1; if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1; @@ -43,33 +43,20 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { return 0; } -int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { +int32_t tqMetaRestoreHandle(STQ* pTq) { + TBC* pCur = NULL; + if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { ASSERT(0); + return -1; } - if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { - ASSERT(0); - } - - TXN txn = {0}; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - ASSERT(0); - } - - TBC* pCur; - if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) { - ASSERT(0); - } - - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + SDecoder decoder; tdbTbcMoveToFirst(pCur); - SDecoder decoder; while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { STqHandle handle; @@ -79,6 +66,7 @@ int32_t tqMetaOpen(STQ* pTq) { handle.pRef = walOpenRef(pTq->pVnode->pWal); if (handle.pRef == NULL) { ASSERT(0); + return -1; } walRefVer(handle.pRef, handle.snapshotVer); @@ -109,9 +97,24 @@ int32_t tqMetaOpen(STQ* pTq) { } tdbTbcClose(pCur); - if (tdbTxnClose(&txn) < 0) { + return 0; +} + +int32_t tqMetaOpen(STQ* pTq) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { ASSERT(0); + return -1; } + + if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { + ASSERT(0); + return -1; + } + + if (tqMetaRestoreHandle(pTq) < 0) { + return -1; + } + return 0; } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index ec9674d637..5c1d5d65b4 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -22,29 +22,15 @@ struct STqOffsetStore { SHashObj* pHash; // SHashObj }; -static char* buildFileName(const char* path) { +char* tqOffsetBuildFName(const char* path, int32_t ver) { int32_t len = strlen(path); - char* fname = taosMemoryCalloc(1, len + 20); - snprintf(fname, len + 20, "%s/offset", path); + char* fname = taosMemoryCalloc(1, len + 40); + snprintf(fname, len + 40, "%s/offset-ver%d", path, ver); return fname; } -STqOffsetStore* tqOffsetOpen(STQ* pTq) { - STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore)); - if (pStore == NULL) { - return NULL; - } - pStore->pTq = pTq; - pTq->pOffsetStore = pStore; - - pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); - if (pStore->pHash == NULL) { - if (pStore->pHash) taosHashCleanup(pStore->pHash); - return NULL; - } - char* fname = buildFileName(pStore->pTq->path); +int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); - taosMemoryFree(fname); if (pFile != NULL) { STqOffsetHead head = {0}; int64_t code; @@ -79,11 +65,32 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { taosCloseFile(&pFile); } + return 0; +} + +STqOffsetStore* tqOffsetOpen(STQ* pTq) { + STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore)); + if (pStore == NULL) { + return NULL; + } + pStore->pTq = pTq; + pTq->pOffsetStore = pStore; + + pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); + if (pStore->pHash == NULL) { + taosMemoryFree(pStore); + return NULL; + } + char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); + if (tqOffsetRestoreFromFile(pStore, fname) < 0) { + ASSERT(0); + } + taosMemoryFree(fname); return pStore; } void tqOffsetClose(STqOffsetStore* pStore) { - tqOffsetSnapshot(pStore); + tqOffsetCommitFile(pStore); taosHashCleanup(pStore->pHash); taosMemoryFree(pStore); } @@ -93,8 +100,6 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { } int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { - /*ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);*/ - /*ASSERT(pOffset->val.version >= 0);*/ return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); } @@ -102,10 +107,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey)); } -int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { - // open file - // TODO file name should be with a version - char* fname = buildFileName(pStore->pTq->path); +int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { + // TODO file name should be with a newer version + char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); taosMemoryFree(fname); if (pFile == NULL) { diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c new file mode 100644 index 0000000000..cacb82b702 --- /dev/null +++ b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" +#include "tdbInt.h" +#include "tq.h" + +// STqOffsetReader ======================================== +struct STqOffsetReader { + STQ* pTq; + int64_t sver; + int64_t ever; + int8_t readEnd; +}; + +int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader) { + STqOffsetReader* pReader = NULL; + + pReader = taosMemoryCalloc(1, sizeof(STqOffsetReader)); + if (pReader == NULL) { + *ppReader = NULL; + return -1; + } + pReader->pTq = pTq; + pReader->sver = sver; + pReader->ever = ever; + + tqInfo("vgId:%d vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode)); + + *ppReader = pReader; + return 0; +} + +int32_t tqOffsetReaderClose(STqOffsetReader** ppReader) { + taosMemoryFree(*ppReader); + *ppReader = NULL; + return 0; +} + +int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) { + if (pReader->readEnd != 0) return 0; + + char* fname = tqOffsetBuildFName(pReader->pTq->path, 0); + TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); + taosMemoryFree(fname); + if (pFile != NULL) { + return 0; + } + + int64_t sz = 0; + if (taosStatFile(fname, &sz, NULL) < 0) { + ASSERT(0); + } + + SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr)); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr)); + int64_t contLen = taosReadFile(pFile, abuf, sz); + if (contLen != sz) { + ASSERT(0); + return -1; + } + buf->size = sz; + buf->type = SNAP_DATA_TQ_OFFSET; + *ppData = (uint8_t*)buf; + + pReader->readEnd = 1; + return 0; +} + +// STqOffseWriter ======================================== +struct STqOffsetWriter { + STQ* pTq; + int64_t sver; + int64_t ever; + int32_t tmpFileVer; + char* fname; +}; + +int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter) { + int32_t code = 0; + STqOffsetWriter* pWriter; + + pWriter = (STqOffsetWriter*)taosMemoryCalloc(1, sizeof(STqOffsetWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pTq = pTq; + pWriter->sver = sver; + pWriter->ever = ever; + + *ppWriter = pWriter; + return code; + +_err: + tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; +} + +int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) { + STqOffsetWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; + char* fname = tqOffsetBuildFName(pTq->path, 0); + + if (rollback) { + taosRemoveFile(pWriter->fname); + } else { + taosRenameFile(pWriter->fname, fname); + if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) { + ASSERT(0); + } + } + taosMemoryFree(fname); + taosMemoryFree(pWriter->fname); + taosMemoryFree(pWriter); + *ppWriter = NULL; + return 0; +} + +int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData) { + STQ* pTq = pWriter->pTq; + pWriter->tmpFileVer = 1; + pWriter->fname = tqOffsetBuildFName(pTq->path, pWriter->tmpFileVer); + TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE); + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + int64_t size = pHdr->size; + ASSERT(size == nData - sizeof(SSnapDataHdr)); + if (pFile) { + int64_t contLen = taosWriteFile(pFile, pHdr->data, size); + if (contLen != size) { + ASSERT(0); + } + } else { + ASSERT(0); + return -1; + } + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c new file mode 100644 index 0000000000..21172134ba --- /dev/null +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" +#include "tdbInt.h" +#include "tq.h" + +// STqSnapReader ======================================== +struct STqSnapReader { + STQ* pTq; + int64_t sver; + int64_t ever; + TBC* pCur; +}; + +int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) { + int32_t code = 0; + STqSnapReader* pReader = NULL; + + // alloc + pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pTq = pTq; + pReader->sver = sver; + pReader->ever = ever; + + // impl + code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); + if (code) { + taosMemoryFree(pReader); + goto _err; + } + + code = tdbTbcMoveToFirst(pReader->pCur); + if (code) { + taosMemoryFree(pReader); + goto _err; + } + + tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + + *ppReader = pReader; + return code; + +_err: + tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppReader = NULL; + return code; +} + +int32_t tqSnapReaderClose(STqSnapReader** ppReader) { + int32_t code = 0; + + tdbTbcClose((*ppReader)->pCur); + taosMemoryFree(*ppReader); + *ppReader = NULL; + + return code; +} + +int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + const void* pKey = NULL; + const void* pVal = NULL; + int32_t kLen = 0; + int32_t vLen = 0; + SDecoder decoder; + STqHandle handle; + + *ppData = NULL; + for (;;) { + if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { + goto _exit; + } + + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, &handle); + tDecoderClear(&decoder); + + if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { + tdbTbcMoveToNext(pReader->pCur); + break; + } else { + tdbTbcMoveToNext(pReader->pCur); + } + } + + ASSERT(pVal && vLen); + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = SNAP_DATA_TQ_HANDLE; + pHdr->size = vLen; + memcpy(pHdr->data, pVal, vLen); + + tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), + handle.snapshotVer, handle.subKey, vLen); + +_exit: + return code; + +_err: + tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + return code; +} + +// STqSnapWriter ======================================== +struct STqSnapWriter { + STQ* pTq; + int64_t sver; + int64_t ever; + TXN txn; +}; + +int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { + int32_t code = 0; + STqSnapWriter* pWriter; + + // alloc + pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pTq = pTq; + pWriter->sver = sver; + pWriter->ever = ever; + + if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + ASSERT(0); + } + + *ppWriter = pWriter; + return code; + +_err: + tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; +} + +int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { + int32_t code = 0; + STqSnapWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; + + if (rollback) { + ASSERT(0); + } else { + code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + if (code) goto _err; + } + + taosMemoryFree(pWriter); + *ppWriter = NULL; + + // restore from metastore + if (tqMetaRestoreHandle(pTq) < 0) { + goto _err; + } + + return code; + +_err: + tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + return code; +} + +int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STQ* pTq = pWriter->pTq; + SDecoder decoder = {0}; + SDecoder* pDecoder = &decoder; + STqHandle handle; + + tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tDecodeSTqHandle(pDecoder, &handle); + if (code) goto _err; + code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + if (code < 0) goto _err; + tDecoderClear(pDecoder); + + return code; + +_err: + tDecoderClear(pDecoder); + tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 15cc6a7197..2bdd7fb513 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -27,6 +27,16 @@ struct SVSnapReader { // tsdb int8_t tsdbDone; STsdbSnapReader *pTsdbReader; + // tq + int8_t tqHandleDone; + STqSnapReader *pTqSnapReader; + int8_t tqOffsetDone; + STqOffsetReader *pTqOffsetReader; + // stream + int8_t streamTaskDone; + SStreamTaskReader *pStreamTaskReader; + int8_t streamStateDone; + SStreamStateReader *pStreamStateReader; // rsma int8_t rsmaDone; SRsmaSnapReader *pRsmaReader; @@ -104,7 +114,8 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (!pReader->tsdbDone) { // open if not if (pReader->pTsdbReader == NULL) { - code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, &pReader->pTsdbReader); + code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, + &pReader->pTsdbReader); if (code) goto _err; } @@ -122,6 +133,52 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } } + // TQ ================ + if (!pReader->tqHandleDone) { + if (pReader->pTqSnapReader == NULL) { + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader); + if (code < 0) goto _err; + } + + code = tqSnapRead(pReader->pTqSnapReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->tqHandleDone = 1; + code = tqSnapReaderClose(&pReader->pTqSnapReader); + if (code) goto _err; + } + } + } + if (!pReader->tqOffsetDone) { + if (pReader->pTqOffsetReader == NULL) { + code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader); + if (code < 0) goto _err; + } + + code = tqOffsetSnapRead(pReader->pTqOffsetReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->tqHandleDone = 1; + code = tqOffsetReaderClose(&pReader->pTqOffsetReader); + if (code) goto _err; + } + } + } + + // STREAM ============ + if (!pReader->streamTaskDone) { + } + if (!pReader->streamStateDone) { + } + // RSMA ============== if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { // open if not @@ -177,6 +234,12 @@ struct SVSnapWriter { SMetaSnapWriter *pMetaSnapWriter; // tsdb STsdbSnapWriter *pTsdbSnapWriter; + // tq + STqSnapWriter *pTqSnapWriter; + STqOffsetWriter *pTqOffsetWriter; + // stream + SStreamTaskWriter *pStreamTaskWriter; + SStreamStateWriter *pStreamStateWriter; // rsma SRsmaSnapWriter *pRsmaSnapWriter; }; @@ -301,6 +364,14 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); if (code) goto _err; } break; + case SNAP_DATA_TQ_HANDLE: { + } break; + case SNAP_DATA_TQ_OFFSET: { + } break; + case SNAP_DATA_STREAM_TASK: { + } break; + case SNAP_DATA_STREAM_STATE: { + } break; case SNAP_DATA_RSMA1: case SNAP_DATA_RSMA2: { // rsma1/rsma2 @@ -332,4 +403,4 @@ _err: vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), tstrerror(code), pHdr->index, pHdr->type, nData); return code; -} \ No newline at end of file +} diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index bd0f6fb1a8..2b29012040 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -62,6 +62,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { return 0; } +int32_t walPreRefVer(SWalRef *pRef, int64_t ver) { + pRef->refVer = ver; + return 0; +} + void walUnrefVer(SWalRef *pRef) { pRef->refId = -1; pRef->refFile = -1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index a29c7a1309..eaf43ba7d7 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pIter = taosHashIterate(pWal->pRefHash, pIter); if (pIter == NULL) break; SWalRef *pRef = (SWalRef *)pIter; - if (pRef->refVer != -1) { + if (pRef->refVer != -1 && pRef->refVer <= ver) { taosHashCancelIterate(pWal->pRefHash, pIter); return -1; }