refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-07 10:50:44 +08:00
parent bbd423f666
commit 0e0f98eb8d
4 changed files with 57 additions and 49 deletions

View File

@ -819,9 +819,6 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
@ -838,8 +835,10 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
void streamTaskSendCheckMsg(SStreamTask* pTask);
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo,
int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);

View File

@ -406,50 +406,16 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SStreamTaskCheckRsp rsp = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
streamTaskProcessCheckMsg(pMeta, &req, &rsp);
return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {

View File

@ -39,7 +39,7 @@ static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
// check status
void streamTaskCheckDownstream(SStreamTask* pTask) {
void streamTaskSendCheckMsg(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
const char* idstr = pTask->id.idStr;
@ -97,6 +97,46 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
}
}
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
int32_t taskId = pReq->downstreamTaskId;
*pRsp = (SStreamTaskCheckRsp){
.reqId = pReq->reqId,
.streamId = pReq->streamId,
.childId = pReq->childId,
.downstreamNodeId = pReq->downstreamNodeId,
.downstreamTaskId = pReq->downstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
.upstreamTaskId = pReq->upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) {
pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status);
} else {
pRsp->status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
}
}
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
@ -152,7 +192,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
ASSERT(left > 0);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
@ -162,7 +202,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
@ -175,7 +215,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
@ -420,6 +460,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
.stage = pTask->pMeta->stage,
};
// update the reqId for the new check msg
p->reqId = tGenIdPI64();
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;

View File

@ -84,7 +84,7 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
pTask->execInfo.checkTs);
streamTaskCheckDownstream(pTask);
streamTaskSendCheckMsg(pTask);
return 0;
}