fix(stream): add null check

This commit is contained in:
Haojun Liao 2023-09-18 15:14:51 +08:00
parent 8aa34802f5
commit c33ef4ce88
4 changed files with 7 additions and 9 deletions

View File

@ -712,7 +712,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed); int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,

View File

@ -1086,8 +1086,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
} }

View File

@ -303,7 +303,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store // 1. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 2. save to disk // 2. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -365,8 +365,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// 4. free it and remove fill-history task from disk meta-store // 4. free it and remove fill-history task from disk meta-store
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info // 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->historyTaskId.taskId = 0;
@ -411,7 +410,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task } else { // drop fill-history task
streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id);
} }
return code; return code;

View File

@ -644,7 +644,7 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
return status; return status;
} }
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) { int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -656,7 +656,7 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT
pReq->streamId = pTaskId->streamId; pReq->streamId = pTaskId->streamId;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg); int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
return code; return code;