diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 0b758d0d22..4ee6c587b0 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -26,6 +26,7 @@ struct SStreamStateReader { TBC* pCur; SStreamSnapReader* pReaderImpl; + int32_t complete; }; int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { @@ -46,8 +47,9 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS SStreamSnapReader* pSnapReader = NULL; sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints"); - streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader); - + if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) { + pReader->complete = 1; + } pReader->pReaderImpl = pSnapReader; tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); @@ -63,6 +65,7 @@ _err: int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t code = 0; + tqInfo("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode)); streamSnapReaderClose(pReader->pReaderImpl); taosMemoryFree(pReader); return code; @@ -70,10 +73,16 @@ int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { int32_t code = 0; + if (pReader->complete == 0) { + return 0; + } uint8_t* rowData = NULL; int64_t len; code = streamSnapRead(pReader->pReaderImpl, &rowData, &len); + if (rowData == NULL || len == 0) { + return code; + } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -122,16 +131,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS SStreamSnapWriter* pSnapWriter = NULL; streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter); + tqInfo("vgId:%d, snapshot writer opened", TD_VID(pTq->pVnode)); pWriter->pWriterImpl = pSnapWriter; - + return code; _err: - tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppWriter = NULL; return code; } int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t code = 0; + tqError("vgId:%d, vnode snapshot writer close", TD_VID(pWriter->pTq->pVnode)); code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); taosMemoryFree(pWriter); return code; @@ -139,6 +150,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; + tqError("vgId:%d, vnode snapshot write", TD_VID(pWriter->pTq->pVnode)); code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 93aee58b35..20d890595c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -182,6 +182,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa } *ppWriter = pWriter; + tqInfo("vgId:%d, stream-task snapshot writer opened", TD_VID(pTq->pVnode)); return code; _err: @@ -195,6 +196,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { int32_t code = 0; STQ* pTq = pWriter->pTq; + tqInfo("vgId:%d, stream-task snapshot writer closed", TD_VID(pTq->pVnode)); if (rollback) { tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); } else { @@ -249,6 +251,7 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing } + tqInfo("vgId:%d, stream-task snapshot write", TD_VID(pTq->pVnode)); return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 14c6b6e619..14642d74f3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -584,7 +584,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || + pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && + !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } @@ -640,8 +642,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); -// case TDMT_VND_TMQ_CONSUME: -// return tqProcessPollReq(pVnode->pTq, pMsg); + // case TDMT_VND_TMQ_CONSUME: + // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: @@ -1566,7 +1568,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) { } static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { - vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode)); + vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confirm msg is processed", TD_VID(pVnode)); int32_t code = TSDB_CODE_SUCCESS; if (!pVnode->config.hashChange) { goto _exit; diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 5fdc5daa20..763e6d6b70 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -131,6 +131,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { } taosCloseDir(&pDir); + if (pFile->pCurrent == NULL) { + code = -1; + return code; + } SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); SBackendFileItem item; @@ -239,7 +243,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream snap failed to read snap, file name:%s, reason:%s", item->name, tstrerror(code)); + qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, + tstrerror(code)); return code; // handle later return -1;