diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 4d433042ad..ef8f3097da 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -183,7 +183,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
- if (pTask->status.timerActive == 1) {
+ if (pTask->status.timerActive >= 1) {
inTimer = true;
}
}
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 8334ea1c88..6771d0cc28 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -550,13 +550,27 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
+
+ if (streamTaskShouldStop(&pTask->status)) {
+ atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+ qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
+ return;
+ }
+
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) {
- qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
- atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
- streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
+ if (!streamTaskShouldStop(&pTask->status)) {
+ qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
+ atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
+ streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
+ } else {
+ atomic_sub_fetch_8(&pTask->status.timerActive, 1);
+ qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
+ }
+ } else {
+ atomic_sub_fetch_8(&pTask->status.timerActive, 1);
}
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index ae07738868..b689f0c8f2 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -137,19 +137,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pIter == NULL) {
break;
}
-
- SStreamTask* pTask = *(SStreamTask**)pIter;
- if (pTask->schedTimer) {
- taosTmrStop(pTask->schedTimer);
- pTask->schedTimer = NULL;
- }
-
- if (pTask->launchTaskTimer) {
- taosTmrStop(pTask->launchTaskTimer);
- pTask->launchTaskTimer = NULL;
- }
-
- tFreeStreamTask(pTask);
+ tFreeStreamTask(*(SStreamTask**)pIter);
}
taosHashCleanup(pMeta->pTasks);
@@ -362,11 +350,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t num = taosArrayGetSize(pMeta->pTaskList);
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
- // remove the ref by timer
- if (pTask->triggerParam != 0) {
- taosTmrStop(pTask->schedTimer);
- }
-
streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask);
} else {
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index bd2d67e14a..bcb3760c93 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -540,14 +540,14 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
if (ppTask) {
- ASSERT((*ppTask)->status.timerActive == 1);
+ ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(&(*ppTask)->status)) {
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
taosMemoryFree(pInfo);
- (*ppTask)->status.timerActive = 0;
+ atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1);
taosWUnLockLatch(&pMeta->lock);
return;
}
@@ -556,7 +556,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
if (pTask != NULL) {
- ASSERT(pTask->status.timerActive == 1);
+ ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
@@ -578,7 +578,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
// not in timer anymore
- pTask->status.timerActive = 0;
+ atomic_sub_fetch_8(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pTask);
} else {
qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
@@ -609,11 +609,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// todo failed to create timer
taosMemoryFree(pInfo);
} else {
- pTask->status.timerActive = 1; // timer is active
+ atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
}
} else { // timer exists
- pTask->status.timerActive = 1;
+ ASSERT(pTask->status.timerActive > 0);
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
}
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 1eb8d11916..987a45cb5c 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -13,11 +13,11 @@
* along with this program. If not, see .
*/
-#include
-#include
+#include "streamInt.h"
#include "executor.h"
#include "tstream.h"
#include "wal.h"
+#include "ttimer.h"
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
@@ -213,6 +213,22 @@ static void freeItem(void* p) {
void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
+ // remove the ref by timer
+ while(pTask->status.timerActive > 0) {
+ qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr);
+ taosMsleep(10);
+ }
+
+ if (pTask->schedTimer != NULL) {
+ taosTmrStop(pTask->schedTimer);
+ pTask->schedTimer = NULL;
+ }
+
+ if (pTask->launchTaskTimer != NULL) {
+ taosTmrStop(pTask->launchTaskTimer);
+ pTask->launchTaskTimer = NULL;
+ }
+
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);