From b3c056e6cbd18152fe63d5db9217943795cb1a13 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 8 Nov 2023 15:12:47 +0800 Subject: [PATCH] fix stream transfer err --- source/libs/stream/src/streamSnapshot.c | 31 ++++++++++++++----------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index d988d242c8..655814bb19 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -17,8 +17,8 @@ #include "query.h" #include "rocksdb/c.h" #include "streamBackendRocksdb.h" -#include "tcommon.h" #include "streamInt.h" +#include "tcommon.h" enum SBackendFileType { ROCKSDB_OPTIONS_TYPE = 1, @@ -51,6 +51,7 @@ struct SStreamSnapHandle { int8_t filetype; SArray* pFileList; int32_t currFileIdx; + int8_t delFlag; }; struct SStreamSnapBlockHdr { int8_t type; @@ -126,7 +127,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); streamBackendAddInUseChkp(pMeta, chkpId); } else { - stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); + stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, + tdir); } } @@ -146,6 +148,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk taosMemoryFree(tdir); return code; } + pHandle->delFlag = 1; chkpId = 0; } @@ -271,8 +274,8 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { if (handle->checkpointId == 0) { // del tmp dir - if (taosIsDir(pFile->path)) { - taosRemoveDir(pFile->path); + if (pFile && taosIsDir(pFile->path)) { + if (handle->delFlag) taosRemoveDir(pFile->path); } } else { streamBackendDelInUseChkp(handle->handle, handle->checkpointId); @@ -335,24 +338,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si } else { pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } } stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { taosMemoryFree(buf); code = TAOS_SYSTEM_ERROR(terrno); stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, - item->type, tstrerror(code)); + item->type, tstrerror(code)); return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); @@ -361,7 +364,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si } } else { stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER, - pHandle->currFileIdx); + pHandle->currFileIdx); taosCloseFile(&pHandle->fd); pHandle->offset = 0; pHandle->currFileIdx += 1; @@ -379,7 +382,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si pHandle->offset += nread; stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", - STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -434,8 +437,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, - tstrerror(code)); + stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, + pHdr->name, tstrerror(code)); } } @@ -461,8 +464,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, - tstrerror(code)); + stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, + pHdr->name, tstrerror(code)); } taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);