fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-22 09:37:12 +08:00
parent 7201526674
commit 68ecda7b07
1 changed files with 31 additions and 12 deletions

View File

@ -412,13 +412,13 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask); int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return code;
} else { } else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return terrno; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
} }
@ -426,15 +426,21 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SDecoder decoder; SDecoder decoder;
SStreamRetrieveReq req; SStreamRetrieveReq req;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); code = tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (code) {
tqError("vgId:%d failed to decode retrieve msg, quit handling it", pMeta->vgId);
return code;
}
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask); code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.dstTaskId); req.dstTaskId);
@ -464,6 +470,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamTaskCheckReq req; SStreamTaskCheckReq req;
SStreamTaskCheckRsp rsp = {0}; SStreamTaskCheckRsp rsp = {0};
@ -471,9 +478,14 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req); code = tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (code) {
tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
return code;
}
streamTaskProcessCheckMsg(pMeta, &req, &rsp); streamTaskProcessCheckMsg(pMeta, &req, &rsp);
return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId); return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
} }
@ -687,11 +699,17 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
// drop the related fill-history task firstly // drop the related fill-history task firstly
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
if (code) {
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
}
} }
// drop the stream task now // drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
if (code) {
tqDebug("s-task:0x%x vgId:%d drop failed", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
}
// commit the update // commit the update
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
@ -703,12 +721,13 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return 0; return 0; // always return success
} }
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) { int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
int32_t code = 0;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId); tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);
@ -718,7 +737,7 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL && (*ppTask) != NULL) { if (ppTask != NULL && (*ppTask) != NULL) {
streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); code = streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
} else { // failed to get the task. } else { // failed to get the task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqError( tqError(
@ -729,7 +748,7 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
// always return success when handling the requirement issued by mnode during transaction. // always return success when handling the requirement issued by mnode during transaction.
return TSDB_CODE_SUCCESS; return code;
} }
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
@ -1099,7 +1118,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
code = tqScanWalAsync((STQ*)handle, false); code = tqScanWalAsync((STQ*)handle, false);
} else { } else {
streamTrySchedExec(pTask); code = streamTrySchedExec(pTask);
} }
} /*else { } /*else {
ASSERT(status != TASK_STATUS__UNINIT); ASSERT(status != TASK_STATUS__UNINIT);