fix(stream): add some logs for retry for notready/timeout downstream tasks. and do some internal refactor.
This commit is contained in:
parent
77961ea791
commit
fae53efed9
|
@ -424,7 +424,7 @@ typedef struct STaskOutputInfo {
|
|||
};
|
||||
int8_t type;
|
||||
STokenBucket* pTokenBucket;
|
||||
SArray* pDownstreamUpdateList;
|
||||
SArray* pNodeEpsetUpdateList;
|
||||
} STaskOutputInfo;
|
||||
|
||||
typedef struct SUpstreamInfo {
|
||||
|
@ -445,6 +445,8 @@ typedef struct STaskCheckInfo {
|
|||
int32_t notReadyTasks;
|
||||
int32_t inCheckProcess;
|
||||
int32_t stopCheckProcess;
|
||||
int32_t notReadyRetryCount;
|
||||
int32_t timeoutRetryCount;
|
||||
tmr_h checkRspTmr;
|
||||
TdThreadMutex checkInfoLock;
|
||||
} STaskCheckInfo;
|
||||
|
|
|
@ -1073,9 +1073,9 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
|||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
|
||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||
for (int j = 0; j < num; ++j) {
|
||||
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
|
||||
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
|
||||
|
||||
bool exist = existInHbMsg(pMsg, pTaskEpset);
|
||||
if (!exist) {
|
||||
|
@ -1085,7 +1085,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);
|
||||
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
||||
|
|
|
@ -356,10 +356,10 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
|
|||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
|
||||
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||
bool existed = false;
|
||||
for (int i = 0; i < num; ++i) {
|
||||
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
|
||||
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
|
||||
if (p->nodeId == nodeId) {
|
||||
existed = true;
|
||||
break;
|
||||
|
@ -368,10 +368,10 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
|
|||
|
||||
if (!existed) {
|
||||
SDownstreamTaskEpset t = {.nodeId = nodeId};
|
||||
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
|
||||
taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
|
||||
|
||||
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
|
||||
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
|
||||
t.nodeId, (num + 1));
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
|
|
@ -470,7 +470,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
taosMemoryFree(pTask->outputInfo.pTokenBucket);
|
||||
taosThreadMutexDestroy(&pTask->lock);
|
||||
|
||||
pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
|
||||
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||
|
||||
taosMemoryFree(pTask);
|
||||
stDebug("s-task:0x%x free task completed", taskId);
|
||||
|
@ -571,8 +571,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
// 2MiB per second for sink task
|
||||
// 50 times sink operator per second
|
||||
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
|
||||
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||
if (pOutputInfo->pDownstreamUpdateList == NULL) {
|
||||
pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||
if (pOutputInfo->pNodeEpsetUpdateList == NULL) {
|
||||
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -1098,8 +1098,11 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id)
|
|||
pInfo->notReadyTasks = 0;
|
||||
pInfo->inCheckProcess = 0;
|
||||
pInfo->stopCheckProcess = 0;
|
||||
taosArrayClear(pInfo->pList);
|
||||
|
||||
pInfo->notReadyRetryCount = 0;
|
||||
pInfo->timeoutRetryCount = 0;
|
||||
|
||||
taosArrayClear(pInfo->pList);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1292,11 +1295,13 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
}
|
||||
}
|
||||
|
||||
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
|
||||
pInfo->notReadyRetryCount += 1;
|
||||
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
|
||||
numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
|
||||
}
|
||||
|
||||
// todo add into node update list and send to mnode
|
||||
if (numOfTimeout > 0) {
|
||||
pInfo->startTs = now;
|
||||
ASSERT(pTask->status.downstreamReady == 0);
|
||||
|
||||
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||
|
@ -1309,7 +1314,9 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
}
|
||||
}
|
||||
|
||||
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
|
||||
pInfo->timeoutRetryCount += 1;
|
||||
stDebug("s-task:%s %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
|
||||
numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
|
||||
}
|
||||
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
|
|
Loading…
Reference in New Issue