add checkpoint

This commit is contained in:
yihaoDeng 2023-06-27 12:03:29 +00:00
parent 4de7db06f9
commit 9d69b284db
1 changed files with 131 additions and 131 deletions

View File

@ -26,102 +26,101 @@ struct SStreamTaskReader {
}; };
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) {
// int32_t code = 0; int32_t code = 0;
// SStreamTaskReader* pReader = NULL; SStreamTaskReader* pReader = NULL;
// // alloc // alloc
// pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
// if (pReader == NULL) { if (pReader == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
// goto _err; goto _err;
// } }
// pReader->pTq = pTq; pReader->pTq = pTq;
// pReader->sver = sver; pReader->sver = sver;
// pReader->ever = ever; pReader->ever = ever;
// // impl // impl
// code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); code = tdbTbcOpen(pTq->pStreamMeta->pTaskDb, &pReader->pCur, NULL);
// if (code) { if (code) {
// taosMemoryFree(pReader); taosMemoryFree(pReader);
// goto _err; goto _err;
// } }
// code = tdbTbcMoveToFirst(pReader->pCur); code = tdbTbcMoveToFirst(pReader->pCur);
// if (code) { if (code) {
// taosMemoryFree(pReader); taosMemoryFree(pReader);
// goto _err; goto _err;
// } }
// tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); tqInfo("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
// *ppReader = pReader; *ppReader = pReader;
// _err: _err:
// tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
// *ppReader = NULL; *ppReader = NULL;
// return code; return code;
return 0; return 0;
} }
int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) {
// int32_t code = 0; int32_t code = 0;
// tdbTbcClose((*ppReader)->pCur); tdbTbcClose((*ppReader)->pCur);
// taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
// *ppReader = NULL; *ppReader = NULL;
// return code; return code;
return 0; return 0;
} }
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
// int32_t code = 0; int32_t code = 0;
// const void* pKey = NULL; const void* pKey = NULL;
// const void* pVal = NULL; const void* pVal = NULL;
// int32_t kLen = 0; int32_t kLen = 0;
// int32_t vLen = 0; int32_t vLen = 0;
// SDecoder decoder; SDecoder decoder;
// STqHandle handle; STqHandle handle;
// *ppData = NULL; *ppData = NULL;
// for (;;) { for (;;) {
// if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
// goto _exit; goto _exit;
// } }
// tDecoderInit(&decoder, (uint8_t*)pVal, vLen); // tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
// tDecodeSTqHandle(&decoder, &handle); // tDecodeSTqHandle(&decoder, &handle);
// tDecoderClear(&decoder); // tDecoderClear(&decoder);
// if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
// tdbTbcMoveToNext(pReader->pCur); tdbTbcMoveToNext(pReader->pCur);
// break; break;
// } else { } else {
// tdbTbcMoveToNext(pReader->pCur); tdbTbcMoveToNext(pReader->pCur);
// } }
// } }
// *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
// if (*ppData == NULL) { if (*ppData == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
// goto _err; goto _err;
// } }
// SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
// pHdr->type = SNAP_DATA_TQ_HANDLE; pHdr->type = SNAP_DATA_STREAM_TASK;
// pHdr->size = vLen; pHdr->size = vLen;
// memcpy(pHdr->data, pVal, vLen); memcpy(pHdr->data, pVal, vLen);
// tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
// TD_VID(pReader->pTq->pVnode), handle.snapshotVer, handle.subKey, vLen);
// handle.snapshotVer, handle.subKey, vLen);
// _exit: _exit:
// return code; return code;
// _err: _err:
// tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
// return code; return code;
return 0; return 0;
} }
@ -134,71 +133,71 @@ struct SStreamTaskWriter {
}; };
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) { int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) {
// int32_t code = 0; int32_t code = 0;
// SStreamTaskWriter* pWriter; SStreamTaskWriter* pWriter;
// // alloc // alloc
// pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
// if (pWriter == NULL) { if (pWriter == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
// goto _err; goto _err;
// } }
// pWriter->pTq = pTq; pWriter->pTq = pTq;
// pWriter->sver = sver; pWriter->sver = sver;
// pWriter->ever = ever; pWriter->ever = ever;
// if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { if (tdbBegin(pTq->pStreamMeta->db, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
// code = -1; code = -1;
// taosMemoryFree(pWriter); taosMemoryFree(pWriter);
// goto _err; goto _err;
// } }
// *ppWriter = pWriter; *ppWriter = pWriter;
// return code; return code;
// _err: _err:
// tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, stream-task snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
// *ppWriter = NULL; *ppWriter = NULL;
// return code; return code;
return 0; return 0;
} }
int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) { int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) {
// int32_t code = 0; int32_t code = 0;
// SStreamTaskWriter* pWriter = *ppWriter; SStreamTaskWriter* pWriter = *ppWriter;
// STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
// if (rollback) { if (rollback) {
// tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn);
// } else { } else {
// code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); code = tdbCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn);
// if (code) goto _err; if (code) goto _err;
// code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); code = tdbPostCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn);
// if (code) goto _err; if (code) goto _err;
// } }
// taosMemoryFree(pWriter); taosMemoryFree(pWriter);
// *ppWriter = NULL; *ppWriter = NULL;
// // restore from metastore // restore from metastore
// if (tqMetaRestoreHandle(pTq) < 0) { // if (tqMetaRestoreHandle(pTq) < 0) {
// goto _err; // goto _err;
// } // }
// return code; return code;
// _err: _err:
// tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
// return code; return code;
return 0; return 0;
} }
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
// int32_t code = 0; int32_t code = 0;
// STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
// SDecoder decoder = {0}; SDecoder decoder = {0};
// SDecoder* pDecoder = &decoder; SDecoder* pDecoder = &decoder;
// STqHandle handle; STqHandle handle;
// tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); // tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
// code = tDecodeSTqHandle(pDecoder, &handle); // code = tDecodeSTqHandle(pDecoder, &handle);
@ -206,12 +205,13 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
// code = tqMetaSaveHandle(pTq, handle.subKey, &handle); // code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
// if (code < 0) goto _err; // if (code < 0) goto _err;
// tDecoderClear(pDecoder); // tDecoderClear(pDecoder);
// insert into pStreamMeta tdb table
// return code; return code;
// _err: _err:
// tDecoderClear(pDecoder); tDecoderClear(pDecoder);
// tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, stream-task snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
// return code; return code;
return 0; return 0;
} }