From 5624efff4e3b3c05be035ee6157228ec68c2f124 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 18 Mar 2024 14:31:40 +0800 Subject: [PATCH] opti:tmq --- source/dnode/mnode/impl/src/mndTopic.c | 123 +++++++++++++------------ source/dnode/vnode/src/tq/tqMeta.c | 24 ++--- 2 files changed, 72 insertions(+), 75 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 3bfab4a688..ae76f3be31 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -353,6 +353,66 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { return 0; } +static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj *topicObj){ + STqCheckInfo info; + memcpy(info.topic, topicObj->name, TSDB_TOPIC_FNAME_LEN); + info.ntbUid = topicObj->ntbUid; + info.colIdList = topicObj->ntbColIds; + // broadcast forbid alter info + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + int32_t code = 0; + void *buf = NULL; + + while (1) { + // iterate vg + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, topicObj->dbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + // encoder check alter info + int32_t len; + tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); + if (code != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + code = tEncodeSTqCheckInfo(&encoder, &info); + if (code != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + tEncoderClear(&encoder); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + // add redo action + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + len; + action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO; + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { + goto END; + } + sdbRelease(pSdb, pVgroup); + buf = NULL; + } + +END: + taosMemoryFree(buf); + sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); + return code; +} + static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, const char *userName) { mInfo("start to create topic:%s", pCreate->name); @@ -396,13 +456,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.withMeta = pCreate->withMeta; if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { - if (pCreate->withMeta) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - code = terrno; - goto _OUT; - } - topicObj.ast = taosStrdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; @@ -474,59 +527,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { - STqCheckInfo info; - memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); - info.ntbUid = topicObj.ntbUid; - info.colIdList = topicObj.ntbColIds; - // broadcast forbid alter info - void *pIter = NULL; - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - while (1) { - // iterate vg - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) { - sdbRelease(pSdb, pVgroup); - continue; - } - - // encoder check alter info - int32_t len; - tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); - if (code < 0) { - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - goto _OUT; - } - void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, len); - if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { - taosMemoryFree(buf); - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - code = -1; - goto _OUT; - } - tEncoderClear(&encoder); - ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); - // add redo action - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - code = -1; - goto _OUT; - } - buf = NULL; - sdbRelease(pSdb, pVgroup); + code = sendCheckInfoToVnode(pTrans, pMnode, &topicObj); + if (code != 0){ + goto _OUT; } } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 4c403dc18f..e64de9a423 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } -// if (tqMetaRestoreHandle(pTq) < 0) { -// return -1; -// } - if (tqMetaRestoreCheckInfo(pTq) < 0) { return -1; } @@ -167,32 +163,30 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { void* pVal = NULL; int vLen = 0; SDecoder decoder; + int32_t code = 0; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { STqCheckInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { + code = tDecodeSTqCheckInfo(&decoder, &info); + if (code != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return -1; + goto END; } tDecoderClear(&decoder); - if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)); + if (code != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return -1; + goto END; } } +END: tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - return 0; + return code; } int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {