From d67fdb5e46f75bbda6699338ecaed08c09dec74d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 17 Oct 2023 21:08:31 +0800 Subject: [PATCH] fix transfer crash --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 +- source/libs/stream/src/streamSnapshot.c | 22 ++++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b2835f2198..b7c440dfa5 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -139,7 +139,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received"); + sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); taosMkDir(tdir); SStreamSnapWriter* pSnapWriter = NULL; diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 88e47e127b..70c3031eb2 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -393,8 +393,10 @@ _NEXT: } item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); - qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); + qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 + ", file no.%d, total set:%d, current set idx: %d", + STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, + (int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); @@ -422,9 +424,16 @@ _NEXT: if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) { // finish - *ppData = NULL; - *size = 0; - return 0; + if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pDbSnapSet)) { + // skip to next snap set + pHandle->currIdx += 1; + pSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); + goto _NEXT; + } else { + *ppData = NULL; + *size = 0; + return 0; + } } item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); pSnapFile->fd = streamOpenFile(pSnapFile->path, item->name, TD_FILE_READ); @@ -541,10 +550,11 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId)); char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256); - sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", path, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, + sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", snapInfo.chkpId); if (!taosIsDir(path)) { code = taosMulMkDir(path); + qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); ASSERT(code == 0); }