diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f68b725cdd..ed37605d08 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1112,8 +1112,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // wait for the stream task get ready for scan history data while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr, - pStreamTask->info.taskLevel); + tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms", + pTask->id.idStr, pStreamTask->id.idStr, pStreamTask->info.taskLevel); taosMsleep(100); } @@ -1181,11 +1181,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTryExec(pTask); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s set status to be dropping", pId); - - // transfer the ownership of executor state - streamTaskReleaseState(pTask); - streamTaskReloadState(pStreamTask); + tqDebug("s-task:%s scan-history-task set status to be dropping", pId); streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); @@ -1236,12 +1232,14 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask == NULL) { - tqError("failed to find task:0x%x", req.taskId); + tqError("failed to find task:0x%x, it may have been dropped already", req.taskId); return -1; } // transfer the ownership of executor state streamTaskReleaseState(pTask); + tqDebug("s-task:%s receive state transfer req", pTask->id.idStr); + SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); streamTaskReloadState(pStreamTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4db4e0d9a6..6e1804b08e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -351,7 +351,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); - qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr); + qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); // todo handle stream task is dropped here @@ -390,7 +390,12 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { pTimeWindow->skey = INT64_MIN; qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor); + // transfer the ownership of executor state + streamTaskReleaseState(pTask); + streamTaskReloadState(pStreamTask); + streamSetStatusNormal(pStreamTask); + streamSchedExec(pStreamTask); streamMetaReleaseTask(pTask->pMeta, pStreamTask); return TSDB_CODE_SUCCESS; @@ -584,6 +589,7 @@ int32_t streamTryExec(SStreamTask* pTask) { } int32_t streamTaskReleaseState(SStreamTask* pTask) { + qDebug("s-task:%s release exec state", pTask->id.idStr); void* pExecutor = pTask->exec.pExecutor; if (pExecutor != NULL) { int32_t code = qStreamOperatorReleaseState(pExecutor); @@ -594,6 +600,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask) { } int32_t streamTaskReloadState(SStreamTask* pTask) { + qDebug("s-task:%s reload exec state", pTask->id.idStr); void* pExecutor = pTask->exec.pExecutor; if (pExecutor != NULL) { int32_t code = qStreamOperatorReloadState(pExecutor);