From e0a275f1d41ee12853b502b3375f671c1b791d47 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Sep 2024 14:49:35 +0800 Subject: [PATCH] fix(stream): do some internal refactor and rewrite the error code for encode. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 13 ++++++++----- source/libs/stream/src/streamCheckStatus.c | 1 + 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c00d9a93bb..c05e9f7b60 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -586,6 +586,10 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve bool isLeader, bool restored) { int32_t code = 0; int32_t vgId = pMeta->vgId; + int32_t numOfTasks = 0; + int32_t taskId = -1; + int64_t streamId = -1; + bool added = false; if (tsDisableStream) { tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); @@ -613,13 +617,12 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve } // 2.save task, use the latest commit version as the initial start version of stream task. - int32_t taskId = pTask->id.taskId; - int64_t streamId = pTask->id.streamId; - bool added = false; + taskId = pTask->id.taskId; + streamId = pTask->id.streamId; streamMetaWLock(pMeta); code = streamMetaRegisterTask(pMeta, sversion, pTask, &added); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + numOfTasks = streamMetaGetNumOfTasks(pMeta); streamMetaWUnLock(pMeta); if (code < 0) { @@ -654,7 +657,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId); } } else { - tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); + tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks); tFreeStreamTask(pTask); } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d0604e6d21..31d1bddd01 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -268,6 +268,7 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); + code = (code >= 0)? 0:code; return code; }