add checkpoint

This commit is contained in:
yihaoDeng 2023-06-27 12:46:08 +00:00
parent f2b96fb4b3
commit 74764108fd
1 changed files with 36 additions and 24 deletions

View File

@ -88,20 +88,22 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
// SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
// if (pTask == NULL) {
// return -1;
// }
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
code = tDecodeStreamTask(&decoder, pTask);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
goto _err;
}
tDecoderClear(&decoder);
// SDecoder decoder;
// tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
// code = tDecodeStreamTask(&decoder, pTask);
// if (code < 0) {
// //tDecoderClear(&decoder);
// //taosMemoryFree(pTask);
// goto _err;
// }
// tDecoderClear(&decoder);
tdbTbcMoveToNext(pReader->pCur);
break;
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
@ -197,23 +199,33 @@ _err:
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};
SDecoder* pDecoder = &decoder;
STqHandle handle;
// tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
// code = tDecodeSTqHandle(pDecoder, &handle);
// if (code) goto _err;
// code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
// if (code < 0) goto _err;
// tDecoderClear(pDecoder);
// insert into pStreamMeta tdb table
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeStreamTask(&decoder, pTask);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
goto _err;
}
tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), NULL) < 0) {
taosMemoryFree(pTask);
return -1;
}
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d, stream-task snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
return 0;
}