fix(stream): set check downstream completed for sink task.

This commit is contained in:
Haojun Liao 2024-04-18 18:36:02 +08:00
parent 4802cd1ddf
commit 815d12fd77
4 changed files with 26 additions and 3 deletions

View File

@ -835,8 +835,6 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);

View File

@ -200,6 +200,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -213,6 +214,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
streamTaskResetStatus(*ppHTask);
streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
}
}

View File

@ -230,6 +230,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamTaskStartMonitorCheckRsp(pTask);
} else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
doProcessDownstreamReadyRsp(pTask);
}
}

View File

@ -24,6 +24,8 @@
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
@ -951,7 +953,7 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
pInfo->notReadyTasks = 1;
} else {
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
}
@ -1139,6 +1141,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
@ -1154,6 +1157,24 @@ static void rspMonitorFn(void* param, void* tmrId) {
return;
}
// checking of downstream tasks has been stopped by other threads
if (pInfo->inCheckProcess == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, "
"timeout:%d, ready:%d ref:%d",
pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false);
}
return;
}
if (numOfNotReady > 0) { // check to make sure not in recheck timer
ASSERT(pTask->status.downstreamReady == 0);