add checkpoint

This commit is contained in:
yihaoDeng 2023-06-20 18:32:56 +08:00
parent f1fcc1d2f1
commit d9e24566c0
2 changed files with 144 additions and 30 deletions

View File

@ -20,14 +20,15 @@ typedef struct SStreamSnapReader SStreamSnapReader;
typedef struct StreamSnapWriter StreamSnapWriter;
typedef struct SStreamSnapHandle SStreamSnapHandle;
typedef struct SStreamSnapBlockHdr SStreamSnapBlockHdr;
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader);
int32_t streamSnapReaderClose(void** ppReader);
int32_t streamSnapRead(void* pReader, uint8_t** ppData);
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader);
int32_t streamSnapReaderClose(SStreamSnapReader* pReader);
int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size);
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter);
int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback);
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, StreamSnapWriter** ppWriter);
int32_t streamSnapWrite(StreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamSnapWriterClose(StreamSnapWriter** ppWriter, int8_t rollback);
#endif

View File

@ -17,42 +17,69 @@
#include "rocksdb/c.h"
#include "tcommon.h"
enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1,
ROCKSDB_MAINFEST_TYPE = 2,
ROCKSDB_SST_TYPE = 3,
ROCKSDB_CURRENT_TYPE = 4,
ROCKSDB_CHECKPOINT_META_TYPE = 5,
};
typedef struct SBackendFileItem {
char* name;
int8_t type;
} SBackendFileItem;
typedef struct SBackendFile {
SArray* pSst;
char* pCurrent;
char* pMainfest;
char* pOptions;
SArray* pSst;
char* pCheckpointMeta;
char* path;
} SBanckendFile;
struct SStreamSnapHandle {
void* handle;
SArray* fileList;
SBanckendFile* pBackendFile;
int64_t checkpointId;
int64_t seraial;
int64_t offset;
TdFilePtr fd;
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
};
struct SStreamSnapBlockHdr {
int8_t type;
int8_t flag;
int64_t index;
char name[128];
int64_t size;
uint8_t data[];
};
struct SStreamSnapReader {
void* pMeta;
int64_t sver;
int64_t ever;
SStreamSnapHandle handle;
};
// SMetaSnapWriter ========================================
struct StreamSnapWriter {
void* pMeta;
int64_t sver;
int64_t ever;
};
const char* ROCKSDB_OPTIONS = "OPTIONS";
const char* ROCKSDB_MAINFEST = "MANIFEST";
const char* ROCKSDB_SST = "sst";
const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
// impl later
int32_t code = 0;
handle->fileList = taosArrayInit(32, sizeof(void*));
TdDirPtr pDir = taosOpenDir(path);
if (NULL == pDir) {
@ -60,6 +87,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
}
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
handle->checkpointId = 0;
handle->seraial = 0;
pFile->path = taosStrdup(path);
pFile->pSst = taosArrayInit(16, sizeof(void*));
TdDirEntryPtr pDirEntry;
@ -90,8 +120,47 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
}
taosCloseDir(&pDir);
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
SBackendFileItem item;
item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(list, &item);
item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
taosArrayPush(list, &item);
item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
taosArrayPush(list, &item);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
taosArrayPush(list, &item);
}
item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
taosArrayPush(list, &item);
handle->pBackendFile = pFile;
handle->currFileIdx = 0;
handle->pFileList = list;
handle->fd = taosOpenFile(taosArrayGetP(handle->pFileList, handle->currFileIdx), TD_FILE_READ);
if (handle->fd == NULL) {
goto _err;
}
handle->seraial = 0;
handle->offset = 0;
return 0;
_err:
streamSnapHandleDestroy(handle);
code = -1;
return code;
}
@ -101,37 +170,81 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest);
taosMemoryFree(pFile->pOptions);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
taosMemoryFree(pFile->path);
for (int i = 0; pFile->pSst != NULL && i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
taosMemoryFree(sst);
}
taosArrayDestroy(pFile->pSst);
taosArrayDestroy(handle->pFileList);
taosMemoryFree(pFile);
return;
}
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader) {
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader) {
// impl later
rocksdb_t* db = NULL;
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
return -1;
}
*ppReader = pReader;
return 0;
}
int32_t streamSnapReaderClose(void** ppReader) {
// impl later
int32_t streamSnapReaderClose(SStreamSnapReader* pReader) {
if (pReader == NULL) return 0;
streamSnapHandleDestroy(&pReader->handle);
taosMemoryFree(pReader);
return 0;
}
int32_t streamSnapRead(void* pReader, uint8_t** ppData) {
int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) {
// impl later
SStreamSnapHandle* pHandle = &pReader->handle;
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosReadFile(pHandle->fd, buf, kBlockSize);
if (nread == -1) {
// handle later
return -1;
} else {
taosCloseFile(&pHandle->fd);
pHandle->currFileIdx += 1;
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
return 0;
}
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = taosOpenFile(item->name, TD_FILE_READ);
// handle err later
nread = taosReadFile(pHandle->fd, buf, kBlockSize);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
pHdr->size = nread;
pHdr->type = item->type;
memcpy(pHdr->name, item->name, strlen(item->name));
pHandle->seraial += nread;
*ppData = buf;
*size = sizeof(SStreamSnapBlockHdr) + nread;
return 0;
}
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter) {
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, StreamSnapWriter** ppWriter) {
// impl later
return 0;
}
int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData) {
int32_t streamSnapWrite(StreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
// impl later
return 0;
}
int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback) { return 0; }
int32_t streamSnapWriterClose(StreamSnapWriter** ppWriter, int8_t rollback) { return 0; }