From 8420c3aab28a7da8fc92e574983b3c2aa3055fcc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 27 Dec 2021 13:19:18 +0800 Subject: [PATCH] remove mgmtq --- source/dnode/mgmt/impl/inc/dndInt.h | 2 - source/dnode/mgmt/impl/inc/dndVnodes.h | 8 +- source/dnode/mgmt/impl/src/dndDnode.c | 18 ++++ source/dnode/mgmt/impl/src/dndTransport.c | 12 +-- source/dnode/mgmt/impl/src/dndVnodes.c | 96 ++----------------- .../dnode/mgmt/impl/test/cluster/cluster.cpp | 4 +- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/sdb/src/sdbHash.c | 4 + 8 files changed, 44 insertions(+), 102 deletions(-) diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index c352b37ef0..d6e9a6b4a1 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -98,8 +98,6 @@ typedef struct { int32_t openVnodes; int32_t totalVnodes; SRWLatch latch; - taos_queue pMgmtQ; - SWorkerPool mgmtPool; SWorkerPool queryPool; SWorkerPool fetchPool; SMWorkerPool syncPool; diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index 35f99ee73b..bf5f0122c1 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -24,12 +24,18 @@ extern "C" { int32_t dndInitVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode); void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); -void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 777d6f77d9..c67e55f048 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -661,6 +661,24 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { case TDMT_MND_GRANT_RSP: dndProcessGrantRsp(pDnode, pMsg); break; + case TDMT_DND_CREATE_VNODE: + code = dndProcessCreateVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_ALTER_VNODE: + code = dndProcessAlterVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_VNODE: + code = dndProcessDropVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_AUTH_VNODE: + code = dndProcessAuthVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_SYNC_VNODE: + code = dndProcessSyncVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_COMPACT_VNODE: + code = dndProcessCompactVnodeReq(pDnode, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; code = -1; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index f39bbb6ba3..000510970f 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -94,17 +94,17 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg; // message from mnode to dnode - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 351fc20784..1e03bba10b 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -56,11 +56,9 @@ typedef struct { static int32_t dndInitVnodeReadWorker(SDnode *pDnode); static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); static int32_t dndInitVnodeSyncWorker(SDnode *pDnode); -static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode); static void dndCleanupVnodeReadWorker(SDnode *pDnode); static void dndCleanupVnodeWriteWorker(SDnode *pDnode); static void dndCleanupVnodeSyncWorker(SDnode *pDnode); -static void dndCleanupVnodeMgmtWorker(SDnode *pDnode); static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); @@ -77,12 +75,10 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); @@ -96,13 +92,6 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode); static int32_t dndOpenVnodes(SDnode *pDnode); static void dndCloseVnodes(SDnode *pDnode); -static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); - static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodeObj *pVnode = NULL; @@ -600,7 +589,7 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { return pAuth; } -static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); dDebug("vgId:%d, create vnode req is received", pCreate->vgId); @@ -641,7 +630,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); @@ -680,7 +669,7 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return code; } -static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); int32_t vgId = pDrop->vgId; @@ -707,7 +696,7 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); int32_t code = 0; @@ -725,7 +714,7 @@ static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); int32_t vgId = pAuth->vgId; @@ -747,7 +736,7 @@ static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); int32_t vgId = pCompact->vgId; @@ -769,39 +758,6 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = 0; - - switch (pMsg->msgType) { - case TDMT_DND_CREATE_VNODE: - code = dndProcessCreateVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_ALTER_VNODE: - code = dndProcessAlterVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_DROP_VNODE: - code = dndProcessDropVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_AUTH_VNODE: - code = dndProcessAuthVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_SYNC_VNODE: - code = dndProcessSyncVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_COMPACT_VNODE: - code = dndProcessCompactVnodeReq(pDnode, pMsg); - break; - default: - code = TSDB_CODE_MSG_NOT_PROCESSED; - break; - } - - SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; - rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { SRpcMsg *pRsp = NULL; vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp); @@ -909,11 +865,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { return pVnode; } -void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - dndWriteRpcMsgToVnodeQueue(pMgmt->pMgmtQ, pMsg); -} - void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { @@ -957,35 +908,6 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMs return code; } -static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SWorkerPool *pPool = &pMgmt->mgmtPool; - pPool->name = "vnode-mgmt"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue); - if (pMgmt->pMgmtQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode mgmt worker is initialized"); - return 0; -} - -static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); - tWorkerCleanup(&pMgmt->mgmtPool); - pMgmt->pMgmtQ = NULL; - dDebug("vnode mgmt worker is closed"); -} - static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); @@ -1167,11 +1089,6 @@ int32_t dndInitVnodes(SDnode *pDnode) { return -1; } - if (dndInitVnodeMgmtWorker(pDnode) != 0) { - dError("failed to init vnodes mgmt worker since %s", terrstr()); - return -1; - } - if (dndOpenVnodes(pDnode) != 0) { dError("failed to open vnodes since %s", terrstr()); return -1; @@ -1187,7 +1104,6 @@ void dndCleanupVnodes(SDnode *pDnode) { dndCleanupVnodeReadWorker(pDnode); dndCleanupVnodeWriteWorker(pDnode); dndCleanupVnodeSyncWorker(pDnode); - dndCleanupVnodeMgmtWorker(pDnode); dInfo("dnode-vnodes is cleaned up"); } diff --git a/source/dnode/mgmt/impl/test/cluster/cluster.cpp b/source/dnode/mgmt/impl/test/cluster/cluster.cpp index 7230c3eb74..7734826789 100644 --- a/source/dnode/mgmt/impl/test/cluster/cluster.cpp +++ b/source/dnode/mgmt/impl/test/cluster/cluster.cpp @@ -28,14 +28,14 @@ Testbase DndTestCluster::test; TEST_F(DndTestCluster, 01_ShowCluster) { test.SendShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, ""); CHECK_META( "show cluster", 3); - CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id"); + CHECK_SCHEMA(0, TSDB_DATA_TYPE_BIGINT, 8, "id"); CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name"); CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); - IgnoreInt32(); + IgnoreInt64(); IgnoreBinary(TSDB_CLUSTER_ID_LEN); CheckTimestamp(); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index cf85befb61..9459c5e525 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -844,7 +844,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); } - mError("trans:%d, exec finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); + mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); return continueExec; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 0c311b86b6..e288f598a2 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -123,6 +123,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SRWLatch *pLock = &pSdb->locks[pRow->type]; taosWLockLatch(pLock); + // remove attached object such as trans + SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type]; + if (deleteFp != NULL) (*deleteFp)(pSdb, pRow); + SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { taosWUnLockLatch(pLock);