From 68ecda7b078a28476e0c70942eac454d3dddf14a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Jul 2024 09:37:12 +0800 Subject: [PATCH] fix(stream): check return value. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 43 ++++++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 778d79f8f6..c6139387b9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -412,13 +412,13 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamTask* pTask = NULL; int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask); if (pTask) { - streamProcessDispatchRsp(pTask, pRsp, pMsg->code); + code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; + return code; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return terrno; + return TSDB_CODE_STREAM_TASK_NOT_EXIST; } } @@ -426,15 +426,21 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; SDecoder decoder; SStreamRetrieveReq req; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamRetrieveReq(&decoder, &req); + code = tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); + if (code) { + tqError("vgId:%d failed to decode retrieve msg, quit handling it", pMeta->vgId); + return code; + } + SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask); + code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask); if (pTask == NULL) { tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.dstTaskId); @@ -464,6 +470,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; SStreamTaskCheckReq req; SStreamTaskCheckRsp rsp = {0}; @@ -471,9 +478,14 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamTaskCheckReq(&decoder, &req); + code = tDecodeStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); + if (code) { + tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId); + return code; + } + streamTaskProcessCheckMsg(pMeta, &req, &rsp); return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId); } @@ -687,11 +699,17 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); - streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + if (code) { + tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + } } // drop the stream task now - streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); + if (code) { + tqDebug("s-task:0x%x vgId:%d drop failed", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + } // commit the update streamMetaWLock(pMeta); @@ -703,12 +721,13 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } streamMetaWUnLock(pMeta); - return 0; + return 0; // always return success } int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) { SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; + int32_t code = 0; int32_t vgId = pMeta->vgId; tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId); @@ -718,7 +737,7 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL && (*ppTask) != NULL) { - streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); + code = streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); } else { // failed to get the task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqError( @@ -729,7 +748,7 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored streamMetaWUnLock(pMeta); // always return success when handling the requirement issued by mnode during transaction. - return TSDB_CODE_SUCCESS; + return code; } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { @@ -1099,7 +1118,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { code = tqScanWalAsync((STQ*)handle, false); } else { - streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask); } } /*else { ASSERT(status != TASK_STATUS__UNINIT);