From fc2d6744e290121826b9827bc1e4e3916a654eba Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jul 2023 10:52:34 +0000 Subject: [PATCH] fix stream state transfer --- source/libs/stream/src/streamSnapshot.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 50187cc01e..086da597f6 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -198,6 +198,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk tdir = NULL; goto _err; } + qDebug("open file %s, current offset %" PRId64 "", file, (int64_t)0); pHandle->seraial = 0; pHandle->offset = 0; return 0; @@ -261,24 +262,37 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + qDebug("start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, tstrerror(code)); + qDebug("failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); return code; // handle later return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize + qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); + qDebug("close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx, pHandle->currFileIdx + 1); pHandle->offset = 0; pHandle->currFileIdx += 1; } } else { + qDebug("no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx); + taosCloseFile(&pHandle->fd); + pHandle->offset = 0; + pHandle->currFileIdx += 1; + if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { // finish *ppData = NULL; @@ -292,6 +306,9 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); pHandle->offset += nread; + + qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;