diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1611cc4be1..354f202e55 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -593,7 +593,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); // agg level int32_t streamAggScanHistoryPrepare(SStreamTask* pTask); -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId); // stream task meta void streamMetaInit(); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 534704e462..1c70dfca0d 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -90,6 +90,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); + streamTaskOpenAllUpstreamInput(pTask); streamSetupScheduleTrigger(pTask); qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, @@ -292,7 +293,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { return -1; } // do process request - if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) { + if (streamProcessScanHistoryFinishReq(pTask, req.childId) < 0) { streamMetaReleaseTask(pSnode->pMeta, pTask); return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 25c6fbe81d..dcd44209eb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1325,7 +1325,7 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } - int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId); + int32_t code = streamProcessScanHistoryFinishReq(pTask, req.childId); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return code; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a12cab9f53..0740566d15 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -379,7 +379,7 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { return 0; } -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) { +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId) { if (pTask->info.taskLevel == TASK_LEVEL__AGG) { int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); ASSERT(left >= 0); @@ -391,8 +391,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, in streamAggUpstreamScanHistoryFinish(pTask); } else { - qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", - pTask->id.idStr, taskId, childId, left); + qDebug("s-task:%s receive scan-history data finish msg from upstream (index:%d), unfinished:%d", + pTask->id.idStr, childId, left); } } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 5b1773e664..7da8da09bf 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -118,7 +118,7 @@ echo "statusInterval 1" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG -echo "tmrDebugFlag 143" >> $TAOS_CFG +echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG echo "jniDebugFlag 143" >> $TAOS_CFG