diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index a4a507fc13..64ff7b7569 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -51,13 +51,14 @@ typedef struct SBackendSnapFiles2 { char* pCheckpointMeta; char* path; - int64_t checkpointId; - int64_t seraial; - int64_t offset; - TdFilePtr fd; - int8_t filetype; - SArray* pFileList; - int32_t currFileIdx; + int64_t checkpointId; + int64_t seraial; + int64_t offset; + TdFilePtr fd; + int8_t filetype; + SArray* pFileList; + int32_t currFileIdx; + SStreamTaskSnap snapInfo; } SBackendSnapFile2; struct SStreamSnapHandle { @@ -78,10 +79,13 @@ struct SStreamSnapBlockHdr { int8_t type; int8_t flag; int64_t index; - char name[128]; - int64_t totalSize; - int64_t size; - uint8_t data[]; + // int64_t streamId; + // int64_t taskId; + SStreamTaskSnap snapInfo; + char name[128]; + int64_t totalSize; + int64_t size; + uint8_t data[]; }; struct SStreamSnapReader { void* pMeta; @@ -234,6 +238,7 @@ int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSn pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pSnapFile->path = snapPath; + pSnapFile->snapInfo = *pSnap; if ((code = snapFileReadMeta(pSnapFile)) != 0) { goto _ERROR; } @@ -355,31 +360,43 @@ int32_t streamSnapReaderClose(SStreamSnapReader* pReader) { taosMemoryFree(pReader); return 0; } + int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) { // impl later int32_t code = 0; SStreamSnapHandle* pHandle = &pReader->handle; - SBanckendFile* pFile = pHandle->pBackendFile; + int32_t idx = pHandle->currIdx; + SBackendSnapFile2* pSnapFile = taosArrayGet(pHandle->pBackendSnapSet, idx); + SBackendFileItem* item = NULL; - SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); +_NEXT: + + if (pSnapFile->fd == NULL) { + if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) { + if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pBackendSnapSet)) { + pHandle->currIdx += 1; + + pSnapFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx); + goto _NEXT; + } else { + *ppData = NULL; + *size = 0; + return 0; + } - if (pHandle->fd == NULL) { - if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { - // finish - *ppData = NULL; - *size = 0; - return 0; } else { - pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); + item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); + pSnapFile->fd = streamOpenFile(pSnapFile->path, item->name, TD_FILE_READ); qDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); } } qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); + uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); - int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); + int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, @@ -388,43 +405,44 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize qDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); - pHandle->offset += nread; - if (pHandle->offset >= item->size || nread < kBlockSize) { - taosCloseFile(&pHandle->fd); - pHandle->offset = 0; - pHandle->currFileIdx += 1; + item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); + pSnapFile->offset += nread; + if (pSnapFile->offset >= item->size || nread < kBlockSize) { + taosCloseFile(&pSnapFile->fd); + pSnapFile->offset = 0; + pSnapFile->currFileIdx += 1; } } else { qDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER, - pHandle->currFileIdx); - taosCloseFile(&pHandle->fd); - pHandle->offset = 0; - pHandle->currFileIdx += 1; + pSnapFile->currFileIdx); + taosCloseFile(&pSnapFile->fd); + pSnapFile->offset = 0; + pSnapFile->currFileIdx += 1; - if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { + if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) { // finish *ppData = NULL; *size = 0; return 0; } - item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); + item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); + pSnapFile->fd = streamOpenFile(pSnapFile->path, item->name, TD_FILE_READ); - nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); - pHandle->offset += nread; + nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); + pSnapFile->offset += nread; qDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", - STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; pHdr->size = nread; pHdr->type = item->type; pHdr->totalSize = item->size; + pHdr->snapInfo = pSnapFile->snapInfo; memcpy(pHdr->name, item->name, strlen(item->name)); - pHandle->seraial += nread; + pSnapFile->seraial += nread; *ppData = buf; *size = sizeof(SStreamSnapBlockHdr) + nread;