fix(stream): add check for null ptr and leader .

This commit is contained in:
Haojun Liao 2023-12-18 11:52:59 +08:00
parent f2bba670d8
commit b4a1907fb7
2 changed files with 22 additions and 5 deletions

View File

@ -368,6 +368,7 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs
}
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -379,6 +380,12 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder);
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower",
req.upstreamTaskId, pMeta->vgId);
return TASK_DOWNSTREAM_NOT_LEADER;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
@ -395,11 +402,11 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr, req.downstreamId);
streamProcessScanHistoryFinishRsp(pTask);
code = streamProcessScanHistoryFinishRsp(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
return code;
}
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {

View File

@ -476,11 +476,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
int64_t current = taosGetTimestampMs();
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
taosGetTimestampMs(), false);
streamMetaReleaseTask(pTask->pMeta, pHTask);
if (pHTask != NULL) {
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current,
false);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false);
}
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
@ -657,6 +662,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
// task restart now, not handle the scan-history finish rsp
if (status == TASK_STATUS__UNINIT) {
return TSDB_CODE_INVALID_MSG;
}
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY);
SStreamMeta* pMeta = pTask->pMeta;