diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0aa00d50b4..e3487c49d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -56,6 +56,7 @@ extern "C" { #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) +#define STREAM_EXEC_T_ADD_FAILED_TASK (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -886,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 04c0c0d204..924b0a8207 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +typedef struct SMStreamCheckpointReadyRspMsg { + SMsgHead head; +} SMStreamCheckpointReadyRspMsg; + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - int64_t initTs = 0; - int64_t now = taosGetTimestampMs(); - STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId}; - STaskId fId = {0}; - bool hasHistoryTask = false; - - // todo extract method if (!isLeader) { - // this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map. - streamMetaRLock(pMeta); - - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - - tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - } else { - streamMetaRUnLock(pMeta); - - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - return code; + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - streamMetaRLock(pMeta); - - // let's try to find this task in hashmap - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - } else { // not exist even in the hash map of meta, forget it - streamMetaRUnLock(pMeta); - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return code; + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } code = streamProcessCheckRsp(pTask, &rsp); @@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } -typedef struct SMStreamCheckpointReadyRspMsg { - SMsgHead head; -} SMStreamCheckpointReadyRspMsg; - int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { streamMetaStopAllTasks(pMeta); return 0; + } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { + int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index eee1332821..ea9b2ef89f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -295,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -521,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); } +// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. +// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution +// of restart in timer thread will result in a dead lock. +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); + return -1; + } + + stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + return 0; +} + +// this function is executed in timer thread void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamMeta* pMeta = pTask->pMeta; @@ -545,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); - - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return; @@ -618,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } - + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); taosArrayDestroy(pNotReadyList); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 891e0aa142..250866005e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t doStreamExecTask(SStreamTask* pTask) { +static int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 03f8d2adfd..210199b912 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1706,4 +1706,32 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { } return 0; +} + +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = TSDB_CODE_SUCCESS; + + streamMetaWLock(pMeta); + stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); + + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL) { + STaskCheckInfo* pInfo = &(*ppTask)->taskCheckInfo; + int64_t now = taosGetTimestampMs(); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->startTs, now, false); + + if (HAS_RELATED_FILLHISTORY_TASK(*ppTask)) { + STaskId hId = (*ppTask)->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, pInfo->startTs, now, false); + } + } else { + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + streamId, taskId, pMeta->vgId); + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + streamMetaWUnLock(pMeta); + return code; } \ No newline at end of file