Merge pull request #23596 from taosdata/fix/streamTransferError

fix stream transfer err
This commit is contained in:
Haojun Liao 2023-11-09 10:51:22 +08:00 committed by GitHub
commit ef646873ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 6 additions and 4 deletions

View File

@ -19,7 +19,6 @@
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInt.h" #include "streamInt.h"
#include "tcommon.h" #include "tcommon.h"
#include "streamInt.h"
enum SBackendFileType { enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1, ROCKSDB_OPTIONS_TYPE = 1,
@ -52,6 +51,7 @@ struct SStreamSnapHandle {
int8_t filetype; int8_t filetype;
SArray* pFileList; SArray* pFileList;
int32_t currFileIdx; int32_t currFileIdx;
int8_t delFlag; // 0 : not del, 1: del
}; };
struct SStreamSnapBlockHdr { struct SStreamSnapBlockHdr {
int8_t type; int8_t type;
@ -148,6 +148,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosMemoryFree(tdir); taosMemoryFree(tdir);
return code; return code;
} }
pHandle->delFlag = 1;
chkpId = 0; chkpId = 0;
} }
@ -274,7 +275,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->checkpointId == 0) { if (handle->checkpointId == 0) {
// del tmp dir // del tmp dir
if (pFile && taosIsDir(pFile->path)) { if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path); if (handle->delFlag) taosRemoveDir(pFile->path);
} }
} else { } else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId); streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
@ -344,10 +345,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
if(buf == NULL){ if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) { if (nread == -1) {
taosMemoryFree(buf); taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
@ -423,6 +424,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->pFileList = list; pHandle->pFileList = list;
pHandle->currFileIdx = 0; pHandle->currFileIdx = 0;
pHandle->offset = 0; pHandle->offset = 0;
pHandle->delFlag = 0;
*ppWriter = pWriter; *ppWriter = pWriter;
return 0; return 0;