fix stream transfer err

This commit is contained in:
yihaoDeng 2023-11-08 15:12:47 +08:00
parent 634264087e
commit b3c056e6cb
1 changed files with 17 additions and 14 deletions

View File

@ -17,8 +17,8 @@
#include "query.h" #include "query.h"
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "streamInt.h" #include "streamInt.h"
#include "tcommon.h"
enum SBackendFileType { enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1, ROCKSDB_OPTIONS_TYPE = 1,
@ -51,6 +51,7 @@ struct SStreamSnapHandle {
int8_t filetype; int8_t filetype;
SArray* pFileList; SArray* pFileList;
int32_t currFileIdx; int32_t currFileIdx;
int8_t delFlag;
}; };
struct SStreamSnapBlockHdr { struct SStreamSnapBlockHdr {
int8_t type; int8_t type;
@ -126,7 +127,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkp(pMeta, chkpId); streamBackendAddInUseChkp(pMeta, chkpId);
} else { } else {
stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
tdir);
} }
} }
@ -146,6 +148,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosMemoryFree(tdir); taosMemoryFree(tdir);
return code; return code;
} }
pHandle->delFlag = 1;
chkpId = 0; chkpId = 0;
} }
@ -271,8 +274,8 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->checkpointId == 0) { if (handle->checkpointId == 0) {
// del tmp dir // del tmp dir
if (taosIsDir(pFile->path)) { if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path); if (handle->delFlag) taosRemoveDir(pFile->path);
} }
} else { } else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId); streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
@ -335,24 +338,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
} else { } else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
} }
} }
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) { if (nread == -1) {
taosMemoryFree(buf); taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
item->type, tstrerror(code)); item->type, tstrerror(code));
return -1; return -1;
} else if (nread > 0 && nread <= kBlockSize) { } else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize // left bytes less than kBlockSize
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread; pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) { if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
@ -361,7 +364,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
} }
} else { } else {
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER, stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
pHandle->currFileIdx); pHandle->currFileIdx);
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
pHandle->offset = 0; pHandle->offset = 0;
pHandle->currFileIdx += 1; pHandle->currFileIdx += 1;
@ -379,7 +382,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->offset += nread; pHandle->offset += nread;
stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
} }
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
@ -434,8 +437,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) { if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
tstrerror(code)); pHdr->name, tstrerror(code));
} }
} }
@ -461,8 +464,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) { if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
tstrerror(code)); pHdr->name, tstrerror(code));
} }
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);