diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 432e1d995a..2c46b2aa7b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -309,7 +309,6 @@ struct SStreamTask { SHistDataRange dataRange; SStreamId historyTaskId; SStreamId streamTaskId; - SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray int64_t initTs; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 640f581f05..d4a897f99c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -937,10 +937,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; - taosThreadMutexInit(&pTask->lock, NULL); - + // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { - // checkpoint ver is the kept version, handled data should be the next version. pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 1e4396c8d0..b1ea4754c7 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -395,7 +395,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // agg int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { - pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); + pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -426,7 +426,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ASSERT(left >= 0); if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList); qDebug( "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " "rsp to all upstream tasks", diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8088791e0e..dc598b6c69 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -270,7 +270,6 @@ void tFreeStreamTask(SStreamTask* pTask) { walCloseReader(pTask->exec.pWalReader); } - taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree); if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema);