diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 48a6d046ea..3c74a9fd7b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -848,12 +848,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); -int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, - int32_t* pNotReady, const char* id); -void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 1135878929..0f4662594f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -26,9 +26,13 @@ static void rspMonitorFn(void* param, void* tmrId); static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); +static int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); +static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList); +static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList); +static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, + int64_t reqId, int32_t* pNotReady, const char* id); static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); - static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); @@ -157,24 +161,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { - SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; - - taosThreadMutexLock(&pInfo->checkInfoLock); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } - - taosArrayPush(pInfo->pList, &info); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; -} - int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; @@ -375,6 +361,24 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* return 0; } +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; + + taosThreadMutexLock(&pInfo->checkInfoLock); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + + taosArrayPush(pInfo->pList, &info); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} + void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { SStreamTaskCheckReq req = { .streamId = pTask->id.streamId,