refactor: do some internal refactor.
This commit is contained in:
parent
312dbc1caa
commit
4d8548e938
|
@ -372,7 +372,9 @@ typedef struct STaskExecStatisInfo {
|
|||
int64_t init;
|
||||
int64_t start;
|
||||
int64_t step1Start;
|
||||
double step1El;
|
||||
int64_t step2Start;
|
||||
double step2El;
|
||||
int32_t updateCount;
|
||||
int64_t latestUpdateTs;
|
||||
int32_t processDataBlocks;
|
||||
|
|
|
@ -1102,10 +1102,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
}
|
||||
}
|
||||
|
||||
static void ddxx() {
|
||||
|
||||
}
|
||||
|
||||
// this function should be executed by only one thread, so we set an sentinel to protect this function
|
||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
||||
|
@ -1149,9 +1145,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
} else {
|
||||
if (pTask->execInfo.step2Start == 0) {
|
||||
tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start);
|
||||
tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64", already elapsed:%.2fs", id,
|
||||
pTask->execInfo.step1Start, pTask->execInfo.step1El);
|
||||
} else {
|
||||
tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start);
|
||||
tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
|
||||
pTask->execInfo.step2Start);
|
||||
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
|
|
|
@ -1106,7 +1106,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
|
||||
stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
|
||||
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
|
||||
|
@ -1147,8 +1147,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
}
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
|
||||
pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||
stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d",
|
||||
pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||
|
||||
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else { // this message has been sent successfully, let's try next one.
|
||||
|
|
|
@ -220,7 +220,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel);
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
pTask->execInfo.step1El += el/1000.0;
|
||||
|
||||
stDebug("s-task:%s level:%d inputQ is blocked, resume in 5sec, elapsed time:%.2fs", pTask->id.idStr,
|
||||
pTask->info.taskLevel, pTask->execInfo.step1El);
|
||||
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
|
||||
}
|
||||
|
@ -268,14 +273,20 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
pTask->execInfo.step1El += el/1000.0;
|
||||
|
||||
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
|
||||
stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr,
|
||||
pTask->info.fillHistory, pTask->info.taskLevel);
|
||||
stDebug("s-task:%s fill-history:%d time slice for scan-history exhausted, elapse time:%.2fs, retry in 100ms",
|
||||
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
|
||||
}
|
||||
}
|
||||
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};;
|
||||
// todo refactor
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
pTask->execInfo.step1El += el/1000.0;
|
||||
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
|
||||
}
|
||||
|
||||
// wait for the stream task to be idle
|
||||
|
|
|
@ -125,7 +125,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
|
|||
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
|
||||
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
|
||||
|
||||
if (pTask->schedHistoryInfo.pTimer == NULL) {
|
||||
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
|
||||
|
|
Loading…
Reference in New Issue