refactor: do some internal refactor.
This commit is contained in:
parent
faa87e13b0
commit
a33e7b2eb3
|
@ -26,7 +26,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define CHECK_RSP_INTERVAL 300
|
||||
#define CHECK_RSP_CHECK_INTERVAL 300
|
||||
#define LAUNCH_HTASK_INTERVAL 100
|
||||
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
|
||||
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
||||
|
|
|
@ -55,7 +55,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
|
||||
// serialize streamProcessScanHistoryFinishRsp
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
streamTaskPrepareMonitorCheckRsp(pTask);
|
||||
streamTaskStartMonitorCheckRsp(pTask);
|
||||
|
||||
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||
|
||||
|
@ -69,9 +69,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
|
||||
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||
|
||||
streamTaskStartMonitorCheckRsp(pTask);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
streamTaskPrepareMonitorCheckRsp(pTask);
|
||||
streamTaskStartMonitorCheckRsp(pTask);
|
||||
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
|
||||
|
@ -90,9 +89,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
|
||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
}
|
||||
|
||||
// the check rsp monitor timer must be invoked here
|
||||
streamTaskStartMonitorCheckRsp(pTask);
|
||||
} else { // for sink task, set it ready directly.
|
||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
|
||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
|
||||
|
@ -165,38 +161,30 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask) {
|
||||
/*SStreamTask* p = */streamMetaAcquireOneTask(pTask); // add task ref here
|
||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
|
||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here
|
||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||
|
||||
if (pInfo->checkRspTmr == NULL) {
|
||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
|
||||
} else {
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
|
||||
|
@ -376,7 +364,6 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
|
|||
|
||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
|
||||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
|
||||
|
||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
|
@ -649,7 +636,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
|
||||
}
|
||||
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
|
||||
|
|
Loading…
Reference in New Issue