diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 47b0fdb412..383ffe16da 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1069,6 +1069,9 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { SChkptReportInfo* px = (SChkptReportInfo *)pIter; + if (taosArrayGetSize(px->pTaskList) == 0) { + continue; + } STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0); if (pInfo == NULL) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index faca2020c5..7164c7f543 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -417,7 +417,6 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return code; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST; } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 493b5013d0..4da108507a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1083,7 +1083,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; - int32_t code = -1; + int32_t code = 0; SRpcMsg msg = {0}; // serialize @@ -1093,9 +1093,9 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in goto FAIL; } - code = -1; buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { + code = terrno; goto FAIL; } @@ -1119,6 +1119,10 @@ FAIL: rpcFreeCont(buf); } + if (code == -1) { + code = TSDB_CODE_INVALID_MSG; + } + return code; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 8513a8ba06..898e2bbc0b 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -295,10 +295,14 @@ void streamMetaHbToMnode(void* param, void* tmrId) { if (code) { stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); } - streamMetaRUnLock(pMeta); - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, - "meta-hb-tmr"); + + if (code != TSDB_CODE_APP_IS_STOPPING) { + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + "meta-hb-tmr"); + } else { + stDebug("vgId:%d is stopping, not start hb again", pMeta->vgId); + } code = taosReleaseRef(streamMetaId, rid); if (code) {