diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0c4f00de6a..cc1987492c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -186,6 +186,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; @@ -199,8 +201,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - streamTaskStartMonitorCheckRsp(pTask); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -219,8 +222,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - - streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0dc3682da7..72611f4c14 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -534,7 +534,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); TdThreadMutexAttr attr = {0}; - int code = taosThreadMutexAttrInit(&attr); + + int code = taosThreadMutexAttrInit(&attr); if (code != 0) { stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); return code; @@ -563,6 +564,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); if (pOutputInfo->pDownstreamUpdateList == NULL) { + stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); + if (pTask->taskCheckInfo.pList == NULL) { + stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -943,11 +952,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { } static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { - if (pInfo->pList == NULL) { - pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); - } else { - taosArrayClear(pInfo->pList); - } + taosArrayClear(pInfo->pList); if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { pInfo->notReadyTasks = 1; @@ -1089,9 +1094,12 @@ static void rspMonitorFn(void* param, void* tmrId) { int64_t now = taosGetTimestampMs(); int64_t el = now - pInfo->startTs; ETaskStatus state = pStat->state; + const char* id = pTask->id.idStr; int32_t numOfReady = 0; int32_t numOfFault = 0; - const char* id = pTask->id.idStr; + int32_t numOfNotRsp = 0; + int32_t numOfNotReady = 0; + int32_t numOfTimeout = 0; stDebug("s-task:%s start to do check downstream rsp check", id); @@ -1147,7 +1155,7 @@ static void rspMonitorFn(void* param, void* tmrId) { if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. taosArrayPush(pTimeoutList, &p->taskId); } else { // el < CHECK_NOT_RSP_DURATION - // do nothing and continue waiting for their rsps + numOfNotRsp += 1; // do nothing and continue waiting for their rsp } } else { taosArrayPush(pNotReadyList, &p->taskId); @@ -1158,17 +1166,17 @@ static void rspMonitorFn(void* param, void* tmrId) { stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } - int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); - int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && - (numOfFault > 0)) { + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + if ((numOfNotRsp == 0) && (numOfFault > 0)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); taosArrayDestroy(pNotReadyList); @@ -1182,9 +1190,9 @@ static void rspMonitorFn(void* param, void* tmrId) { if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " - "timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " + "fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -1241,8 +1249,8 @@ static void rspMonitorFn(void* param, void* tmrId) { taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, - numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, + numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList);