From d9e24566c0697f5f012832a1b20cc8bd99471630 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 20 Jun 2023 18:32:56 +0800 Subject: [PATCH] add checkpoint --- include/libs/stream/streamSnapshot.h | 15 +-- source/libs/stream/src/streamSnapshot.c | 159 ++++++++++++++++++++---- 2 files changed, 144 insertions(+), 30 deletions(-) diff --git a/include/libs/stream/streamSnapshot.h b/include/libs/stream/streamSnapshot.h index 881ed43d78..ec14f271e6 100644 --- a/include/libs/stream/streamSnapshot.h +++ b/include/libs/stream/streamSnapshot.h @@ -19,15 +19,16 @@ typedef struct SStreamSnapReader SStreamSnapReader; typedef struct StreamSnapWriter StreamSnapWriter; -typedef struct SStreamSnapHandle SStreamSnapHandle; +typedef struct SStreamSnapHandle SStreamSnapHandle; +typedef struct SStreamSnapBlockHdr SStreamSnapBlockHdr; -int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader); -int32_t streamSnapReaderClose(void** ppReader); -int32_t streamSnapRead(void* pReader, uint8_t** ppData); +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader); +int32_t streamSnapReaderClose(SStreamSnapReader* pReader); +int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size); // SMetaSnapWriter ======================================== -int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter); -int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData); -int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback); +int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, StreamSnapWriter** ppWriter); +int32_t streamSnapWrite(StreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t streamSnapWriterClose(StreamSnapWriter** ppWriter, int8_t rollback); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index affc21a37e..0868f80f59 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -17,42 +17,69 @@ #include "rocksdb/c.h" #include "tcommon.h" +enum SBackendFileType { + ROCKSDB_OPTIONS_TYPE = 1, + ROCKSDB_MAINFEST_TYPE = 2, + ROCKSDB_SST_TYPE = 3, + ROCKSDB_CURRENT_TYPE = 4, + ROCKSDB_CHECKPOINT_META_TYPE = 5, +}; + +typedef struct SBackendFileItem { + char* name; + int8_t type; +} SBackendFileItem; typedef struct SBackendFile { - SArray* pSst; char* pCurrent; char* pMainfest; char* pOptions; + SArray* pSst; char* pCheckpointMeta; + char* path; } SBanckendFile; struct SStreamSnapHandle { void* handle; - SArray* fileList; SBanckendFile* pBackendFile; + int64_t checkpointId; + int64_t seraial; + int64_t offset; + TdFilePtr fd; + int8_t filetype; + SArray* pFileList; + int32_t currFileIdx; +}; +struct SStreamSnapBlockHdr { + int8_t type; + int8_t flag; + int64_t index; + char name[128]; + int64_t size; + uint8_t data[]; }; - struct SStreamSnapReader { - void* pMeta; - int64_t sver; - int64_t ever; + void* pMeta; + int64_t sver; + int64_t ever; + SStreamSnapHandle handle; }; - -// SMetaSnapWriter ======================================== struct StreamSnapWriter { void* pMeta; int64_t sver; int64_t ever; }; +const char* ROCKSDB_OPTIONS = "OPTIONS"; +const char* ROCKSDB_MAINFEST = "MANIFEST"; +const char* ROCKSDB_SST = "sst"; +const char* ROCKSDB_CURRENT = "CURRENT"; +const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; +static int64_t kBlockSize = 64 * 1024; -const char* ROCKSDB_OPTIONS = "OPTIONS"; -const char* ROCKSDB_MAINFEST = "MANIFEST"; -const char* ROCKSDB_SST = "sst"; -const char* ROCKSDB_CURRENT = "CURRENT"; -const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; +int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path); +void streamSnapHandleDestroy(SStreamSnapHandle* handle); int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { // impl later int32_t code = 0; - handle->fileList = taosArrayInit(32, sizeof(void*)); TdDirPtr pDir = taosOpenDir(path); if (NULL == pDir) { @@ -60,6 +87,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { } SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); + handle->checkpointId = 0; + handle->seraial = 0; + pFile->path = taosStrdup(path); pFile->pSst = taosArrayInit(16, sizeof(void*)); TdDirEntryPtr pDirEntry; @@ -90,8 +120,47 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { } taosCloseDir(&pDir); + SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); + + SBackendFileItem item; + + item.name = pFile->pCurrent; + item.type = ROCKSDB_CURRENT_TYPE; + taosArrayPush(list, &item); + + item.name = pFile->pMainfest; + item.type = ROCKSDB_MAINFEST_TYPE; + taosArrayPush(list, &item); + + item.name = pFile->pOptions; + item.type = ROCKSDB_OPTIONS_TYPE; + taosArrayPush(list, &item); + + for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + char* sst = taosArrayGetP(pFile->pSst, i); + item.name = sst; + item.type = ROCKSDB_SST_TYPE; + taosArrayPush(list, &item); + } + + item.name = pFile->pCheckpointMeta; + item.type = ROCKSDB_CHECKPOINT_META_TYPE; + taosArrayPush(list, &item); + handle->pBackendFile = pFile; + + handle->currFileIdx = 0; + handle->pFileList = list; + handle->fd = taosOpenFile(taosArrayGetP(handle->pFileList, handle->currFileIdx), TD_FILE_READ); + if (handle->fd == NULL) { + goto _err; + } + handle->seraial = 0; + handle->offset = 0; + return 0; _err: + streamSnapHandleDestroy(handle); + code = -1; return code; } @@ -101,37 +170,81 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { taosMemoryFree(pFile->pCurrent); taosMemoryFree(pFile->pMainfest); taosMemoryFree(pFile->pOptions); - for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + taosMemoryFree(pFile->path); + for (int i = 0; pFile->pSst != NULL && i < taosArrayGetSize(pFile->pSst); i++) { char* sst = taosArrayGetP(pFile->pSst, i); taosMemoryFree(sst); } taosArrayDestroy(pFile->pSst); + + taosArrayDestroy(handle->pFileList); taosMemoryFree(pFile); return; } -int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader) { +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader) { // impl later - rocksdb_t* db = NULL; + SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); + if (pReader == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + const char* path = NULL; + if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { + return -1; + } + + *ppReader = pReader; return 0; } -int32_t streamSnapReaderClose(void** ppReader) { - // impl later +int32_t streamSnapReaderClose(SStreamSnapReader* pReader) { + if (pReader == NULL) return 0; + + streamSnapHandleDestroy(&pReader->handle); + taosMemoryFree(pReader); return 0; } -int32_t streamSnapRead(void* pReader, uint8_t** ppData) { +int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) { // impl later + SStreamSnapHandle* pHandle = &pReader->handle; + SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); + int64_t nread = taosReadFile(pHandle->fd, buf, kBlockSize); + if (nread == -1) { + // handle later + return -1; + } else { + taosCloseFile(&pHandle->fd); + pHandle->currFileIdx += 1; + if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { + // finish + return 0; + } + item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + pHandle->fd = taosOpenFile(item->name, TD_FILE_READ); + // handle err later + nread = taosReadFile(pHandle->fd, buf, kBlockSize); + } + + SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; + pHdr->size = nread; + pHdr->type = item->type; + + memcpy(pHdr->name, item->name, strlen(item->name)); + pHandle->seraial += nread; + + *ppData = buf; + *size = sizeof(SStreamSnapBlockHdr) + nread; return 0; } // SMetaSnapWriter ======================================== -int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter) { +int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, StreamSnapWriter** ppWriter) { // impl later return 0; } -int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData) { +int32_t streamSnapWrite(StreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { // impl later return 0; } -int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback) { return 0; } +int32_t streamSnapWriterClose(StreamSnapWriter** ppWriter, int8_t rollback) { return 0; }