From 3332a0b822c41fd593e16444fa95e93d4da3fde5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 01:19:30 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 26 +++++++---- source/dnode/mnode/impl/src/mndStream.c | 52 +++++++++++----------- source/dnode/vnode/src/tqCommon/tqCommon.c | 40 ++++++++++------- source/libs/stream/src/streamMeta.c | 22 +++++---- source/libs/stream/src/streamUtil.c | 5 ++- 5 files changed, 86 insertions(+), 59 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b77c8535f1..29759bc561 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -494,6 +494,14 @@ typedef struct SScanWalInfo { tmr_h scanTimer; } SScanWalInfo; +typedef struct SFatalErrInfo { + int32_t code; + int64_t ts; + int32_t threadId; + int32_t line; + char func[128]; +} SFatalErrInfo; + // meta typedef struct SStreamMeta { char* path; @@ -523,14 +531,13 @@ typedef struct SStreamMeta { int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; int64_t rid; - - int64_t chkpId; - int32_t chkpCap; - SArray* chkpSaved; - SArray* chkpInUse; - SRWLatch chkpDirLock; - void* qHandle; // todo remove it - void* bkdChkptMgt; + SFatalErrInfo fatalInfo; // fatal error occurs, stream stop to execute + int64_t chkpId; + int32_t chkpCap; + SArray* chkpSaved; + SArray* chkpInUse; + SRWLatch chkpDirLock; + void* bkdChkptMgt; } SStreamMeta; typedef struct STaskUpdateEntry { @@ -776,6 +783,9 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); +void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino); +int32_t streamGetFatalError(const SStreamMeta* pMeta); + void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId); int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e0b8caa938..1fb398d070 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1143,18 +1143,16 @@ int32_t extractStreamNodeList(SMnode *pMnode) { return taosArrayGetSize(execInfo.pNodeList); } -static bool taskNodeIsUpdated(SMnode *pMnode) { - bool allReady = true; - SArray *pNodeSnapshot = NULL; - - // check if the node update happens or not - streamMutexLock(&execInfo.lock); +static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) { + bool allReady = false; + bool nodeUpdated = false; + SVgroupChangeInfo changeInfo = {0}; int32_t numOfNodes = extractStreamNodeList(pMnode); + if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = taosGetTimestampSec(); - streamMutexUnlock(&execInfo.lock); return false; } @@ -1166,43 +1164,46 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); - streamMutexUnlock(&execInfo.lock); return true; } } - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); } if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); - taosArrayDestroy(pNodeSnapshot); - streamMutexUnlock(&execInfo.lock); return true; } - SVgroupChangeInfo changeInfo = {0}; - code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo); if (code) { - streamMutexUnlock(&execInfo.lock); - return false; + nodeUpdated = false; + } else { + nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + if (nodeUpdated) { + mDebug("stream tasks not ready due to node update"); + } } - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - mndDestroyVgroupChangeInfo(&changeInfo); - taosArrayDestroy(pNodeSnapshot); - - if (nodeUpdated) { - mDebug("stream tasks not ready due to node update"); - } - - streamMutexUnlock(&execInfo.lock); return nodeUpdated; } +// check if the node update happens or not +static bool taskNodeIsUpdated(SMnode *pMnode) { + SArray *pNodeSnapshot = NULL; + + streamMutexLock(&execInfo.lock); + bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot); + streamMutexUnlock(&execInfo.lock); + + taosArrayDestroy(pNodeSnapshot); + return updated; +} + static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; if (taskNodeIsUpdated(pMnode)) { @@ -1993,7 +1994,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { mndDestroyVgroupChangeInfo(pInfo); - return terrno; + TSDB_CHECK_NULL(NULL, code, lino, _err, terrno); } int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); @@ -2048,6 +2049,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis return code; _err: + mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino); mndDestroyVgroupChangeInfo(pInfo); return code; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3f4329f22b..116acd4636 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -160,6 +160,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tDecoderClear(&decoder); + int32_t gError = streamGetFatalError(pMeta); + if (gError != 0) { + tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError), + pMeta->fatalInfo.ts, pMeta->fatalInfo.func); + return 0; + } + // update the nodeEpset when it exists streamMetaWLock(pMeta); @@ -290,8 +297,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); } else { - if (streamMetaCommit(pMeta) < 0) { - // persist to disk + if ((code = streamMetaCommit(pMeta)) < 0) { + // always return true + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return TSDB_CODE_SUCCESS; } streamMetaClearSetUpdateTaskListComplete(pMeta); @@ -754,8 +764,9 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored } streamMetaWUnLock(pMeta); + // always return success when handling the requirement issued by mnode during transaction. - return code; + return TSDB_CODE_SUCCESS; } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { @@ -1197,10 +1208,6 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } -int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { - return doProcessDummyRspMsg(pMeta, pMsg); -} - int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; @@ -1221,14 +1228,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; int32_t code = 0; SStreamTask* pTask = NULL; - SRestoreCheckpointInfo req = {0}; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int64_t now = taosGetTimestampMs(); + SDecoder decoder; + SRestoreCheckpointInfo req = {0}; - SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); @@ -1239,16 +1245,15 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError( - "vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, req.taskId); + tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, req.taskId); // ignore this code to avoid error code over write int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); if (ret) { tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret)); } - return code; + return 0; } // discard the rsp, since it is expired. @@ -1272,7 +1277,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_INTERNAL_ERROR; + return 0; } SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; @@ -1299,10 +1304,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (pMeta->role == NODE_ROLE_LEADER) { code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); + if (code) { + tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code)); + } } else { tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr); } streamMetaReleaseTask(pMeta, pTask); - return code; + return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d8249666c3..f4202667ff 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -923,32 +923,38 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code) { + streamSetFatalError(pMeta, code, __func__, __LINE__); + } streamMetaWUnLock(pMeta); return code; } int32_t streamMetaCommit(SStreamMeta* pMeta) { - int32_t code = 0; - code = tdbCommit(pMeta->db, pMeta->txn); + int32_t code = tdbCommit(pMeta->db, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to commit stream meta", pMeta->vgId); - return code; + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code), + pMeta->fatalInfo.line); } code = tdbPostCommit(pMeta->db, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code), + pMeta->fatalInfo.line); return code; } code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); if (code != 0) { - stError("vgId:%d failed to begin trans", pMeta->vgId); - return code; + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line); + } else { + stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); } - stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); return code; } diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index cef2ba35e7..5bf9370cb7 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -77,9 +77,10 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, pMeta->fatalInfo.threadId = taosGetSelfPthreadId(); tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func)); pMeta->fatalInfo.line = lino; - stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); + stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); } else { - stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code); + stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId, + pMeta->fatalInfo.ts, tstrerror(code)); } }