add checkpoint

This commit is contained in:
yihaoDeng 2023-06-25 17:59:36 +08:00
parent 832aed5e98
commit 74a4a89d70
1 changed files with 42 additions and 11 deletions

View File

@ -14,6 +14,7 @@
*/ */
#include "streamSnapshot.h" #include "streamSnapshot.h"
#include "query.h"
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "tcommon.h" #include "tcommon.h"
@ -28,6 +29,7 @@ enum SBackendFileType {
typedef struct SBackendFileItem { typedef struct SBackendFileItem {
char* name; char* name;
int8_t type; int8_t type;
int64_t size;
} SBackendFileItem; } SBackendFileItem;
typedef struct SBackendFile { typedef struct SBackendFile {
char* pCurrent; char* pCurrent;
@ -54,6 +56,7 @@ struct SStreamSnapBlockHdr {
int64_t index; int64_t index;
char name[128]; char name[128];
int64_t size; int64_t size;
int64_t totalSize;
uint8_t data[]; uint8_t data[];
}; };
struct SStreamSnapReader { struct SStreamSnapReader {
@ -127,25 +130,30 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
// current // current
item.name = pFile->pCurrent; item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE; item.type = ROCKSDB_CURRENT_TYPE;
taosStatFile(pFile->pCurrent, &item.size, NULL);
taosArrayPush(list, &item); taosArrayPush(list, &item);
// mainfest // mainfest
item.name = pFile->pMainfest; item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE; item.type = ROCKSDB_MAINFEST_TYPE;
taosStatFile(pFile->pMainfest, &item.size, NULL);
taosArrayPush(list, &item); taosArrayPush(list, &item);
// options // options
item.name = pFile->pOptions; item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE; item.type = ROCKSDB_OPTIONS_TYPE;
taosStatFile(pFile->pOptions, &item.size, NULL);
taosArrayPush(list, &item); taosArrayPush(list, &item);
// sst // sst
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i); char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst; item.name = sst;
item.type = ROCKSDB_SST_TYPE; item.type = ROCKSDB_SST_TYPE;
taosStatFile(sst, &item.size, NULL);
taosArrayPush(list, &item); taosArrayPush(list, &item);
} }
// meta // meta
item.name = pFile->pCheckpointMeta; item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE; item.type = ROCKSDB_CHECKPOINT_META_TYPE;
taosStatFile(pFile->pCheckpointMeta, &item.size, NULL);
taosArrayPush(list, &item); taosArrayPush(list, &item);
handle->pBackendFile = pFile; handle->pBackendFile = pFile;
@ -209,30 +217,43 @@ int32_t streamSnapReaderClose(SStreamSnapReader* pReader) {
} }
int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) { int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size) {
// impl later // impl later
int32_t code = 0;
SStreamSnapHandle* pHandle = &pReader->handle; SStreamSnapHandle* pHandle = &pReader->handle;
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) { if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to read snap, file name:%s, reason:%s", item->name, tstrerror(code));
return code;
// handle later // handle later
return -1; return -1;
} else { } else if (nread <= kBlockSize) {
// left bytes less than kBlockSize
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
pHandle->currFileIdx += 1; pHandle->currFileIdx += 1;
}
} else {
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish // finish
return 0; return 0;
} }
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->offset = 0;
pHandle->fd = taosOpenFile(item->name, TD_FILE_READ); pHandle->fd = taosOpenFile(item->name, TD_FILE_READ);
// handle err later // handle err later
nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize); nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize);
pHandle->offset += nread;
} }
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
pHdr->size = nread; pHdr->size = nread;
pHdr->type = item->type; pHdr->type = item->type;
pHdr->totalSize = item->size;
memcpy(pHdr->name, item->name, strlen(item->name)); memcpy(pHdr->name, item->name, strlen(item->name));
pHandle->seraial += nread; pHandle->seraial += nread;
@ -264,18 +285,25 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna
handle->pFileList = list; handle->pFileList = list;
handle->currFileIdx = 0; handle->currFileIdx = 0;
handle->offset = 0;
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE); handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE);
*ppWriter = pWriter; *ppWriter = pWriter;
return 0; return 0;
} }
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; int32_t code = 0;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* handle = &pWriter->handle; SStreamSnapHandle* handle = &pWriter->handle;
SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx); SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx);
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
taosWriteFile(handle->fd, pHdr->data, pHdr->size); if (taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset) != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
return code;
}
handle->offset += pHdr->size;
} else { } else {
taosCloseFile(&handle->fd); taosCloseFile(&handle->fd);
@ -284,9 +312,12 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
item.type = pHdr->type; item.type = pHdr->type;
taosArrayPush(handle->pFileList, &item); taosArrayPush(handle->pFileList, &item);
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE); handle->offset = 0;
taosWriteFile(handle->fd, pHdr->data, pHdr->size);
handle->currFileIdx += 1; handle->currFileIdx += 1;
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE);
taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset);
handle->offset += pHdr->size;
} }
// impl later // impl later