From 22c85fcbad47faf3e445548f09a2504b7ab6d642 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Jul 2023 13:07:49 +0000 Subject: [PATCH] fix stream state transfer --- source/libs/stream/src/streamSnapshot.c | 84 ++++++++++++------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 43cb6511b8..5ae0f6f30e 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -89,7 +89,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle); sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \ } while (0) -int32_t getFileSize(char* path, char* name, int64_t* sz) { +int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { int ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); @@ -100,6 +100,13 @@ int32_t getFileSize(char* path, char* name, int64_t* sz) { return ret; } + +TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { + char fullname[256] = {0}; + STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname); + return taosOpenFile(fullname, opt); +} + int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { // impl later int len = strlen(path); @@ -140,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk continue; } if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { - pFile->pMainfest = taosStrdup(name); + pFile->pOptions = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) && @@ -168,50 +175,39 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk // current item.name = pFile->pCurrent; item.type = ROCKSDB_CURRENT_TYPE; - getFileSize(pFile->path, item.name, &item.size); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); // mainfest item.name = pFile->pMainfest; item.type = ROCKSDB_MAINFEST_TYPE; - getFileSize(pFile->path, item.name, &item.size); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); // options item.name = pFile->pOptions; item.type = ROCKSDB_OPTIONS_TYPE; - getFileSize(pFile->path, item.name, &item.size); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); // sst for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { char* sst = taosArrayGetP(pFile->pSst, i); item.name = sst; item.type = ROCKSDB_SST_TYPE; - getFileSize(pFile->path, item.name, &item.size); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); } // meta item.name = pFile->pCheckpointMeta; item.type = ROCKSDB_CHECKPOINT_META_TYPE; - getFileSize(pFile->path, item.name, &item.size); - taosArrayPush(list, &item); + if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) { + taosArrayPush(list, &item); + } pHandle->pBackendFile = pFile; pHandle->currFileIdx = 0; pHandle->pFileList = list; - - char fullname[256] = {0}; - char* file = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname); - - pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); - if (pHandle->fd == NULL) { - qError("stream-state failed to open %s, reason: %s", tdir, tstrerror(errno)); - tdir = NULL; - goto _err; - } - qDebug("stream-state open file %s, current offset %" PRId64 "", file, (int64_t)0); pHandle->seraial = 0; pHandle->offset = 0; return 0; @@ -275,29 +271,35 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - qDebug("stream-state start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + if (pHandle->fd == NULL) { + if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { + // finish + *ppData = NULL; + *size = 0; + return 0; + } else { + pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); + qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + } + } + qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream-state snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, + qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type, tstrerror(code)); - qDebug("stream-state failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); - return code; - // handle later return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize - qDebug("stream-state read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); - qDebug("stream-state close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx, - pHandle->currFileIdx + 1); pHandle->offset = 0; pHandle->currFileIdx += 1; } @@ -314,15 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return 0; } item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - char fullname[256] = {0}; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname); + pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); - pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); pHandle->offset += nread; - qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -361,11 +361,8 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path pHandle->currFileIdx = 0; pHandle->offset = 0; - char fullname[256] = {0}; - char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname); - - pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE); + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE); *ppWriter = pWriter; return 0; } @@ -394,11 +391,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa item.type = pHdr->type; taosArrayPush(pHandle->pFileList, &item); - char fullname[256] = {0}; - - char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname); - pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE); + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE); taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); pHandle->offset += pHdr->size;