refactor: do some internal refactor.
This commit is contained in:
parent
e387b2f932
commit
0b6e1a12bb
|
@ -1322,12 +1322,13 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s scan-history finish rsp received from task:0x%x", pTask->id.idStr, req.downstreamId);
|
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId);
|
||||||
|
|
||||||
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
|
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
|
||||||
if (remain > 0) {
|
if (remain > 0) {
|
||||||
tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain);
|
tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain);
|
||||||
} else {
|
} else {
|
||||||
|
tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr);
|
||||||
streamProcessScanHistoryFinishRsp(pTask);
|
streamProcessScanHistoryFinishRsp(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -344,7 +344,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
pTask->notReadyTasks = numOfVgs;
|
pTask->notReadyTasks = numOfVgs;
|
||||||
|
|
||||||
qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
|
qDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
|
||||||
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
|
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
@ -352,7 +352,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s no downstream tasks, invoke history finish rsp directly", pTask->id.idStr);
|
qDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
|
||||||
streamProcessScanHistoryFinishRsp(pTask);
|
streamProcessScanHistoryFinishRsp(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue