diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8c9eeb6f3a..dbcdef57b0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -56,6 +56,7 @@ extern "C" { #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) +#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3989acad2e..aed1091c84 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -77,6 +77,33 @@ int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return 0; } +int32_t tqUpdateNodeEpsetAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { + int32_t vgId = pMeta->vgId; + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + if (numOfTasks == 0) { + tqDebug("vgId:%d no stream tasks existed to run", vgId); + return 0; + } + + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr()); + return -1; + } + + tqDebug("vgId:%d update s-task:0x%x nodeEpset async", vgId, taskId); + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = STREAM_EXEC_T_UPDATE_TASK_EPSET; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(cb, STREAM_QUEUE, &msg); + return 0; +} + int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -443,12 +470,22 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } +static void setParam(SStreamTask* pTask, int64_t* initTs, bool* hasHTask, STaskId* pId) { + *initTs = pTask->execInfo.init; + + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + *hasHTask = true; + pId->streamId = pTask->hTaskInfo.id.streamId; + pId->taskId = pTask->hTaskInfo.id.taskId; + } +} + int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t vgId = pMeta->vgId; + int32_t code = TSDB_CODE_SUCCESS; - int32_t code; SStreamTaskCheckRsp rsp; SDecoder decoder; @@ -467,35 +504,60 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe int64_t initTs = 0; int64_t now = taosGetTimestampMs(); - if (!isLeader) { - SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); - if (pTask != NULL) { - initTs = pTask->execInfo.init; + STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId}; + STaskId fId = {0}; + bool hasHistoryTask = false; - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, initTs, now, false); + // todo extract method + if (!isLeader) { + // this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map. + streamMetaRLock(pMeta); + + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (ppTask != NULL) { + setParam(*ppTask, &initTs, &hasHistoryTask, &fId); + streamMetaRUnLock(pMeta); + + if (hasHistoryTask) { + streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); } - streamMetaReleaseTask(pMeta, pTask); + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + } else { + streamMetaRUnLock(pMeta); + + tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + rsp.streamId, rsp.upstreamTaskId, vgId); + code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; } streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - return code; } SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, now, false); + streamMetaRLock(pMeta); + + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (ppTask != NULL) { + setParam(*ppTask, &initTs, &hasHistoryTask, &fId); + streamMetaRUnLock(pMeta); + + if (hasHistoryTask) { + streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + } + } else { + streamMetaRUnLock(pMeta); + } + + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - // failed to find the related fill-history task - return -1; + code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; + return code; } code = streamProcessCheckRsp(pTask, &rsp); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b8c5cf8e3f..7e69610f80 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1472,6 +1472,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { // fill-history task can only be launched by related stream tasks. STaskExecStatisInfo* pInfo = &pTask->execInfo; if (pTask->info.fillHistory == 1) { + stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr); streamMetaReleaseTask(pMeta, pTask); continue; }