From c33ef4ce88cbf625d181717131d7a612f8314f4c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 15:14:51 +0800 Subject: [PATCH] fix(stream): add null check --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 3 +-- source/libs/stream/src/streamExec.c | 7 +++---- source/libs/stream/src/streamTask.c | 4 ++-- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2d70bb1e1c..a19ebd67b0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -712,7 +712,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); -int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId); +int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e639e272fa..f5909eb0fe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1086,8 +1086,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); -// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); - streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamMetaReleaseTask(pMeta, pTask); return -1; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 91c46c8ad9..969b547d71 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -303,7 +303,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store - streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 2. save to disk taosWLockLatch(&pMeta->lock); @@ -365,8 +365,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); // 4. free it and remove fill-history task from disk meta-store -// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); - streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; @@ -411,7 +410,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); } else { // drop fill-history task - streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id); } return code; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ba8578f98e..d2e306fa01 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -644,7 +644,7 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { return status; } -int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) { +int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) { SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -656,7 +656,7 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT pReq->streamId = pTaskId->streamId; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; - int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg); + int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); if (code != TSDB_CODE_SUCCESS) { qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); return code;