add checkpoint

This commit is contained in:
yihaoDeng 2023-06-27 14:31:50 +00:00
parent 977d6b7803
commit 9adccf5715
1 changed files with 27 additions and 22 deletions

View File

@ -23,6 +23,8 @@ struct SStreamTaskReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
TBC* pCur; TBC* pCur;
SArray* tdbTbList;
int8_t pos;
}; };
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) {
@ -38,9 +40,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
pReader->pTq = pTq; pReader->pTq = pTq;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->tdbTbList = taosArrayInit(4, sizeof(void*));
taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pTaskDb);
taosArrayPush(pReader->tdbTbList, &pTq->pStreamMeta->pCheckpointDb);
pReader->pos = 0;
// impl // impl
code = tdbTbcOpen(pTq->pStreamMeta->pTaskDb, &pReader->pCur, NULL);
code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL);
if (code) { if (code) {
taosMemoryFree(pReader); taosMemoryFree(pReader);
goto _err; goto _err;
@ -63,15 +70,13 @@ _err:
return 0; return 0;
} }
int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
int32_t code = 0; int32_t code = 0;
tdbTbcClose((*ppReader)->pCur); tdbTbcClose(pReader->pCur);
taosMemoryFree(*ppReader); taosMemoryFree(pReader);
*ppReader = NULL;
return code; return code;
return 0;
} }
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
@ -84,27 +89,27 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
STqHandle handle; STqHandle handle;
*ppData = NULL; *ppData = NULL;
int8_t except = 0;
NextTbl:
for (;;) { for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit; except = 1;
break;
} }
// SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
// if (pTask == NULL) {
// return -1;
// }
// SDecoder decoder;
// tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
// code = tDecodeStreamTask(&decoder, pTask);
// if (code < 0) {
// //tDecoderClear(&decoder);
// //taosMemoryFree(pTask);
// goto _err;
// }
// tDecoderClear(&decoder);
tdbTbcMoveToNext(pReader->pCur); tdbTbcMoveToNext(pReader->pCur);
break; break;
} }
if (except == 1) {
if (pReader->pos + 1 >= taosArrayGetSize(pReader->tdbTbList)) {
} else {
tdbTbcClose(pReader->pCur);
pReader->pos += 1;
code = tdbTbcOpen(taosArrayGetP(pReader->tdbTbList, pReader->pos), &pReader->pCur, NULL);
tdbTbcMoveToNext(pReader->pCur);
goto NextTbl;
}
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) { if (*ppData == NULL) {
@ -218,7 +223,7 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr), if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), ) < 0) { nData - sizeof(SSnapDataHdr), NULL) < 0) {
taosMemoryFree(pTask); taosMemoryFree(pTask);
return -1; return -1;
} }