add checkpoint

This commit is contained in:
yihaoDeng 2023-06-28 02:39:01 +00:00
parent 9adccf5715
commit f6d5d0c8ca
2 changed files with 52 additions and 35 deletions

View File

@ -470,8 +470,9 @@ enum {
SNAP_DATA_TQ_HANDLE = 7, SNAP_DATA_TQ_HANDLE = 7,
SNAP_DATA_TQ_OFFSET = 8, SNAP_DATA_TQ_OFFSET = 8,
SNAP_DATA_STREAM_TASK = 9, SNAP_DATA_STREAM_TASK = 9,
SNAP_DATA_STREAM_STATE = 10, SNAP_DATA_STREAM_TASK_CHECKPOINT = 10,
SNAP_DATA_STREAM_STATE_BACKEND = 11, SNAP_DATA_STREAM_STATE = 11,
SNAP_DATA_STREAM_STATE_BACKEND = 12,
}; };
struct SSnapDataHdr { struct SSnapDataHdr {

View File

@ -18,6 +18,11 @@
#include "tq.h" #include "tq.h"
// STqSnapReader ======================================== // STqSnapReader ========================================
typedef struct {
int8_t type;
TTB* tbl;
} STablePair;
struct SStreamTaskReader { struct SStreamTaskReader {
STQ* pTq; STQ* pTq;
int64_t sver; int64_t sver;
@ -41,13 +46,17 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->tdbTbList = taosArrayInit(4, sizeof(void*)); pReader->tdbTbList = taosArrayInit(4, sizeof(void*));
taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pTaskDb);
taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pCheckpointDb); STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK};
taosArrayPush(pReader->tdbTbList, &pair1);
STablePair pair2 = {.tbl = pTq->pStreamMeta->pCheckpointDb, .type = SNAP_DATA_STREAM_TASK_CHECKPOINT};
taosArrayPush(pReader->tdbTbList, &pair2);
pReader->pos = 0; pReader->pos = 0;
// impl STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
code = tdbTbcOpen(pPair->tbl, &pReader->pCur, NULL);
code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL);
if (code) { if (code) {
taosMemoryFree(pReader); taosMemoryFree(pReader);
goto _err; goto _err;
@ -72,7 +81,7 @@ _err:
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
int32_t code = 0; int32_t code = 0;
taosArrayDestroy(pReader->tdbTbList);
tdbTbcClose(pReader->pCur); tdbTbcClose(pReader->pCur);
taosMemoryFree(pReader); taosMemoryFree(pReader);
@ -90,6 +99,8 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
*ppData = NULL; *ppData = NULL;
int8_t except = 0; int8_t except = 0;
STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
NextTbl: NextTbl:
for (;;) { for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
@ -106,7 +117,9 @@ NextTbl:
pReader->pos += 1; pReader->pos += 1;
code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL); code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL);
tdbTbcMoveToNext(pReader->pCur); tdbTbcMoveToFirst(pReader->pCur);
pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
goto NextTbl; goto NextTbl;
} }
} }
@ -118,18 +131,18 @@ NextTbl:
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_STREAM_TASK; pHdr->type = pPair->type;
pHdr->size = vLen; pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen); memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), tqInfo("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
handle.snapshotVer, handle.subKey, vLen);
return code; return code;
_exit: _exit:
return code; return code;
_err: _err:
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-task snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode),
tstrerror(code));
return code; return code;
} }
@ -205,7 +218,8 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
int32_t code = 0; int32_t code = 0;
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
STqHandle handle; STqHandle handle;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
if (pHdr->type == SNAP_DATA_STREAM_TASK) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
@ -221,13 +235,15 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t),
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr), (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
nData - sizeof(SSnapDataHdr), NULL) < 0) {
taosMemoryFree(pTask); taosMemoryFree(pTask);
return -1; return -1;
} }
taosMemoryFree(pTask); taosMemoryFree(pTask);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing
}
return code; return code;