From cbc0bceec612a4c2e347cc69ea344610fd29a4d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Sep 2024 15:37:05 +0800 Subject: [PATCH] fix(stream): fix syntax error. --- source/dnode/mnode/impl/src/mndStream.c | 82 +++++++++++++++++-------- source/libs/stream/src/streamTask.c | 2 +- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7d6f490fcf..7dacde41ba 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -63,8 +63,8 @@ static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq); - -static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); +static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo* pInfo); +static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); @@ -1142,6 +1142,9 @@ int32_t extractStreamNodeList(SMnode *pMnode) { } static bool taskNodeIsUpdated(SMnode *pMnode) { + bool allReady = true; + SArray *pNodeSnapshot = NULL; + // check if the node update happens or not streamMutexLock(&execInfo.lock); @@ -1166,13 +1169,11 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } } - bool allReady = true; - SArray *pNodeSnapshot = NULL; - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); } + if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); @@ -1180,12 +1181,16 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { return true; } - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = {0}; + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); + if (code) { + streamMutexUnlock(&execInfo.lock); + return false; + } bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - taosArrayDestroy(changeInfo.pUpdateNodeList); - taosHashCleanup(changeInfo.pDBMap); + mndDestroyVgroupChangeInfo(&changeInfo); taosArrayDestroy(pNodeSnapshot); if (nodeUpdated) { @@ -1972,11 +1977,22 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) // tasks on the will be removed replica. // 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we // will handle it as mentioned in 1 & 2 items. -static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) { - SVgroupChangeInfo info = { - .pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), - .pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), - }; +static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, + SVgroupChangeInfo *pInfo) { + int32_t code = 0; + int32_t lino = 0; + + if (pInfo == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), + pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + + if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { + mndDestroyVgroupChangeInfo(pInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); for (int32_t i = 0; i < numOfNodes; ++i) { @@ -1997,7 +2013,11 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); char buf[256] = {0}; - (void) epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error + code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error + if (code) { + mError("failed to convert epset string, code:%s", tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _err); + } mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); @@ -2006,20 +2026,16 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); epsetAssign(&updateInfo.newEp, &pCurrent->epset); - void* p = taosArrayPush(info.pUpdateNodeList, &updateInfo); - if (p == NULL) { - mError("failed to put update entry into node list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - } + void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo); + TSDB_CHECK_NULL(p, code, lino, _err, terrno); } // todo handle the snode info if (pCurrent->nodeId != SNODE_HANDLE) { SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId); - int32_t code = taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); + code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); mndReleaseVgroup(pMnode, pVgroup); - if (code) { - mError("failed to put into dbmap, code:out of memory"); - } + TSDB_CHECK_CODE(code, lino, _err); } break; @@ -2027,7 +2043,18 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP } } - return info; + return code; + + _err: + mndDestroyVgroupChangeInfo(pInfo); + return code; +} + +static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) { + if (pInfo != NULL) { + taosArrayDestroy(pInfo->pUpdateNodeList); + taosHashCleanup(pInfo->pDBMap); + } } static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { @@ -2271,7 +2298,11 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { goto _end; } - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = {0}; + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); + if (code) { + goto _end; + } { if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) { @@ -2305,8 +2336,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { mDebug("no update found in nodeList"); } - taosArrayDestroy(changeInfo.pUpdateNodeList); - taosHashCleanup(changeInfo.pDBMap); + mndDestroyVgroupChangeInfo(&changeInfo); _end: streamMutexUnlock(&execInfo.lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d008455c10..6d8f90f7f6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -416,7 +416,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); if (code) { - stError("%s failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code)); + stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code)); return code; }