From 40eb50676dc7fa2a7d96e26293359dff93e00958 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 17 Jul 2023 11:04:19 +0000 Subject: [PATCH] add checkpoint --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 4 ++++ source/dnode/vnode/src/vnd/vnodeSnapshot.c | 8 ++++++-- source/libs/stream/src/streamSnapshot.c | 6 ++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 666ee7c5a4..f7bae25043 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -49,6 +49,10 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints"); if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) { pReader->complete = 1; + } else { + code = -1; + taosMemoryFree(pReader); + goto _err; } pReader->pReaderImpl = pSnapReader; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 146f40fc7e..2ef32a65b4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -90,7 +90,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) // CONFIG ============== // FIXME: if commit multiple times and the config changed? if (!pReader->cfgDone) { - char fName[TSDB_FILENAME_LEN]; + char fName[TSDB_FILENAME_LEN]; int32_t offset = 0; vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN); @@ -242,7 +242,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (pReader->pStreamStateReader == NULL) { code = streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader); - if (code) goto _err; + if (code) { + pReader->streamStateDone = 1; + pReader->pStreamStateReader = NULL; + goto _err; + } } code = streamStateSnapRead(pReader->pStreamStateReader, ppData); if (code) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 763e6d6b70..b1b05ff0ca 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -98,8 +98,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { } SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); + pHandle->pBackendFile = pFile; pHandle->checkpointId = 0; pHandle->seraial = 0; + pFile->path = taosStrdup(path); pFile->pSst = taosArrayInit(16, sizeof(void*)); @@ -133,7 +135,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { if (pFile->pCurrent == NULL) { code = -1; - return code; + goto _err; } SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); @@ -197,7 +199,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { taosMemoryFree(pFile->pMainfest); taosMemoryFree(pFile->pOptions); taosMemoryFree(pFile->path); - for (int i = 0; pFile->pSst != NULL && i < taosArrayGetSize(pFile->pSst); i++) { + for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { char* sst = taosArrayGetP(pFile->pSst, i); taosMemoryFree(sst); }