diff --git a/include/libs/stream/streamSnapshot.h b/include/libs/stream/streamSnapshot.h index fdce6ea121..a47bf9e381 100644 --- a/include/libs/stream/streamSnapshot.h +++ b/include/libs/stream/streamSnapshot.h @@ -29,6 +29,6 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si // SMetaSnapWriter ======================================== int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapWriter** ppWriter); int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -int32_t streamSnapWriterClose(SStreamSnapWriter** ppWriter, int8_t rollback); +int32_t streamSnapWriterClose(SStreamSnapWriter* ppWriter, int8_t rollback); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 61afa13974..ff854e4a2e 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -152,7 +152,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { handle->currFileIdx = 0; handle->pFileList = list; - handle->fd = taosOpenFile(taosArrayGetP(handle->pFileList, handle->currFileIdx), TD_FILE_READ); + handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_READ); if (handle->fd == NULL) { goto _err; } @@ -213,7 +213,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); - int64_t nread = taosReadFile(pHandle->fd, buf, kBlockSize); + int64_t nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize); if (nread == -1) { // handle later return -1; @@ -227,7 +227,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); pHandle->fd = taosOpenFile(item->name, TD_FILE_READ); // handle err later - nread = taosReadFile(pHandle->fd, buf, kBlockSize); + nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -248,47 +248,58 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna if (pWriter == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - const char* path = NULL; - // if (streamSnapHandleInit(&pWriter->handle, (char*)path) < 0) { - // return -1; - // } + SStreamSnapHandle* handle = &pWriter->handle; + + const char* path = NULL; SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); pFile->path = taosStrdup(path); - pFile->pSst = taosArrayInit(16, sizeof(void*)); - SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); - // SBackendFileItem item; + SBackendFileItem item; + item.name = taosStrdup((char*)ROCKSDB_CURRENT); + item.type = ROCKSDB_CURRENT_TYPE; + taosArrayPush(list, &item); - // // current - // item.name = (char*)ROCKSDB_CURRENT; - // item.type = ROCKSDB_CURRENT_TYPE; - // taosArrayPush(list, &item); - // // mainfest - // item.name = pFile->pMainfest; - // item.type = ROCKSDB_MAINFEST_TYPE; - // taosArrayPush(list, &item); - // // options - // item.name = pFile->pOptions; - // item.type = ROCKSDB_OPTIONS_TYPE; - // taosArrayPush(list, &item); - // // sst - // for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - // char* sst = taosArrayGetP(pFile->pSst, i); - // item.name = sst; - // item.type = ROCKSDB_SST_TYPE; - // taosArrayPush(list, &item); - // } - // // meta - // item.name = pFile->pCheckpointMeta; - // item.type = ROCKSDB_CHECKPOINT_META_TYPE; - // taosArrayPush(list, &item); + handle->pBackendFile = pFile; + handle->pFileList = list; + handle->currFileIdx = 0; + handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE); *ppWriter = pWriter; return 0; } + int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; + + SStreamSnapHandle* handle = &pWriter->handle; + SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx); + if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { + taosWriteFile(handle->fd, pHdr->data, pHdr->size); + } else { + taosCloseFile(&handle->fd); + + SBackendFileItem item; + item.name = taosStrdup(pHdr->name); + item.type = pHdr->type; + taosArrayPush(handle->pFileList, &item); + + handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE); + handle->currFileIdx += 1; + } + // impl later return 0; } -int32_t streamSnapWriterClose(SStreamSnapWriter** ppWriter, int8_t rollback) { return 0; } +int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { + SStreamSnapHandle* handle = &pWriter->handle; + for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { + SBackendFileItem* item = taosArrayGet(handle->pFileList, i); + taosMemoryFree(item->name); + } + + streamSnapHandleDestroy(handle); + taosMemoryFree(pWriter); + + return 0; +}