add checkpoint

This commit is contained in:
yihaoDeng 2023-06-29 06:56:14 +00:00
parent 3d532a54a8
commit 5cebd36c1c
1 changed files with 6 additions and 1 deletions

View File

@ -45,7 +45,7 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
pReader->tdbTbList = taosArrayInit(4, sizeof(void*));
pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair));
STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK};
taosArrayPush(pReader->tdbTbList, &pair1);
@ -58,12 +58,16 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
code = tdbTbcOpen(pPair->tbl, &pReader->pCur, NULL);
if (code) {
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode),
tstrerror(code));
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur);
if (code) {
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode),
tstrerror(code));
taosMemoryFree(pReader);
goto _err;
}
@ -71,6 +75,7 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
tqInfo("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));