fix(stream):update the check-rsp prcedure, to avoid repeatly start check-rsp procedure.
This commit is contained in:
parent
00fa4e7f0c
commit
a8fac441be
|
@ -444,6 +444,7 @@ typedef struct STaskCheckInfo {
|
|||
int64_t startTs;
|
||||
int32_t notReadyTasks;
|
||||
int32_t inCheckProcess;
|
||||
int32_t stopCheckProcess;
|
||||
tmr_h checkRspTmr;
|
||||
TdThreadMutex checkInfoLock;
|
||||
} STaskCheckInfo;
|
||||
|
@ -844,14 +845,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
|||
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
|
||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
||||
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||
int32_t* pNotReady, const char* id);
|
||||
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
|
||||
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
||||
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id);
|
||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
||||
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
|
||||
|
||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||
|
|
|
@ -216,7 +216,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
|
||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||
streamTaskResetStatus(pTask);
|
||||
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||
|
||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||
|
||||
SStreamTask** ppHTask = NULL;
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
|
@ -231,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
||||
streamTaskResetStatus(*ppHTask);
|
||||
streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
|
||||
streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -184,13 +184,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
|
||||
ASSERT(pTask->status.downstreamReady == 0);
|
||||
|
||||
int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||
|
||||
// serialize streamProcessScanHistoryFinishRsp
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
req.reqId = tGenIdPI64();
|
||||
|
@ -230,7 +223,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
streamTaskStartMonitorCheckRsp(pTask);
|
||||
} else { // for sink task, set it ready directly.
|
||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||
doProcessDownstreamReadyRsp(pTask);
|
||||
}
|
||||
}
|
||||
|
@ -405,7 +398,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
|
||||
if (left == 0) {
|
||||
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
||||
streamTaskCompleteCheck(pInfo, id);
|
||||
streamTaskStopMonitorCheckRsp(pInfo, id);
|
||||
} else {
|
||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||
|
|
|
@ -942,15 +942,13 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
||||
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
||||
if (pInfo->pList == NULL) {
|
||||
pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
|
||||
} else {
|
||||
taosArrayClear(pInfo->pList);
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
|
||||
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
pInfo->notReadyTasks = 1;
|
||||
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
|
@ -959,8 +957,6 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
|
|||
}
|
||||
|
||||
pInfo->startTs = startTs;
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1014,39 +1010,33 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||
if (pInfo->inCheckProcess == 0) {
|
||||
pInfo->inCheckProcess = 1;
|
||||
} else {
|
||||
ASSERT(pInfo->startTs > 0);
|
||||
stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs);
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
stDebug("s-task:%s set the in-check-procedure flag", id);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) {
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||
if (!pInfo->inCheckProcess) {
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
stWarn("s-task:%s already not in-check-procedure", id);
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampMs() - pInfo->startTs;
|
||||
stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el);
|
||||
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
|
||||
|
||||
pInfo->startTs = 0;
|
||||
pInfo->inCheckProcess = 0;
|
||||
pInfo->notReadyTasks = 0;
|
||||
pInfo->inCheckProcess = 0;
|
||||
pInfo->stopCheckProcess = 1;
|
||||
taosArrayClear(pInfo->pList);
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1108,7 +1098,10 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
if (state == TASK_STATUS__STOP) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||
streamTaskCompleteCheck(pInfo, id);
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||
return;
|
||||
|
@ -1117,7 +1110,11 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||
streamTaskCompleteCheck(pInfo, id);
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1127,8 +1124,8 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
|
||||
vgId, ref);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
streamTaskCompleteCheck(pInfo, id);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1176,17 +1173,19 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
taosArrayDestroy(pNotReadyList);
|
||||
taosArrayDestroy(pTimeoutList);
|
||||
|
||||
streamTaskCompleteCheck(pInfo, id);
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
return;
|
||||
}
|
||||
|
||||
// checking of downstream tasks has been stopped by other threads
|
||||
if (pInfo->inCheckProcess == 0) {
|
||||
if (pInfo->stopCheckProcess == 1) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug(
|
||||
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, "
|
||||
"timeout:%d, ready:%d ref:%d",
|
||||
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
||||
|
@ -1238,10 +1237,10 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
|
||||
}
|
||||
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady,
|
||||
stDebug("s-task:%s continue checking rsp in 300ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady,
|
||||
numOfFault, numOfTimeout, numOfReady);
|
||||
|
||||
taosArrayDestroy(pNotReadyList);
|
||||
|
@ -1249,14 +1248,42 @@ static void rspMonitorFn(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||
ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL);
|
||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||
pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
||||
|
||||
if (pInfo->checkRspTmr == NULL) {
|
||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
||||
} else {
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
streamTaskCompleteCheckRsp(pInfo, id);
|
||||
|
||||
pInfo->stopCheckProcess = 1;
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
stDebug("s-task:%s set stop check rsp mon", id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
|
||||
ASSERT(pInfo->inCheckProcess == 0);
|
||||
|
||||
|
|
Loading…
Reference in New Issue