diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b30fdcb01..565f6b5938 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -424,7 +424,7 @@ typedef struct STaskOutputInfo { }; int8_t type; STokenBucket* pTokenBucket; - SArray* pDownstreamUpdateList; + SArray* pNodeEpsetUpdateList; } STaskOutputInfo; typedef struct SUpstreamInfo { @@ -445,6 +445,8 @@ typedef struct STaskCheckInfo { int32_t notReadyTasks; int32_t inCheckProcess; int32_t stopCheckProcess; + int32_t notReadyRetryCount; + int32_t timeoutRetryCount; tmr_h checkRspTmr; TdThreadMutex checkInfoLock; } STaskCheckInfo; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f7f790fbe7..a464594233 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1073,9 +1073,9 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j); + SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); bool exist = existInHbMsg(pMsg, pTaskEpset); if (!exist) { @@ -1085,7 +1085,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { } } - taosArrayClear(pTask->outputInfo.pDownstreamUpdateList); + taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList); taosThreadMutexUnlock(&pTask->lock); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 7b8e6e2129..90d0de5a0e 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -356,10 +356,10 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); bool existed = false; for (int i = 0; i < num; ++i) { - SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); + SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); if (p->nodeId == nodeId) { existed = true; break; @@ -368,10 +368,10 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { if (!existed) { SDownstreamTaskEpset t = {.nodeId = nodeId}; - taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); + taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, - t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); + t.nodeId, (num + 1)); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5849e1f00e..0827b5afe4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -470,7 +470,7 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); - pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); + pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); @@ -571,8 +571,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); - pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); - if (pOutputInfo->pDownstreamUpdateList == NULL) { + pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); + if (pOutputInfo->pNodeEpsetUpdateList == NULL) { stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -1098,8 +1098,11 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; pInfo->stopCheckProcess = 0; - taosArrayClear(pInfo->pList); + pInfo->notReadyRetryCount = 0; + pInfo->timeoutRetryCount = 0; + + taosArrayClear(pInfo->pList); return 0; } @@ -1292,11 +1295,13 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady); + pInfo->notReadyRetryCount += 1; + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, + numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); } + // todo add into node update list and send to mnode if (numOfTimeout > 0) { - pInfo->startTs = now; ASSERT(pTask->status.downstreamReady == 0); for (int32_t i = 0; i < numOfTimeout; ++i) { @@ -1309,7 +1314,9 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); + pInfo->timeoutRetryCount += 1; + stDebug("s-task:%s %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, + numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); } taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);