refactor: do some internal refactor.
This commit is contained in:
parent
387b4d365e
commit
8b27f98ee4
|
@ -1777,7 +1777,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d failed to decode checkpoint source msg, code:%s", vgId, tstrerror(code));
|
||||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -1790,18 +1790,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
|
||||
// todo: when generating checkpoint, leader of mnode has transfer to other DNode?
|
||||
|
||||
// set the initial value for generating check point
|
||||
int32_t total = 0;
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
|
||||
// set the initial value for generating check point
|
||||
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
|
||||
pMeta->mgmtInfo.epset = req.mgmtEps;
|
||||
pMeta->mgmtInfo.mnodeId = req.mnodeId;
|
||||
|
@ -1813,9 +1805,16 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
total = taosArrayGetSize(pMeta->pTaskList);
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
|
||||
qDebug("s-task:%s level:%d receive checkpoint source msg from mnode id:%" PRId64 ", total source checkpoint req:%d",
|
||||
pTask->id.idStr, pTask->info.taskLevel, req.checkpointId, total);
|
||||
qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d",
|
||||
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
|
||||
|
||||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
|
||||
// todo: when generating checkpoint, leader of mnode has transfer to other DNode?
|
||||
streamProcessCheckpointSourceReq(pTask, &req);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
|
@ -1934,7 +1933,7 @@ _end:
|
|||
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||
vInfo("vgId:%d, restart to all stream tasks", vgId);
|
||||
vInfo("vgId:%d, restart all stream tasks", vgId);
|
||||
tqCheckStreamStatus(pTq);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -250,7 +250,7 @@ const char* streamGetBlockTypeStr(int32_t type) {
|
|||
case STREAM_INPUT__CHECKPOINT:
|
||||
return "checkpoint";
|
||||
case STREAM_INPUT__CHECKPOINT_TRIGGER:
|
||||
return "checkpoint-triggre";
|
||||
return "checkpoint-trigger";
|
||||
case STREAM_INPUT__TRANS_STATE:
|
||||
return "trans-state";
|
||||
default:
|
||||
|
|
|
@ -178,10 +178,9 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
|
|||
.stage = pTask->pMeta->stage,
|
||||
};
|
||||
|
||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
|
||||
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage);
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
|
||||
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage);
|
||||
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
|
|
|
@ -419,7 +419,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
|||
|
||||
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
||||
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
|
||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
|
||||
if (pInfo->nodeId == nodeId) {
|
||||
epsetAssign(&pInfo->epSet, pEpSet);
|
||||
qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf);
|
||||
|
|
Loading…
Reference in New Issue