From 2f9c63ae3fd513da4ab8a9c603f143674f73638e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 15:31:20 +0800 Subject: [PATCH] enh(tq): update tb uid when droping table --- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 ++-- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 9 ++++++++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4fe7d8a399..57f7b341d4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); - // app id + // client id char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN); varDataSetLen(clientId, strlen(varDataVal(clientId))); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b1760e6dae..0051312908 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); -int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, @@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); -int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList); +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30de2dcfa0..eb1e6e4b83 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -104,7 +104,7 @@ static void tdSRowDemo() { taosMemoryFree(pTSChema); } -int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; STqExec* pExec = NULL; while (1) { @@ -113,7 +113,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { pExec = (STqExec*)pIter; if (pExec->subType == TOPIC_SUB_TYPE__DB) continue; for (int32_t i = 0; i < 5; i++) { - int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true); + int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd); ASSERT(code == 0); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a1a8e7bc88..09c644bd79 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -386,7 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, tDecoderClear(&decoder); - tqUpdateTbUidList(pVnode->pTq, tbUids); + tqUpdateTbUidList(pVnode->pTq, tbUids, true); tdUpdateTbUidList(pVnode->pSma, pStore); tdUidStoreFree(pStore); @@ -517,6 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in SDecoder decoder = {0}; SEncoder encoder = {0}; int ret; + SArray *tbUids; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->pCont = NULL; @@ -533,7 +534,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in } // process req + tbUids = taosArrayInit(req.nReqs, sizeof(int64_t)); rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp)); + if (tbUids == NULL || rsp.pArray == NULL) goto _exit; + for (int iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbRsp dropTbRsp = {0}; @@ -553,7 +557,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in taosArrayPush(rsp.pArray, &dropTbRsp); } + tqUpdateTbUidList(pVnode->pTq, tbUids, false); + _exit: + taosArrayDestroy(tbUids); tDecoderClear(&decoder); tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret); pRsp->pCont = rpcMallocCont(pRsp->contLen);