diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 415d1ff9f0..d57dc6e52e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); -//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); @@ -966,8 +965,8 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask return -1; } - code = - setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0); + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, + TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(buf); } @@ -2317,6 +2316,8 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) { } taosHashCleanup(pHash); + + mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList)); return TSDB_CODE_SUCCESS; } @@ -2905,7 +2906,6 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { } addAllStreamTasksIntoBuf(pMnode, pExecInfo); - extractNodeListFromStream(pMnode, pExecInfo->pNodeList); pExecInfo->initTaskList = true; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1452ac77d2..bc10ec211d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -255,7 +255,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndInitStreamExecInfo(pMnode, &execInfo); if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { - mError("invalid hbMsg from vgId:%d, discarded", req.vgId); + mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); terrno = TSDB_CODE_INVALID_MSG; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a3a7035905..990e7fb987 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -182,6 +182,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); taosThreadMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } @@ -201,6 +203,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); taosThreadMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } @@ -210,6 +214,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock " discard", id, vgId, pActiveInfo->activeId, checkpointId); taosThreadMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { @@ -218,6 +224,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } @@ -232,6 +239,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem((SStreamQueueItem*)pBlock); return TSDB_CODE_SUCCESS; } } @@ -455,6 +463,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pReq->taskId, numOfTasks); } streamMetaWLock(pMeta); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } } return TSDB_CODE_SUCCESS;