diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 63da78a174..46f4b0959f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -879,6 +879,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); +SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 53232ccb84..5cff4b318f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -763,8 +763,7 @@ static int32_t schedTaskInFuture(SStreamTask* pTask) { pTask->status.schedIdleTime, ref); // add one ref count for task - // todo this may be failed, and add ref may be failed. - SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); + /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask); if (pTask->schedInfo.pIdleTimer == NULL) { pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 112777da9e..87c558a99e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -647,6 +647,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t return p; } +SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) { + int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); + return pTask; +} + void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); if (ref > 0) {