From 6a27e81964e22e6ef44f780f74282dd98be87112 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Jul 2023 02:05:14 +0000 Subject: [PATCH] fix stream state transfer --- source/libs/stream/src/streamSnapshot.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index a036bcd877..44aa69a070 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -200,7 +200,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk tdir = NULL; goto _err; } - qDebug("open file %s, current offset %" PRId64 "", file, (int64_t)0); + qDebug("stream-state open file %s, current offset %" PRId64 "", file, (int64_t)0); pHandle->seraial = 0; pHandle->offset = 0; return 0; @@ -264,33 +264,34 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - qDebug("start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + qDebug("stream-state start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, + qError("stream-state snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, tstrerror(code)); - qDebug("failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("stream-state failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); return code; // handle later return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize - qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + qDebug("stream-state read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); - qDebug("close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx, pHandle->currFileIdx + 1); + qDebug("stream-state close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx, + pHandle->currFileIdx + 1); pHandle->offset = 0; pHandle->currFileIdx += 1; } } else { - qDebug("no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx); + qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx); taosCloseFile(&pHandle->fd); pHandle->offset = 0; pHandle->currFileIdx += 1;