From aaa12ed4f85c6b22381cab26a848147bc6321f15 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Sep 2023 14:00:34 +0800 Subject: [PATCH] refactor: do some internal refactor about the sched status set --- include/libs/stream/tstream.h | 4 ++++ source/dnode/vnode/src/tq/tq.c | 16 ++++++------- source/libs/stream/src/stream.c | 11 +++++---- source/libs/stream/src/streamExec.c | 9 +++---- source/libs/stream/src/streamRecover.c | 2 +- source/libs/stream/src/streamTask.c | 33 +++++++++++++++++++++++++- 6 files changed, 56 insertions(+), 19 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3eb624f932..a5baf33612 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -401,6 +401,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; SRWLatch lock; +// TdThreadRwlock lock; int32_t walScanCounter; void* streamBackend; int64_t streamBackendRid; @@ -660,6 +661,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); +int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask); +int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); +int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 58544090e2..7186adc2c4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1046,8 +1046,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // we have to continue retrying to successfully execute the scan history task. - int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, - TASK_SCHED_STATUS__WAITING); + int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { tqError( "s-task:%s failed to start scan-history in first stream time window since already started, unexpected " @@ -1064,9 +1063,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; - tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, - TASK_SCHED_STATUS__INACTIVE); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + int8_t status = streamTaskSetSchedStatusInActive(pTask); + tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1106,8 +1104,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // now we can stop the stream task execution - int64_t latestVer = 0; + taosThreadMutexLock(&pStreamTask->lock); streamTaskHalt(pStreamTask); tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, @@ -1141,7 +1139,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); // set the fill-history task to be normal if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { @@ -1308,9 +1306,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { pTask->chkInfo.nextProcessVer); streamProcessRunReq(pTask); } else { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + int8_t status = streamTaskSetSchedStatusInActive(pTask); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, - pTask->id.idStr, streamGetTaskStatusStr(st), pTask->status.schedStatus); + pTask->id.idStr, streamGetTaskStatusStr(st), status); } streamMetaReleaseTask(pTq->pStreamMeta, pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d16822be60..1b4de5e6c4 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -108,14 +108,13 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { } int32_t streamSchedExec(SStreamTask* pTask) { - int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, - TASK_SCHED_STATUS__WAITING); + int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr); return -1; } @@ -256,7 +255,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } tDeleteStreamDispatchReq(pReq); - streamTryExec(pTask); + + int8_t schedStatus = streamTaskSetSchedStatusWait(pTask); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + streamTryExec(pTask); + } return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 16fc54d8be..b981fcb41a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -509,7 +509,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock code = streamTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); } } else { qDebug("s-task:%s sink task does not transfer state", id); @@ -615,11 +615,9 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. - int8_t schedStatus = - atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); - const char* id = pTask->id.idStr; + int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); if (schedStatus == TASK_SCHED_STATUS__WAITING) { while (1) { int32_t code = streamExecForAll(pTask); @@ -628,9 +626,12 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } + taosThreadMutexLock(&pTask->lock); if (taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + taosThreadMutexUnlock(&pTask->lock); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); return 0; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 54d5957900..8b2a800576 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -480,7 +480,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { // execute in the scan history complete call back msg, ready to process data from inputQ streamSetStatusNormal(pTask); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + streamTaskSetSchedStatusInActive(pTask); taosWLockLatch(&pMeta->lock); streamMetaSaveTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 84e373172f..71a9a3102c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -332,7 +332,6 @@ void tFreeStreamTask(SStreamTask* pTask) { } pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); - taosThreadMutexDestroy(&pTask->lock); if (pTask->msgInfo.pData != NULL) { destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; @@ -553,3 +552,35 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); } + +int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) { + taosThreadMutexLock(&pTask->lock); + int8_t status = pTask->status.schedStatus; + if (status == TASK_SCHED_STATUS__INACTIVE) { + pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; + } + taosThreadMutexUnlock(&pTask->lock); + + return status; +} + +int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) { + taosThreadMutexLock(&pTask->lock); + int8_t status = pTask->status.schedStatus; + if (status == TASK_SCHED_STATUS__WAITING) { + pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE; + } + taosThreadMutexUnlock(&pTask->lock); + + return status; +} + +int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { + taosThreadMutexLock(&pTask->lock); + int8_t status = pTask->status.schedStatus; + ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE); + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; + taosThreadMutexUnlock(&pTask->lock); + + return status; +}