From aea1690cd1a77e394990e27017db18a7fce744b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Aug 2023 17:49:36 +0800 Subject: [PATCH] fix(stream): fix the stream task after checking if it is in the timer activities. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamDispatch.c | 20 +++++++++++++++++--- source/libs/stream/src/streamMeta.c | 19 +------------------ source/libs/stream/src/streamRecover.c | 12 ++++++------ source/libs/stream/src/streamTask.c | 20 ++++++++++++++++++-- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d433042ad..ef8f3097da 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -183,7 +183,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { } SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->status.timerActive == 1) { + if (pTask->status.timerActive >= 1) { inTimer = true; } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8334ea1c88..6771d0cc28 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -550,13 +550,27 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; + + if (streamTaskShouldStop(&pTask->status)) { + atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + return; + } + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { - qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); - atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + if (!streamTaskShouldStop(&pTask->status)) { + qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); + atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + } else { + atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + } + } else { + atomic_sub_fetch_8(&pTask->status.timerActive, 1); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ae07738868..b689f0c8f2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -137,19 +137,7 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pIter == NULL) { break; } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->schedTimer) { - taosTmrStop(pTask->schedTimer); - pTask->schedTimer = NULL; - } - - if (pTask->launchTaskTimer) { - taosTmrStop(pTask->launchTaskTimer); - pTask->launchTaskTimer = NULL; - } - - tFreeStreamTask(pTask); + tFreeStreamTask(*(SStreamTask**)pIter); } taosHashCleanup(pMeta->pTasks); @@ -362,11 +350,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { int32_t num = taosArrayGetSize(pMeta->pTaskList); doRemoveIdFromList(pMeta, num, pTask->id.taskId); - // remove the ref by timer - if (pTask->triggerParam != 0) { - taosTmrStop(pTask->schedTimer); - } - streamMetaRemoveTask(pMeta, taskId); streamMetaReleaseTask(pMeta, pTask); } else { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index bd2d67e14a..bcb3760c93 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -540,14 +540,14 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { taosWLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); if (ppTask) { - ASSERT((*ppTask)->status.timerActive == 1); + ASSERT((*ppTask)->status.timerActive >= 1); if (streamTaskShouldStop(&(*ppTask)->status)) { const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); taosMemoryFree(pInfo); - (*ppTask)->status.timerActive = 0; + atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1); taosWUnLockLatch(&pMeta->lock); return; } @@ -556,7 +556,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); if (pTask != NULL) { - ASSERT(pTask->status.timerActive == 1); + ASSERT(pTask->status.timerActive >= 1); // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); @@ -578,7 +578,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } // not in timer anymore - pTask->status.timerActive = 0; + atomic_sub_fetch_8(&pTask->status.timerActive, 1); streamMetaReleaseTask(pMeta, pTask); } else { qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId); @@ -609,11 +609,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // todo failed to create timer taosMemoryFree(pInfo); } else { - pTask->status.timerActive = 1; // timer is active + atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active qDebug("s-task:%s set timer active flag", pTask->id.idStr); } } else { // timer exists - pTask->status.timerActive = 1; + ASSERT(pTask->status.timerActive > 0); qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1eb8d11916..987a45cb5c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include -#include +#include "streamInt.h" #include "executor.h" #include "tstream.h" #include "wal.h" +#include "ttimer.h" static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -213,6 +213,22 @@ static void freeItem(void* p) { void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); + // remove the ref by timer + while(pTask->status.timerActive > 0) { + qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr); + taosMsleep(10); + } + + if (pTask->schedTimer != NULL) { + taosTmrStop(pTask->schedTimer); + pTask->schedTimer = NULL; + } + + if (pTask->launchTaskTimer != NULL) { + taosTmrStop(pTask->launchTaskTimer); + pTask->launchTaskTimer = NULL; + } + int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { streamQueueClose(pTask->inputQueue);