diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a5a3175652..bcf665ddfb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1149,12 +1149,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 1. get the related stream task pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo delete this task, if the related stream task is dropped - qError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop fill-history task:%s", + tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); @@ -1163,68 +1161,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); -#if 0 - // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the - // stream task get ready for scan history data - while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) { - tqDebug( - "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", - id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); - taosMsleep(100); - } - - // now we can stop the stream task execution - int64_t nextProcessedVer = 0; - - while (1) { - taosThreadMutexLock(&pStreamTask->lock); - int8_t status = pStreamTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { - // return; do nothing - } - - if (status == TASK_STATUS__HALT) { -// tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, -// pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); -// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); -// -// taosThreadMutexUnlock(&pStreamTask->lock); -// break; - } - - if (pStreamTask->status.taskStatus == TASK_STATUS__CK) { - qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", - pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__CK)); - taosThreadMutexUnlock(&pStreamTask->lock); - taosMsleep(1000); - continue; - } - - // upgrade to halt status - if (status == TASK_STATUS__PAUSE) { - qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), - streamGetTaskStatusStr(TASK_STATUS__PAUSE)); - } else { - qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status)); - } - - pStreamTask->status.keepTaskStatus = status; - pStreamTask->status.taskStatus = TASK_STATUS__HALT; - - // wal scan not start yet, reset it to be the start position - nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - if (nextProcessedVer == -1) { - nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1; - } - - tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s", - pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus, - id); - - taosThreadMutexUnlock(&pStreamTask->lock); - break; - } -#endif streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1f1dd61c3c..1672c8e609 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -446,6 +446,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); + tqDebug("s-task:%s lock", pTask->id.idStr); char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cd8bbacb98..14129522d6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1004,6 +1004,8 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, info.msg.info = *pRpcInfo; taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s lock", pTask->id.idStr); + if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 205994b7cc..71521bd8f2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -201,6 +201,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlock1", pTask->id.idStr); while (1) { // wait for the task to be here @@ -224,6 +225,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlock2", pTask->id.idStr); int32_t code = pTrans->pAction(pTask); // todo handle error code; @@ -242,6 +244,8 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { while (1) { taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s lock", pTask->id.idStr); + if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { taosThreadMutexUnlock(&pTask->lock); taosMsleep(100); @@ -282,6 +286,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even // do update the task status taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s lock", pTask->id.idStr); + STaskStateTrans* pTrans = pSM->pActiveTrans; if (pTrans == NULL) { @@ -292,6 +298,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlockx", pTask->id.idStr); return TSDB_CODE_INVALID_PARA; } @@ -299,6 +306,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlocky", pTask->id.idStr); return TSDB_CODE_INVALID_PARA; } @@ -328,6 +336,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlockf", pTask->id.idStr); int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { @@ -338,6 +347,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even } } else { taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s unlockz", pTask->id.idStr); int64_t el = (taosGetTimestampMs() - pSM->startTs); stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,