diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 78acea1cd9..925bfa24c6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -267,11 +267,11 @@ typedef struct SCheckpointInfo { typedef struct SStreamStatus { int8_t taskStatus; - int8_t checkDownstream; + int8_t checkDownstream; // downstream tasks are all ready now, if this flag is set int8_t schedStatus; int8_t keepTaskStatus; bool transferState; - TdThreadMutex lock; + int8_t timerActive; // timer is active } SStreamStatus; typedef struct SHistDataRange { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 891fb8dd20..f3e0bcaa0c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -168,16 +168,53 @@ void tqNotifyClose(STQ* pTq) { } SStreamTask* pTask = *(SStreamTask**)pIter; - tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s set closing flag", pTq->pStreamMeta->vgId, pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__STOP; int64_t st = taosGetTimestampMs(); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + int64_t el = taosGetTimestampMs() - st; tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); } taosWUnLockLatch(&pTq->pStreamMeta->lock); + + tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId); + + int64_t st = taosGetTimestampMs(); + while(1) { + taosMsleep(1000); + + bool inTimer = false; + + taosWLockLatch(&pTq->pStreamMeta->lock); + pIter = NULL; + + while(1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->status.timerActive == 1) { + inTimer = true; + } + } + + taosWUnLockLatch(&pTq->pStreamMeta->lock); + + if (inTimer) { + tqDebug("vgId:%d some tasks in timer, wait for 1sec and recheck", pTq->pStreamMeta->vgId); + } else { + break; + } + } + + int64_t el = taosGetTimestampMs() - st; + tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, el); + } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 725f918f39..593f84cb8a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -280,18 +280,53 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { - taosWLockLatch(&pMeta->lock); + SStreamTask* pTask = NULL; + // pre-delete operation + taosWLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { - SStreamTask* pTask = *ppTask; + pTask = *ppTask; + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); + } else { + qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); + taosWUnLockLatch(&pMeta->lock); + return; + } + taosWUnLockLatch(&pMeta->lock); + qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); + + while(1) { + taosRLockLatch(&pMeta->lock); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + + if (ppTask) { + if ((*ppTask)->status.timerActive == 0) { + taosRUnLockLatch(&pMeta->lock); + break; + } + + taosMsleep(10); + qDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr); + taosRUnLockLatch(&pMeta->lock); + } else { + taosRUnLockLatch(&pMeta->lock); + break; + } + } + + // let's do delete of stream task + taosWLockLatch(&pMeta->lock); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (ppTask) { taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); - int32_t num = taosArrayGetSize(pMeta->pTaskList); + ASSERT(pTask->status.timerActive == 0); + int32_t num = taosArrayGetSize(pMeta->pTaskList); qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1); for (int32_t i = 0; i < num; ++i) { int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 45a8ad0dfa..a033556353 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -446,20 +446,54 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { streamTaskCheckDownstreamTasks(pHTask); } +typedef struct SStreamTaskRetryInfo { + SStreamMeta* pMeta; + int32_t taskId; +} SStreamTaskRetryInfo; + static void tryLaunchHistoryTask(void* param, void* tmrId) { - SStreamTask* pTask = param; + SStreamTaskRetryInfo* pInfo = param; + SStreamMeta* pMeta = pInfo->pMeta; - SStreamMeta* pMeta = pTask->pMeta; - SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId)); - if (pHTask == NULL) { - qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, - pMeta->vgId, pTask->historyTaskId.taskId); + qDebug("s-task:0x%x in timer to launch history task", pInfo->taskId); - taosTmrReset(tryLaunchHistoryTask, 100, pTask, streamEnv.timer, &pTask->timer); - return; + taosWLockLatch(&pMeta->lock); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); + if (ppTask) { + ASSERT((*ppTask)->status.timerActive == 1); + if (streamTaskShouldStop(&(*ppTask)->status)) { + qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, + streamGetTaskStatusStr((*ppTask)->status.taskStatus)); + (*ppTask)->status.timerActive = 0; + taosWUnLockLatch(&pMeta->lock); + return; + } } + taosWUnLockLatch(&pMeta->lock); - doCheckDownstreamStatus(pTask, *pHTask); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); + if (pTask != NULL) { + ASSERT(pTask->status.timerActive == 1); + + // abort the timer if intend to stop task + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + if (pHTask == NULL && pTask->status.taskStatus == TASK_STATUS__NORMAL) { + qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it may not be built or have been destroyed", + pTask->id.idStr, pMeta->vgId, pTask->historyTaskId.taskId); + + taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); + return; + } + + doCheckDownstreamStatus(pTask, pHTask); + + // not in timer anymore + pTask->status.timerActive = 0; + streamMetaReleaseTask(pMeta, pHTask); + streamMetaReleaseTask(pMeta, pTask); + } else { + qError("s-task:0x%x failed to load task", pInfo->taskId); + } } // todo fix the bug: 2. race condition @@ -474,9 +508,16 @@ int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) { pMeta->vgId, pTask->historyTaskId.taskId); if (pTask->timer == NULL) { - pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pTask, streamEnv.timer); + SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); + pInfo->taskId = pTask->id.taskId; + pInfo->pMeta = pTask->pMeta; + + pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); if (pTask->timer == NULL) { // todo failed to create timer + } else { + pTask->status.timerActive = 1; // timer is active + qDebug("s-task:%s set time active flag", pTask->id.idStr); } } @@ -602,6 +643,7 @@ int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishRe return 0; } +// todo handle race condition, this task may be destroyed void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { if (pTask->info.fillHistory) { qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); diff --git a/tests/system-test/0-others/backquote_check.py b/tests/system-test/0-others/backquote_check.py index be8590f913..3076d246d1 100644 --- a/tests/system-test/0-others/backquote_check.py +++ b/tests/system-test/0-others/backquote_check.py @@ -22,7 +22,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.dbname = 'db' self.setsql = TDSetSql() self.stbname = 'stb' @@ -76,8 +76,8 @@ class TDTestCase: tdSql.execute(f'drop database {self.dbname}') def run(self): - self.topic_name_check() - self.db_name_check() + # self.topic_name_check() + # self.db_name_check() self.stream_name_check() def stop(self): tdSql.close()