From db897fb03a3e6059311859a937bcbff0d3115960 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Sep 2023 09:25:23 +0800 Subject: [PATCH] fix(stream): opt perf. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 5 ++--- source/libs/stream/src/stream.c | 3 +-- source/libs/stream/src/streamTask.c | 21 +++++++++++++-------- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 61d74b5809..82fefc7e1c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -691,7 +691,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); -int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask); +bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 22f0f256ea..3b11b5b764 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1066,12 +1066,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // we have to continue retrying to successfully execute the scan history task. - int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); - if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { + if (!streamTaskSetSchedStatusWait(pTask)) { tqError( "s-task:%s failed to start scan-history in first stream time window since already started, unexpected " "sched-status:%d", - id, schedStatus); + id, pTask->status.schedStatus); streamMetaReleaseTask(pMeta, pTask); return 0; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 2abd3bac05..6f9a577a46 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -108,8 +108,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { } int32_t streamSchedExec(SStreamTask* pTask) { - int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); - if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + if (streamTaskSetSchedStatusWait(pTask)) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e77ab16040..c1ffcda8a5 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -624,15 +624,20 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); } -int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) { - taosThreadMutexLock(&pTask->lock); - int8_t status = pTask->status.schedStatus; - if (status == TASK_SCHED_STATUS__INACTIVE) { - pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; - } - taosThreadMutexUnlock(&pTask->lock); +bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { + bool ret = false; - return status; + // double check + if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { + taosThreadMutexLock(&pTask->lock); + if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { + pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; + ret = true; + } + taosThreadMutexUnlock(&pTask->lock); + } + + return ret; } int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {