From 93391f73e575e79e760d4084cec9e2aa719a01e9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 30 Jun 2023 07:46:56 +0000 Subject: [PATCH] vnode snapshot read --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 25 +++++++++++-------- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 21 ++++++++-------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 4ee6c587b0..666ee7c5a4 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -52,20 +52,20 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS } pReader->pReaderImpl = pSnapReader; - tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); *ppReader = pReader; return code; _err: - tqError("vgId:%d, vnode stream-state snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-state snapshot reader failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppReader = NULL; return code; } int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t code = 0; - tqInfo("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode)); streamSnapReaderClose(pReader->pReaderImpl); taosMemoryFree(pReader); return code; @@ -93,11 +93,11 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND; pHdr->size = len; memcpy(pHdr->data, rowData, len); - tqInfo("vgId:%d, vnode stream-state snapshot read data", TD_VID(pReader->pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode)); return code; _err: - tqError("vgId:%d, vnode stream-state snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode), + tqError("vgId:%d, vnode stream-state snapshot failed to read since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); return code; } @@ -129,20 +129,23 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); SStreamSnapWriter* pSnapWriter = NULL; - streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter); + if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) { + goto _err; + } - tqInfo("vgId:%d, snapshot writer opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode)); pWriter->pWriterImpl = pSnapWriter; return code; _err: - tqError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); + taosMemoryFree(pWriter); *ppWriter = NULL; - return code; + return -1; } int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t code = 0; - tqError("vgId:%d, vnode snapshot writer close", TD_VID(pWriter->pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot writer closed", TD_VID(pWriter->pTq->pVnode)); code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); taosMemoryFree(pWriter); return code; @@ -150,7 +153,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; - tqError("vgId:%d, vnode snapshot write", TD_VID(pWriter->pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode)); code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 2bd975a0d6..687e66b324 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -72,7 +72,7 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa goto _err; } - tqInfo("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); *ppReader = pReader; return code; @@ -105,7 +105,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { *ppData = NULL; int8_t except = 0; - tqInfo("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode)); STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos); NextTbl: @@ -132,7 +132,7 @@ NextTbl: } if (pVal == NULL || vLen == 0) { *ppData = NULL; - tqInfo("vgId:%d, vnode stream-task snapshot finished read data", TD_VID(pReader->pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-task snapshot finished read data", TD_VID(pReader->pTq->pVnode)); return code; } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); @@ -146,7 +146,7 @@ NextTbl: pHdr->size = vLen; memcpy(pHdr->data, pVal, vLen); - tqInfo("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); + tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); return code; _err: @@ -184,11 +184,11 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa } *ppWriter = pWriter; - tqInfo("vgId:%d, stream-task snapshot writer opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); return code; _err: - tqError("vgId:%d, stream-task snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppWriter = NULL; return code; return 0; @@ -198,7 +198,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { int32_t code = 0; STQ* pTq = pWriter->pTq; - tqInfo("vgId:%d, stream-task snapshot writer closed", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode)); if (rollback) { tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); } else { @@ -218,7 +218,8 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { return code; _err: - tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode), + tstrerror(code)); return code; return 0; } @@ -253,11 +254,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing } - tqInfo("vgId:%d, stream-task snapshot write", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-task snapshot write", TD_VID(pTq->pVnode)); return code; _err: - tqError("vgId:%d, stream-task snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode stream-task snapshot failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code)); return code; }