diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 37a171e9a4..11787a015b 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -443,7 +443,7 @@ static int32_t mndInitTimer(SMnode *pMnode) { (void)taosThreadAttrInit(&thAttr); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) { - mError("failed to create timer thread since %s", strerror(errno)); + mError("failed to create timer thread since %s", tstrerror(code)); TAOS_RETURN(code); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index dc8f494914..20f0e7b105 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2420,7 +2420,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { if (pStream != NULL) { // TODO:handle error code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); if (code) { - mError("failed to create checkpoint trans, code:%s", strerror(code)); + mError("failed to create checkpoint trans, code:%s", tstrerror(code)); } } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 47b0fdb412..383ffe16da 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1069,6 +1069,9 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { SChkptReportInfo* px = (SChkptReportInfo *)pIter; + if (taosArrayGetSize(px->pTaskList) == 0) { + continue; + } STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0); if (pInfo == NULL) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index faca2020c5..7164c7f543 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -417,7 +417,6 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return code; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST; } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c555da9865..270f678d26 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -558,10 +558,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 - " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, - pInfo->checkpointTime, pReq->checkpointTs); + if (pStatus.state == TASK_STATUS__CK) { + stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); + } else { + stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64, + id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + pReq->checkpointVer); + } } ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && @@ -573,12 +580,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - streamTaskClearCheckInfo(pTask, true); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - } else { - stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name); } + streamTaskClearCheckInfo(pTask, true); + if (pReq->dropRelHTask) { stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", pReq->taskId, vgId, pReq->hTaskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 493b5013d0..4da108507a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1083,7 +1083,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; - int32_t code = -1; + int32_t code = 0; SRpcMsg msg = {0}; // serialize @@ -1093,9 +1093,9 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in goto FAIL; } - code = -1; buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { + code = terrno; goto FAIL; } @@ -1119,6 +1119,10 @@ FAIL: rpcFreeCont(buf); } + if (code == -1) { + code = TSDB_CODE_INVALID_MSG; + } + return code; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 8513a8ba06..d2c5cb05b7 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -295,12 +295,12 @@ void streamMetaHbToMnode(void* param, void* tmrId) { if (code) { stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); } - streamMetaRUnLock(pMeta); + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); - code = taosReleaseRef(streamMetaId, rid); + if (code) { stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 6506d449a6..e8c7be5204 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -107,7 +107,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); if (code) { - stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, strerror(code), ref); + stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref); } else { stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref);