diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index d6dd4dccc8..4d116efe0f 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -14,6 +14,7 @@ */ #include "streamSnapshot.h" +#include "query.h" #include "rocksdb/c.h" #include "tcommon.h" @@ -26,8 +27,9 @@ enum SBackendFileType { }; typedef struct SBackendFileItem { - char* name; - int8_t type; + char* name; + int8_t type; + int64_t size; } SBackendFileItem; typedef struct SBackendFile { char* pCurrent; @@ -54,6 +56,7 @@ struct SStreamSnapBlockHdr { int64_t index; char name[128]; int64_t size; + int64_t totalSize; uint8_t data[]; }; struct SStreamSnapReader { @@ -127,25 +130,30 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { // current item.name = pFile->pCurrent; item.type = ROCKSDB_CURRENT_TYPE; + taosStatFile(pFile->pCurrent, &item.size, NULL); taosArrayPush(list, &item); // mainfest item.name = pFile->pMainfest; item.type = ROCKSDB_MAINFEST_TYPE; + taosStatFile(pFile->pMainfest, &item.size, NULL); taosArrayPush(list, &item); // options item.name = pFile->pOptions; item.type = ROCKSDB_OPTIONS_TYPE; + taosStatFile(pFile->pOptions, &item.size, NULL); taosArrayPush(list, &item); // sst for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { char* sst = taosArrayGetP(pFile->pSst, i); item.name = sst; item.type = ROCKSDB_SST_TYPE; + taosStatFile(sst, &item.size, NULL); taosArrayPush(list, &item); } // meta item.name = pFile->pCheckpointMeta; item.type = ROCKSDB_CHECKPOINT_META_TYPE; + taosStatFile(pFile->pCheckpointMeta, &item.size, NULL); taosArrayPush(list, &item); handle->pBackendFile = pFile; @@ -209,30 +217,43 @@ int32_t streamSnapReaderClose(SStreamSnapReader* pReader) { } int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) { // impl later + int32_t code = 0; SStreamSnapHandle* pHandle = &pReader->handle; SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); - int64_t nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize); + int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream snap failed to read snap, file name:%s, reason:%s", item->name, tstrerror(code)); + return code; // handle later return -1; + } else if (nread <= kBlockSize) { + // left bytes less than kBlockSize + pHandle->offset += nread; + if (pHandle->offset >= item->size || nread < kBlockSize) { + taosCloseFile(&pHandle->fd); + pHandle->currFileIdx += 1; + } } else { - taosCloseFile(&pHandle->fd); - pHandle->currFileIdx += 1; if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { // finish return 0; } item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + + pHandle->offset = 0; pHandle->fd = taosOpenFile(item->name, TD_FILE_READ); // handle err later nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize); + pHandle->offset += nread; } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; pHdr->size = nread; pHdr->type = item->type; + pHdr->totalSize = item->size; memcpy(pHdr->name, item->name, strlen(item->name)); pHandle->seraial += nread; @@ -264,18 +285,25 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna handle->pFileList = list; handle->currFileIdx = 0; + handle->offset = 0; handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE); *ppWriter = pWriter; return 0; } int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; + int32_t code = 0; - SStreamSnapHandle* handle = &pWriter->handle; - SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx); + SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; + SStreamSnapHandle* handle = &pWriter->handle; + SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx); if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - taosWriteFile(handle->fd, pHdr->data, pHdr->size); + if (taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset) != pHdr->size) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); + return code; + } + handle->offset += pHdr->size; } else { taosCloseFile(&handle->fd); @@ -284,9 +312,12 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa item.type = pHdr->type; taosArrayPush(handle->pFileList, &item); - handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE); - taosWriteFile(handle->fd, pHdr->data, pHdr->size); + handle->offset = 0; handle->currFileIdx += 1; + handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE); + + taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset); + handle->offset += pHdr->size; } // impl later