add checkpoint

This commit is contained in:
yihaoDeng 2023-06-28 09:57:08 +00:00
parent 871f66565d
commit 6b6e237079
7 changed files with 30 additions and 15 deletions

View File

@ -22,12 +22,12 @@ typedef struct SStreamSnapWriter SStreamSnapWriter;
typedef struct SStreamSnapHandle SStreamSnapHandle;
typedef struct SStreamSnapBlockHdr SStreamSnapBlockHdr;
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader);
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader);
int32_t streamSnapReaderClose(SStreamSnapReader* pReader);
int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size);
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapWriter** ppWriter);
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter);
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamSnapWriterClose(SStreamSnapWriter* ppWriter, int8_t rollback);

View File

@ -89,6 +89,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_RSMA0_DIR "tsdb"
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
#define VNODE_TQ_STREAM "stream"
#define VNODE_BUFPOOL_SEGMENTS 3

View File

@ -32,6 +32,8 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
int32_t code = 0;
SStreamStateReader* pReader = NULL;
char tdir[TSDB_FILENAME_LEN * 2] = {0};
// alloc
pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader));
if (pReader == NULL) {
@ -43,7 +45,8 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pReader->ever = ever;
SStreamSnapReader* pSnapReader = NULL;
streamSnapReaderOpen(pTq, sver, ever, &pSnapReader);
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader);
pReader->pReaderImpl = pSnapReader;
@ -104,6 +107,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
int32_t code = 0;
SStreamStateWriter* pWriter;
char tdir[TSDB_FILENAME_LEN * 2] = {0};
// alloc
pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
@ -114,8 +118,9 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver;
pWriter->ever = ever;
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
SStreamSnapWriter* pSnapWriter = NULL;
streamSnapWriterOpen(pTq, sver, ever, &pSnapWriter);
streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter);
pWriter->pWriterImpl = pSnapWriter;

View File

@ -102,6 +102,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
NextTbl:
except = 0;
for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
except = 1;
@ -123,7 +124,10 @@ NextTbl:
goto NextTbl;
}
}
if (pVal == NULL || vLen == 0) {
*ppData = NULL;
return code;
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -235,6 +235,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
pReader->streamTaskDone = 1;
code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
if (code) goto _err;
pReader->pStreamTaskReader = NULL;
}
}
}
@ -254,6 +255,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
pReader->streamStateDone = 1;
code = streamStateSnapReaderClose(pReader->pStreamStateReader);
if (code) goto _err;
pReader->pStreamStateReader = NULL;
}
}
}

View File

@ -216,7 +216,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_SYN_RESTORING;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
rpcFreeCont(pMsg->pCont);
@ -279,7 +280,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
TMSG_INFO(pMsg->msgType));
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
@ -526,7 +528,7 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p
}
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SVnode *pVnode = pFsm->data;
SVnode *pVnode = pFsm->data;
SyncIndex appliedIdx = -1;
do {
@ -660,8 +662,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->clusterId);
vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
pNode->nodePort, pNode->nodeId, pNode->clusterId);
}
pVnode->sync = syncOpen(&syncInfo);

View File

@ -205,13 +205,13 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
return;
}
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader) {
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) {
// impl later
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
const char* path = NULL;
// const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
return -1;
}
@ -243,7 +243,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return code;
// handle later
return -1;
} else if (nread <= kBlockSize) {
} else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
@ -254,6 +254,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
} else {
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
*ppData = NULL;
*size = 0;
return 0;
}
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
@ -278,7 +280,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return 0;
}
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapWriter** ppWriter) {
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) {
// impl later
SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter));
if (pWriter == NULL) {
@ -286,7 +288,6 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna
}
SStreamSnapHandle* pHandle = &pWriter->handle;
const char* path = NULL;
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pFile->path = taosStrdup(path);
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));