diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4a1a43324b..ca09033f5d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3184,18 +3184,11 @@ typedef struct { typedef struct { SMsgHead head; - int64_t leftForVer; + int64_t resetRelHalt; // reset related stream task halt status int64_t streamId; int32_t taskId; } SVDropStreamTaskReq; -typedef struct { - SMsgHead head; - int64_t streamId; - int32_t taskId; - int64_t dataVer; -} SVStreamTaskVerUpdateReq; - typedef struct { int8_t reserved; } SVDropStreamTaskRsp; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9738be839d..67ee4620b6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -796,7 +796,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); -int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock); +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); @@ -882,7 +882,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); int32_t streamAlignTransferState(SStreamTask* pTask); -int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); +int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 1d8b2cf5d3..0b08baf4ae 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -30,6 +30,28 @@ extern bool tsDeployOnSnode; static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory); +static bool hasCountWindowNode(SPhysiNode* pNode) { + if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) { + return true; + } else { + size_t size = LIST_LENGTH(pNode->pChildren); + + for (int32_t i = 0; i < size; ++i) { + SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pNode->pChildren, i); + if (hasCountWindowNode(pChild)) { + return true; + } + } + + return false; + } +} + +static bool countWindowStreamTask(SSubplan* pPlan) { + SPhysiNode* pNode = pPlan->pNode; + return hasCountWindowNode(pNode); +} + int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { SNode* pAst = NULL; @@ -312,6 +334,14 @@ static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, return terrno; } + bool hasCountWindowNode = countWindowStreamTask(plan); + if (hasCountWindowNode && (!fillHistory) && hasFillHistory) { + SStreamStatus* pStatus = &pTask->status; + mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set", + pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT)); + pStatus->taskStatus = TASK_STATUS__HALT; + } + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); streamTaskSetUpstreamInfo(pSinkTask, pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8689c30a55..0d04304f97 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -841,23 +841,25 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } char* p = streamTaskGetStatus(pTask)->name; + const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); if (pTask->info.fillHistory) { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 - " ms, inputVer:%" PRId64, + " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x " + "trigger:%" PRId64 " ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, + pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer); } else { - tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 - " ms, inputVer:%" PRId64, - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); + tqInfo( + "vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 + " ms, inputVer:%" PRId64, + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); } return 0; @@ -1016,8 +1018,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); - tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); + tqDebug("s-task:%s fill-history task set status to be dropping and drop it", id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c4973b7c1e..06e80e1732 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -602,6 +602,10 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaReleaseTask(pMeta, pTask); } + streamMetaWLock(pMeta); + streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false); + streamMetaWUnLock(pMeta); + // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f45904f036..2e576f7ec7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -512,7 +512,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1); } else { stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7fb8095acd..f274068b73 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -328,7 +328,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { id, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0); // 2. save to disk streamMetaWLock(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index db74ce9897..ab519e2b4b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -725,9 +725,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // it is an fill-history task, remove the related stream task's id that points to it atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); - if (pTask->info.fillHistory == 1) { - streamTaskClearHTaskAttr(pTask, false); - } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index ee98bc801b..cdc59bb0f0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -391,6 +391,16 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int64_t startTs = pTask->execInfo.start; streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + if (pTask->status.taskStatus == TASK_STATUS__HALT) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); + + // halt it self for count window stream task until the related + // fill history task completd. + stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, + pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); + } + // start the related fill-history task, when current task is ready // not invoke in success callback due to the deadlock. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -804,7 +814,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // check stream task status in the first place. SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY) { + if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) { stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, pStatus->name); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fef733c9f3..b9bcf7e90e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -108,7 +108,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY; + pTask->status.taskStatus = fillHistory? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; @@ -126,7 +126,6 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; - /*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/ if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1; return 0; @@ -136,7 +135,6 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; - /*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/ if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1; return 0; @@ -294,7 +292,6 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { - int64_t ver; int64_t skip64; int8_t skip8; int32_t skip32; @@ -648,7 +645,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) { + while (!streamTaskIsIdle(pTask)) { stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id, pTask->info.taskLevel); taosMsleep(100); @@ -755,7 +752,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { return status; } -int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) { +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) { SStreamMeta* pMeta = pTask->pMeta; STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { @@ -773,6 +770,12 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) { taosThreadMutexLock(&(*ppStreamTask)->lock); CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); + + if (resetRelHalt) { + (*ppStreamTask)->status.taskStatus = TASK_STATUS__READY; + stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId); + } + streamMetaSaveTask(pMeta, *ppStreamTask); taosThreadMutexUnlock(&(*ppStreamTask)->lock); } @@ -784,7 +787,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) { return TSDB_CODE_SUCCESS; } -int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) { +int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) { SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -794,6 +797,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI pReq->head.vgId = vgId; pReq->taskId = pTaskId->taskId; pReq->streamId = pTaskId->streamId; + pReq->resetRelHalt = resetRelHalt; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);