add checkpoint

This commit is contained in:
yihaoDeng 2023-06-27 09:56:14 +00:00
parent a824db4fd5
commit 0a0410cd09
2 changed files with 169 additions and 236 deletions

View File

@ -24,6 +24,8 @@ struct SStreamStateReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
TBC* pCur; TBC* pCur;
SStreamSnapReader* pReaderImpl;
}; };
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
@ -40,86 +42,51 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
// impl SStreamSnapReader* pSnapReader = NULL;
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); streamSnapReaderOpen(pTq, sver, ever, &pSnapReader);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur); pReader->pReaderImpl = pSnapReader;
if (code) {
taosMemoryFree(pReader);
goto _err;
}
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader; *ppReader = pReader;
return code; return code;
_err: _err:
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-state snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t streamStatSnapReaderClose(SStreamStateReader** ppReader) { int32_t streamStatSnapReaderClose(SStreamStateReader* pReader) {
int32_t code = 0; int32_t code = 0;
streamSnapReaderClose(pReader->pReaderImpl);
tdbTbcClose((*ppReader)->pCur); taosMemoryFree(pReader);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code; return code;
} }
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
SDecoder decoder;
STqHandle handle;
*ppData = NULL; uint8_t* rowData = NULL;
for (;;) { int64_t len;
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { code = streamSnapRead(pReader->pReaderImpl, &rowData, &len);
goto _exit; *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len);
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur);
break;
} else {
tdbTbcMoveToNext(pReader->pCur);
}
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) { if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
// refactor later, avoid mem/free freq
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_HANDLE; pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
pHdr->size = vLen; pHdr->size = len;
memcpy(pHdr->data, pVal, vLen); memcpy(pHdr->data, rowData, len);
tqInfo("vgId:%d, vnode stream-state snapshot read data", TD_VID(pReader->pTq->pVnode));
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
handle.snapshotVer, handle.subKey, vLen);
_exit:
return code; return code;
_err: _err:
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-state snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode),
tstrerror(code));
return code; return code;
} }
@ -129,6 +96,8 @@ struct SStreamStateWriter {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
TXN* txn; TXN* txn;
SStreamSnapWriter* pWriterImpl;
}; };
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) { int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) {
@ -145,14 +114,10 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { SStreamSnapWriter* pSnapWriter = NULL;
code = -1; streamSnapWriterOpen(pTq, sver, ever, &pSnapWriter);
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter; pWriter->pWriterImpl = pSnapWriter;
return code;
_err: _err:
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
@ -160,53 +125,15 @@ _err:
return code; return code;
} }
int32_t streamStateSnapWriterClose(SStreamStateWriter** ppWriter, int8_t rollback) { int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
SStreamStateWriter* pWriter = *ppWriter; code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else {
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
}
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL;
// restore from metastore
if (tqMetaRestoreHandle(pTq) < 0) {
goto _err;
}
return code;
_err:
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
return code; return code;
} }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STQ* pTq = pWriter->pTq; code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
SDecoder decoder = {0};
SDecoder* pDecoder = &decoder;
STqHandle handle;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err;
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
if (code < 0) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -26,100 +26,103 @@ struct SStreamTaskReader {
}; };
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) {
int32_t code = 0; // int32_t code = 0;
SStreamTaskReader* pReader = NULL; // SStreamTaskReader* pReader = NULL;
// alloc // // alloc
pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); // pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
if (pReader == NULL) { // if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; // code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // goto _err;
} // }
pReader->pTq = pTq; // pReader->pTq = pTq;
pReader->sver = sver; // pReader->sver = sver;
pReader->ever = ever; // pReader->ever = ever;
// impl // // impl
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); // code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
if (code) { // if (code) {
taosMemoryFree(pReader); // taosMemoryFree(pReader);
goto _err; // goto _err;
} // }
code = tdbTbcMoveToFirst(pReader->pCur); // code = tdbTbcMoveToFirst(pReader->pCur);
if (code) { // if (code) {
taosMemoryFree(pReader); // taosMemoryFree(pReader);
goto _err; // goto _err;
} // }
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); // tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader; // *ppReader = pReader;
return code;
_err: // _err:
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); // tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL; // *ppReader = NULL;
return code; // return code;
return 0;
} }
int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) {
int32_t code = 0; // int32_t code = 0;
tdbTbcClose((*ppReader)->pCur); // tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader); // taosMemoryFree(*ppReader);
*ppReader = NULL; // *ppReader = NULL;
return code; // return code;
return 0;
} }
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
int32_t code = 0; // int32_t code = 0;
const void* pKey = NULL; // const void* pKey = NULL;
const void* pVal = NULL; // const void* pVal = NULL;
int32_t kLen = 0; // int32_t kLen = 0;
int32_t vLen = 0; // int32_t vLen = 0;
SDecoder decoder; // SDecoder decoder;
STqHandle handle; // STqHandle handle;
*ppData = NULL; // *ppData = NULL;
for (;;) { // for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { // if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit; // goto _exit;
} // }
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); // tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); // tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder); // tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { // if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur); // tdbTbcMoveToNext(pReader->pCur);
break; // break;
} else { // } else {
tdbTbcMoveToNext(pReader->pCur); // tdbTbcMoveToNext(pReader->pCur);
} // }
} // }
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); // *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) { // if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; // code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // goto _err;
} // }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); // SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_HANDLE; // pHdr->type = SNAP_DATA_TQ_HANDLE;
pHdr->size = vLen; // pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen); // memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), // tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d",
handle.snapshotVer, handle.subKey, vLen); // TD_VID(pReader->pTq->pVnode),
// handle.snapshotVer, handle.subKey, vLen);
_exit: // _exit:
return code; // return code;
_err: // _err:
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); // tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code; // return code;
return 0;
} }
// STqSnapWriter ======================================== // STqSnapWriter ========================================
@ -131,81 +134,84 @@ struct SStreamTaskWriter {
}; };
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) { int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) {
int32_t code = 0; // int32_t code = 0;
SStreamTaskWriter* pWriter; // SStreamTaskWriter* pWriter;
// alloc // // alloc
pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); // pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { // if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; // code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // goto _err;
} // }
pWriter->pTq = pTq; // pWriter->pTq = pTq;
pWriter->sver = sver; // pWriter->sver = sver;
pWriter->ever = ever; // pWriter->ever = ever;
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { // if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1; // code = -1;
taosMemoryFree(pWriter); // taosMemoryFree(pWriter);
goto _err; // goto _err;
} // }
*ppWriter = pWriter; // *ppWriter = pWriter;
return code; // return code;
_err: // _err:
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); // tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL; // *ppWriter = NULL;
return code; // return code;
return 0;
} }
int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) { int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) {
int32_t code = 0; // int32_t code = 0;
SStreamTaskWriter* pWriter = *ppWriter; // SStreamTaskWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq; // STQ* pTq = pWriter->pTq;
if (rollback) { // if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); // tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else { // } else {
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); // code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err; // if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); // code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err; // if (code) goto _err;
} // }
taosMemoryFree(pWriter); // taosMemoryFree(pWriter);
*ppWriter = NULL; // *ppWriter = NULL;
// restore from metastore // // restore from metastore
if (tqMetaRestoreHandle(pTq) < 0) { // if (tqMetaRestoreHandle(pTq) < 0) {
goto _err; // goto _err;
} // }
return code; // return code;
_err: // _err:
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); // tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
return code; // return code;
return 0;
} }
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; // int32_t code = 0;
STQ* pTq = pWriter->pTq; // STQ* pTq = pWriter->pTq;
SDecoder decoder = {0}; // SDecoder decoder = {0};
SDecoder* pDecoder = &decoder; // SDecoder* pDecoder = &decoder;
STqHandle handle; // STqHandle handle;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); // tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle); // code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err; // if (code) goto _err;
code = tqMetaSaveHandle(pTq, handle.subKey, &handle); // code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
if (code < 0) goto _err; // if (code < 0) goto _err;
tDecoderClear(pDecoder); // tDecoderClear(pDecoder);
return code; // return code;
_err: // _err:
tDecoderClear(pDecoder); // tDecoderClear(pDecoder);
tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); // tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code; // return code;
return 0;
} }