fix(stream): fix syntax error.

This commit is contained in:
Haojun Liao 2023-12-01 11:54:13 +08:00
parent e753d631af
commit dba58f5e38
3 changed files with 6 additions and 5 deletions

View File

@ -148,9 +148,8 @@ void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execInfo.pTaskList); taosArrayDestroy(execInfo.pTaskList);
taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosThreadMutexDestroy(&execInfo.lock);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.transMgmt.pWaitingList); taosHashCleanup(execInfo.transMgmt.pWaitingList);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup"); mDebug("mnd stream exec info cleanup");
} }

View File

@ -822,7 +822,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;

View File

@ -468,14 +468,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
taosGetTimestampMs(), false);
// automatically set the related fill-history task to be failed. // automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id; STaskId* pId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
taosGetTimestampMs(), false);
streamMetaReleaseTask(pTask->pMeta, pHTask); streamMetaReleaseTask(pTask->pMeta, pHTask);
} }
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms