fix(stream): fix the stream task after checking if it is in the timer activities.

This commit is contained in:
Haojun Liao 2023-08-07 17:49:36 +08:00
parent e6b80e6c52
commit aea1690cd1
5 changed files with 43 additions and 30 deletions

View File

@ -183,7 +183,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
} }
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive == 1) { if (pTask->status.timerActive >= 1) {
inTimer = true; inTimer = true;
} }
} }

View File

@ -550,13 +550,27 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
static void doRetryDispatchData(void* param, void* tmrId) { static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
if (streamTaskShouldStop(&pTask->status)) {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
return;
}
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); if (!streamTaskShouldStop(&pTask->status)) {
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
} }
} }

View File

@ -137,19 +137,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
tFreeStreamTask(*(SStreamTask**)pIter);
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->schedTimer) {
taosTmrStop(pTask->schedTimer);
pTask->schedTimer = NULL;
}
if (pTask->launchTaskTimer) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
}
tFreeStreamTask(pTask);
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
@ -362,11 +350,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
doRemoveIdFromList(pMeta, num, pTask->id.taskId); doRemoveIdFromList(pMeta, num, pTask->id.taskId);
// remove the ref by timer
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->schedTimer);
}
streamMetaRemoveTask(pMeta, taskId); streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {

View File

@ -540,14 +540,14 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
ASSERT((*ppTask)->status.timerActive == 1); ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(&(*ppTask)->status)) { if (streamTaskShouldStop(&(*ppTask)->status)) {
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
(*ppTask)->status.timerActive = 0; atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return; return;
} }
@ -556,7 +556,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
if (pTask != NULL) { if (pTask != NULL) {
ASSERT(pTask->status.timerActive == 1); ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task // abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
@ -578,7 +578,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
} }
// not in timer anymore // not in timer anymore
pTask->status.timerActive = 0; atomic_sub_fetch_8(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId); qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
@ -609,11 +609,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// todo failed to create timer // todo failed to create timer
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} else { } else {
pTask->status.timerActive = 1; // timer is active atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
qDebug("s-task:%s set timer active flag", pTask->id.idStr); qDebug("s-task:%s set timer active flag", pTask->id.idStr);
} }
} else { // timer exists } else { // timer exists
pTask->status.timerActive = 1; ASSERT(pTask->status.timerActive > 0);
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
} }

View File

@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/transport/trpc.h> #include "streamInt.h"
#include <streamInt.h>
#include "executor.h" #include "executor.h"
#include "tstream.h" #include "tstream.h"
#include "wal.h" #include "wal.h"
#include "ttimer.h"
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray); int32_t childId = taosArrayGetSize(pArray);
@ -213,6 +213,22 @@ static void freeItem(void* p) {
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
// remove the ref by timer
while(pTask->status.timerActive > 0) {
qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr);
taosMsleep(10);
}
if (pTask->schedTimer != NULL) {
taosTmrStop(pTask->schedTimer);
pTask->schedTimer = NULL;
}
if (pTask->launchTaskTimer != NULL) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
}
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) { if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue); streamQueueClose(pTask->inputQueue);