refactor: do some internal refactor.
This commit is contained in:
parent
19b0de6582
commit
88af030966
|
@ -60,8 +60,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
|
||||
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
||||
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
|
||||
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||
|
||||
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||
|
||||
|
@ -71,8 +71,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
|||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
|
||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
||||
idstr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
||||
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
|
||||
numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
||||
|
||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
|
@ -181,7 +181,6 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
|||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -306,10 +305,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
|||
|
||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||
if (p != NULL) {
|
||||
|
||||
if (reqId != p->reqId) {
|
||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
|
||||
" expired check-rsp recv from downstream task:0x%x, discarded",
|
||||
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
|
||||
id, reqId, p->reqId, taskId);
|
||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||
return TSDB_CODE_FAILED;
|
||||
|
@ -340,7 +337,8 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
|||
pInfo->inCheckProcess = 1;
|
||||
} else {
|
||||
ASSERT(pInfo->startTs > 0);
|
||||
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
|
||||
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
|
||||
pInfo->startTs);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -388,9 +386,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
|
||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
req.reqId = p->reqId;
|
||||
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
||||
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
||||
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
|
||||
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
|
||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||
|
||||
|
@ -403,9 +401,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
|
||||
if (p->taskId == pVgInfo->taskId) {
|
||||
req.reqId = p->reqId;
|
||||
req.downstreamNodeId = pVgInfo->vgId;
|
||||
req.downstreamTaskId = pVgInfo->taskId;
|
||||
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
|
||||
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
||||
|
@ -422,7 +418,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
|||
|
||||
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
|
||||
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||
|
|
Loading…
Reference in New Issue