diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f6737b4e27..6730d211df 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -323,7 +323,6 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t keepTaskStatus; bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it - int8_t pauseAllowed; // allowed task status to be set to be paused int32_t timerActive; // timer is active int32_t inScanHistorySentinel; } SStreamStatus; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b3c4479efc..2140b80eef 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -369,6 +369,7 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs } int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t code = TSDB_CODE_SUCCESS; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -380,6 +381,12 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecoderClear(&decoder); + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower", + req.upstreamTaskId, pMeta->vgId); + return TASK_DOWNSTREAM_NOT_LEADER; + } + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId, @@ -396,11 +403,11 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "completed msg", pTask->id.idStr, req.downstreamId); - streamProcessScanHistoryFinishRsp(pTask); + code = streamProcessScanHistoryFinishRsp(pTask); } streamMetaReleaseTask(pMeta, pTask); - return 0; + return code; } int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 309f377621..53f87591e8 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -476,11 +476,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; + int64_t current = taosGetTimestampMs(); SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, - taosGetTimestampMs(), false); - streamMetaReleaseTask(pTask->pMeta, pHTask); + if (pHTask != NULL) { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current, + false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false); + } } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -657,6 +662,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { ETaskStatus status = streamTaskGetStatus(pTask, NULL); + // task restart now, not handle the scan-history finish rsp + if (status == TASK_STATUS__UNINIT) { + return TSDB_CODE_INVALID_MSG; + } + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); SStreamMeta* pMeta = pTask->pMeta; diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index 7f4d1d5ee3..e838950bb8 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -91,6 +91,8 @@ class TDTestCase: tdLog.info("loop wait result ...") tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5) + time.sleep(5) + # check all data is correct sql = "select * from sta where cnt != 20;" tdSql.query(sql)