From f2bba670d8e6cd566f9d53063d30daafbb48a969 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Dec 2023 11:26:49 +0800 Subject: [PATCH 1/3] fix(stream): remove unused attribute. --- include/libs/stream/tstream.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f6737b4e27..6730d211df 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -323,7 +323,6 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t keepTaskStatus; bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it - int8_t pauseAllowed; // allowed task status to be set to be paused int32_t timerActive; // timer is active int32_t inScanHistorySentinel; } SStreamStatus; From b4a1907fb7f5533bd4e2137274dd539ee65528ca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Dec 2023 11:52:59 +0800 Subject: [PATCH 2/3] fix(stream): add checks for null ptr and leader role. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 11 +++++++++-- source/libs/stream/src/streamStart.c | 16 +++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 13385a544a..480ed7fd38 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -368,6 +368,7 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs } int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t code = TSDB_CODE_SUCCESS; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -379,6 +380,12 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecoderClear(&decoder); + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqError("s-task:0x%x (vgId:%d) not handle the 
scan-history finish rsp, since it becomes follower", + req.upstreamTaskId, pMeta->vgId); + return TASK_DOWNSTREAM_NOT_LEADER; + } + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", @@ -395,11 +402,11 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "completed msg", pTask->id.idStr, req.downstreamId); - streamProcessScanHistoryFinishRsp(pTask); + code = streamProcessScanHistoryFinishRsp(pTask); } streamMetaReleaseTask(pMeta, pTask); - return 0; + return code; } int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 309f377621..53f87591e8 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -476,11 +476,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs // automatically set the related fill-history task to be failed. 
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; + int64_t current = taosGetTimestampMs(); SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, - taosGetTimestampMs(), false); - streamMetaReleaseTask(pTask->pMeta, pHTask); + if (pHTask != NULL) { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current, + false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false); + } } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -657,6 +662,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { ETaskStatus status = streamTaskGetStatus(pTask, NULL); + // task restart now, not handle the scan-history finish rsp + if (status == TASK_STATUS__UNINIT) { + return TSDB_CODE_INVALID_MSG; + } + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); SStreamMeta* pMeta = pTask->pMeta; From f3e0feb998cef7c725ad228d81dde512f50e624f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Dec 2023 14:26:15 +0800 Subject: [PATCH 3/3] other: wait for 5 sec before checking results. 
--- tests/system-test/8-stream/stream_basic.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index 7f4d1d5ee3..e838950bb8 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -91,6 +91,8 @@ class TDTestCase: tdLog.info("loop wait result ...") tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5) + time.sleep(5) + # check all data is correct sql = "select * from sta where cnt != 20;" tdSql.query(sql)