diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 18ac74a940..8b90a006d3 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -31,18 +31,6 @@ typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); - -typedef struct { - char user[TSDB_USER_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int32_t acctId; - SMnode* pMnode; - int64_t createdTime; - SRpcMsg rpcMsg; - int32_t contLen; - void* pCont; -} SMndMsg; - typedef struct { int32_t dnodeId; int64_t clusterId; @@ -121,7 +109,7 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -int32_t mndProcessMsg(SMndMsg *pMsg); +int32_t mndProcessMsg(SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/inc/bmWorker.h b/source/dnode/mgmt/bnode/inc/bmWorker.h index ab45c53408..1f81c9d7af 100644 --- a/source/dnode/mgmt/bnode/inc/bmWorker.h +++ b/source/dnode/mgmt/bnode/inc/bmWorker.h @@ -28,7 +28,7 @@ void bmInitMsgFp(SMnodeMgmt *pMgmt); void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void bmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 67e8d89719..00e780986e 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -29,7 +29,7 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { if (pMnode != NULL) { pMsg->pNode = pMnode; - code = mndProcessMsg((SMndMsg *)pMsg); + code = mndProcessMsg((SNodeMsg *)pMsg); mmRelease(pMgmt, pMnode); } diff --git a/source/dnode/mgmt/qnode/inc/qmWorker.h b/source/dnode/mgmt/qnode/inc/qmWorker.h index b268ed7d1e..aaba32538d 100644 --- a/source/dnode/mgmt/qnode/inc/qmWorker.h +++ b/source/dnode/mgmt/qnode/inc/qmWorker.h @@ -28,7 +28,7 @@ void qmInitMsgFp(SMnodeMgmt *pMgmt); void qmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t qmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t qmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void qmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void qmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void qmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void qmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/snode/inc/smWorker.h b/source/dnode/mgmt/snode/inc/smWorker.h index 3ad593faed..973a590c96 100644 --- a/source/dnode/mgmt/snode/inc/smWorker.h +++ b/source/dnode/mgmt/snode/inc/smWorker.h @@ -28,7 +28,7 @@ void smInitMsgFp(SMnodeMgmt *pMgmt); void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void smConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void smConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h index e64ecd8dbf..7f4e944e23 100644 --- a/source/dnode/mgmt/vnode/inc/vmWorker.h +++ b/source/dnode/mgmt/vnode/inc/vmWorker.h @@ -28,7 +28,7 @@ void vmInitMsgFp(SMnodeMgmt *pMgmt); void vmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t vmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void vmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 3577f1dda3..1da1673b2b 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -38,11 +38,11 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -typedef int32_t (*MndMsgFp)(SMndMsg *pMsg); +typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); -typedef int32_t (*ShowMetaFp)(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -typedef int32_t (*ShowRetrieveFp)(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +typedef int32_t (*ShowMetaFp)(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); typedef struct SMnodeLoad { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index d6258c040c..5c1b0991be 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -47,7 +47,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransProcessRsp(SMndMsg *pRsp); +void mndTransProcessRsp(SNodeMsg *pRsp); void mndTransPullup(SMnode *pMnode); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 0e3bc53219..a6e6d57345 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -26,9 +26,9 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw); static int32_t mndAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct); static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct); static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew); -static int32_t mndProcessCreateAcctReq(SMndMsg *pReq); -static int32_t mndProcessAlterAcctReq(SMndMsg *pReq); -static int32_t mndProcessDropAcctReq(SMndMsg *pReq); +static int32_t mndProcessCreateAcctReq(SNodeMsg *pReq); +static int32_t mndProcessAlterAcctReq(SNodeMsg *pReq); +static int32_t mndProcessDropAcctReq(SNodeMsg *pReq); int32_t mndInitAcct(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_ACCT, @@ -185,19 +185,19 @@ static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) { return 0; } -static int32_t mndProcessCreateAcctReq(SMndMsg *pReq) { +static int32_t mndProcessCreateAcctReq(SNodeMsg *pReq) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } -static int32_t mndProcessAlterAcctReq(SMndMsg *pReq) { +static int32_t mndProcessAlterAcctReq(SNodeMsg *pReq) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } -static int32_t mndProcessDropAcctReq(SMndMsg *pReq) { +static int32_t mndProcessDropAcctReq(SNodeMsg *pReq) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index ea58887af7..20dd7f1378 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -17,7 +17,7 @@ #include "mndAuth.h" #include "mndUser.h" -static int32_t mndProcessAuthReq(SMndMsg *pReq); +static int32_t mndProcessAuthReq(SNodeMsg *pReq); int32_t mndInitAuth(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_AUTH, mndProcessAuthReq); @@ -45,7 +45,7 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha return 0; } -static int32_t mndProcessAuthReq(SMndMsg *pReq) { +static int32_t mndProcessAuthReq(SNodeMsg *pReq) { SAuthReq authReq = {0}; if (tDeserializeSAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -56,15 +56,15 @@ static int32_t mndProcessAuthReq(SMndMsg *pReq) { memcpy(authRsp.user, authReq.user, TSDB_USER_LEN); int32_t code = - mndRetriveAuth(pReq->pMnode, authRsp.user, &authRsp.spi, &authRsp.encrypt, authRsp.secret, authRsp.ckey); + mndRetriveAuth(pReq->pNode, authRsp.user, &authRsp.spi, &authRsp.encrypt, authRsp.secret, authRsp.ckey); mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->user, authRsp.spi, authRsp.encrypt, authRsp.user); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp); void *pRsp = rpcMallocCont(contLen); tSerializeSAuthReq(pRsp, contLen, &authRsp); - pReq->pCont = pRsp; - pReq->contLen = contLen; + pReq->pRsp = pRsp; + pReq->rspLen = contLen; return code; } diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index cea05f16b2..77e008c2d4 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -29,12 +29,12 @@ static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw); static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); -static int32_t mndProcessCreateBnodeReq(SMndMsg *pReq); -static int32_t mndProcessDropBnodeReq(SMndMsg *pReq); -static int32_t mndProcessCreateBnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessDropBnodeRsp(SMndMsg *pRsp); -static int32_t mndGetBnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveBnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq); +static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq); +static int32_t mndProcessCreateBnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp); +static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter); int32_t mndInitBnode(SMnode *pMnode) { @@ -240,7 +240,7 @@ static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } -static int32_t mndCreateBnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) { +static int32_t mndCreateBnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) { int32_t code = -1; SBnodeObj bnodeObj = {0}; @@ -266,8 +266,8 @@ CREATE_BNODE_OVER: return code; } -static int32_t mndProcessCreateBnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SBnodeObj *pObj = NULL; SDnodeObj *pDnode = NULL; @@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn return 0; } -static int32_t mndDropBnode(SMnode *pMnode, SMndMsg *pReq, SBnodeObj *pObj) { +static int32_t mndDropBnode(SMnode *pMnode, SNodeMsg *pReq, SBnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg); @@ -382,8 +382,8 @@ DROP_BNODE_OVER: return code; } -static int32_t mndProcessDropBnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SBnodeObj *pObj = NULL; @@ -430,18 +430,18 @@ DROP_BNODE_OVER: return code; } -static int32_t mndProcessCreateBnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessCreateBnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropBnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetBnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -480,8 +480,8 @@ static int32_t mndGetBnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveBnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index a433f70644..ce47080645 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -26,8 +26,8 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster); static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster); static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster); static int32_t mndCreateDefaultCluster(SMnode *pMnode); -static int32_t mndGetClusterMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveClusters(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetClusterMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter); int32_t mndInitCluster(SMnode *pMnode) { @@ -163,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } -static int32_t mndGetClusterMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetClusterMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { int32_t cols = 0; SSchema *pSchema = pMeta->pSchemas; @@ -201,8 +201,8 @@ static int32_t mndGetClusterMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveClusters(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 50b3f7ca9f..756725068f 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -34,9 +34,9 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SMndMsg *pMsg); -static int32_t mndGetConsumerMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConsumer(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg); +static int32_t mndGetConsumerMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); int32_t mndInitConsumer(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6f5a2acd21..e72e14a168 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -31,14 +31,14 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); -static int32_t mndProcessCreateDbReq(SMndMsg *pReq); -static int32_t mndProcessAlterDbReq(SMndMsg *pReq); -static int32_t mndProcessDropDbReq(SMndMsg *pReq); -static int32_t mndProcessUseDbReq(SMndMsg *pReq); -static int32_t mndProcessSyncDbReq(SMndMsg *pReq); -static int32_t mndProcessCompactDbReq(SMndMsg *pReq); -static int32_t mndGetDbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveDbs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateDbReq(SNodeMsg *pReq); +static int32_t mndProcessAlterDbReq(SNodeMsg *pReq); +static int32_t mndProcessDropDbReq(SNodeMsg *pReq); +static int32_t mndProcessUseDbReq(SNodeMsg *pReq); +static int32_t mndProcessSyncDbReq(SNodeMsg *pReq); +static int32_t mndProcessCompactDbReq(SNodeMsg *pReq); +static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); int32_t mndInitDb(SMnode *pMnode) { @@ -384,7 +384,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateDb(SMnode *pMnode, SMndMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) { +static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) { SDbObj dbObj = {0}; memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN); memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); @@ -458,8 +458,8 @@ CREATE_DB_OVER: return code; } -static int32_t mndProcessCreateDbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateDbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -622,7 +622,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndUpdateDb(SMnode *pMnode, SMndMsg *pReq, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndUpdateDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg); if (pTrans == NULL) goto UPDATE_DB_OVER; @@ -642,8 +642,8 @@ UPDATE_DB_OVER: return code; } -static int32_t mndProcessAlterDbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -802,7 +802,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p return 0; } -static int32_t mndDropDb(SMnode *pMnode, SMndMsg *pReq, SDbObj *pDb) { +static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_DB_OVER; @@ -837,8 +837,8 @@ DROP_DB_OVER: return code; } -static int32_t mndProcessDropDbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropDbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -948,8 +948,8 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndProcessUseDbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -1017,8 +1017,8 @@ static int32_t mndProcessUseDbReq(SMndMsg *pReq) { tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp); - pReq->pCont = pRsp; - pReq->contLen = contLen; + pReq->pRsp = pRsp; + pReq->rspLen = contLen; USE_DB_OVER: if (code != 0) { @@ -1101,8 +1101,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, return 0; } -static int32_t mndProcessSyncDbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessSyncDbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -1142,8 +1142,8 @@ SYNC_DB_OVER: return code; } -static int32_t mndProcessCompactDbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCompactDbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -1183,8 +1183,8 @@ SYNC_DB_OVER: return code; } -static int32_t mndGetDbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -1324,8 +1324,8 @@ char *mnGetDbStr(char *src) { return pos; } -static int32_t mndRetrieveDbs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SDbObj *pDb = NULL; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index e3b1c0a177..bf79c89a1c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -49,17 +49,17 @@ static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew); -static int32_t mndProcessCreateDnodeReq(SMndMsg *pReq); -static int32_t mndProcessDropDnodeReq(SMndMsg *pReq); -static int32_t mndProcessConfigDnodeReq(SMndMsg *pReq); -static int32_t mndProcessConfigDnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessStatusReq(SMndMsg *pReq); +static int32_t mndProcessCreateDnodeReq(SNodeMsg *pReq); +static int32_t mndProcessDropDnodeReq(SNodeMsg *pReq); +static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq); +static int32_t mndProcessConfigDnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessStatusReq(SNodeMsg *pReq); -static int32_t mndGetConfigMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConfigs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); -static int32_t mndGetDnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveDnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetDnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); int32_t mndInitDnode(SMnode *pMnode) { @@ -296,8 +296,8 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return 0; } -static int32_t mndProcessStatusReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessStatusReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SStatusReq statusReq = {0}; SDnodeObj *pDnode = NULL; int32_t code = -1; @@ -424,8 +424,8 @@ static int32_t mndProcessStatusReq(SMndMsg *pReq) { tSerializeSStatusRsp(pHead, contLen, &statusRsp); taosArrayDestroy(statusRsp.pDnodeEps); - pReq->contLen = contLen; - pReq->pCont = pHead; + pReq->rspLen = contLen; + pReq->pRsp = pHead; } pDnode->lastAccessTime = curMs; @@ -437,7 +437,7 @@ PROCESS_STATUS_MSG_OVER: return code; } -static int32_t mndCreateDnode(SMnode *pMnode, SMndMsg *pReq, SCreateDnodeReq *pCreate) { +static int32_t mndCreateDnode(SMnode *pMnode, SNodeMsg *pReq, SCreateDnodeReq *pCreate) { SDnodeObj dnodeObj = {0}; dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE); dnodeObj.createdTime = taosGetTimestampMs(); @@ -471,8 +471,8 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMndMsg *pReq, SCreateDnodeReq *pC return 0; } -static int32_t mndProcessCreateDnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateDnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SDnodeObj *pDnode = NULL; @@ -521,7 +521,7 @@ CREATE_DNODE_OVER: return code; } -static int32_t mndDropDnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode) { +static int32_t mndDropDnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, &pReq->rpcMsg); if (pTrans == NULL) { mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); @@ -547,8 +547,8 @@ static int32_t mndDropDnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode) { return 0; } -static int32_t mndProcessDropDnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropDnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SDnodeObj *pDnode = NULL; @@ -596,8 +596,8 @@ DROP_DNODE_OVER: return code; } -static int32_t mndProcessConfigDnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SMCfgDnodeReq cfgReq = {0}; if (tDeserializeSMCfgDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) { @@ -628,11 +628,11 @@ static int32_t mndProcessConfigDnodeReq(SMndMsg *pReq) { return 0; } -static int32_t mndProcessConfigDnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessConfigDnodeRsp(SNodeMsg *pRsp) { mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle); } -static int32_t mndGetConfigMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { int32_t cols = 0; SSchema *pSchema = pMeta->pSchemas; @@ -663,8 +663,8 @@ static int32_t mndGetConfigMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *p return 0; } -static int32_t mndRetrieveConfigs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; int32_t totalRows = 0; int32_t numOfRows = 0; char *cfgOpts[TSDB_CONFIG_NUMBER] = {0}; @@ -709,8 +709,8 @@ static int32_t mndRetrieveConfigs(SMndMsg *pReq, SShowObj *pShow, char *data, in static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {} -static int32_t mndGetDnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetDnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -773,8 +773,8 @@ static int32_t mndGetDnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveDnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 90162d75ab..b43d887dd7 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -29,13 +29,13 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw); static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew); -static int32_t mndCreateFunc(SMnode *pMnode, SMndMsg *pReq, SCreateFuncReq *pCreate); -static int32_t mndDropFunc(SMnode *pMnode, SMndMsg *pReq, SFuncObj *pFunc); -static int32_t mndProcessCreateFuncReq(SMndMsg *pReq); -static int32_t mndProcessDropFuncReq(SMndMsg *pReq); -static int32_t mndProcessRetrieveFuncReq(SMndMsg *pReq); -static int32_t mndGetFuncMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveFuncs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndCreateFunc(SMnode *pMnode, SNodeMsg *pReq, SCreateFuncReq *pCreate); +static int32_t mndDropFunc(SMnode *pMnode, SNodeMsg *pReq, SFuncObj *pFunc); +static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq); +static int32_t mndProcessDropFuncReq(SNodeMsg *pReq); +static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq); +static int32_t mndGetFuncMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter); int32_t mndInitFunc(SMnode *pMnode) { @@ -181,7 +181,7 @@ static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) { sdbRelease(pSdb, pFunc); } -static int32_t mndCreateFunc(SMnode *pMnode, SMndMsg *pReq, SCreateFuncReq *pCreate) { +static int32_t mndCreateFunc(SMnode *pMnode, SNodeMsg *pReq, SCreateFuncReq *pCreate) { int32_t code = -1; STrans *pTrans = NULL; @@ -234,7 +234,7 @@ CREATE_FUNC_OVER: return code; } -static int32_t mndDropFunc(SMnode *pMnode, SMndMsg *pReq, SFuncObj *pFunc) { +static int32_t mndDropFunc(SMnode *pMnode, SNodeMsg *pReq, SFuncObj *pFunc) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_FUNC_OVER; @@ -262,8 +262,8 @@ DROP_FUNC_OVER: return code; } -static int32_t mndProcessCreateFuncReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SFuncObj *pFunc = NULL; @@ -339,8 +339,8 @@ CREATE_FUNC_OVER: return code; } -static int32_t mndProcessDropFuncReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropFuncReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SFuncObj *pFunc = NULL; @@ -394,8 +394,8 @@ DROP_FUNC_OVER: return code; } -static int32_t mndProcessRetrieveFuncReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncRsp retrieveRsp = {0}; @@ -451,8 +451,8 @@ static int32_t mndProcessRetrieveFuncReq(SMndMsg *pReq) { tSerializeSRetrieveFuncRsp(pRsp, contLen, &retrieveRsp); - pReq->pCont = pRsp; - pReq->contLen = contLen; + pReq->pRsp = pRsp; + pReq->rspLen = contLen; code = 0; @@ -463,8 +463,8 @@ RETRIEVE_FUNC_OVER: return code; } -static int32_t mndGetFuncMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetFuncMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -545,8 +545,8 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le return tDataTypes[type].name; } -static int32_t mndRetrieveFuncs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SFuncObj *pFunc = NULL; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 5f0e0874df..f33dd5629e 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -30,13 +30,13 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw); static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew); -static int32_t mndProcessCreateMnodeReq(SMndMsg *pReq); -static int32_t mndProcessDropMnodeReq(SMndMsg *pReq); -static int32_t mndProcessCreateMnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessAlterMnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessDropMnodeRsp(SMndMsg *pRsp); -static int32_t mndGetMnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveMnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateMnodeReq(SNodeMsg *pReq); +static int32_t mndProcessDropMnodeReq(SNodeMsg *pReq); +static int32_t mndProcessCreateMnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessAlterMnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp); +static int32_t mndGetMnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { @@ -355,7 +355,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { int32_t code = -1; SMnodeObj mnodeObj = {0}; @@ -380,8 +380,8 @@ CREATE_MNODE_OVER: return code; } -static int32_t mndProcessCreateMnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateMnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SMnodeObj *pObj = NULL; SDnodeObj *pDnode = NULL; @@ -527,7 +527,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode return 0; } -static int32_t mndDropMnode(SMnode *pMnode, SMndMsg *pReq, SMnodeObj *pObj) { +static int32_t mndDropMnode(SMnode *pMnode, SNodeMsg *pReq, SMnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg); @@ -547,8 +547,8 @@ DROP_MNODE_OVER: return code; } -static int32_t mndProcessDropMnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropMnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SMnodeObj *pObj = NULL; @@ -595,23 +595,23 @@ DROP_MNODE_OVER: return code; } -static int32_t mndProcessCreateMnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessCreateMnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterMnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessAlterMnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropMnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetMnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetMnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -663,8 +663,8 @@ static int32_t mndGetMnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveMnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index d61840f9df..38157cf220 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -32,7 +32,7 @@ static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset); static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset); static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset); -static int32_t mndProcessCommitOffsetReq(SMndMsg *pReq); +static int32_t mndProcessCommitOffsetReq(SNodeMsg *pReq); int32_t mndInitOffset(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_OFFSET, @@ -152,10 +152,10 @@ int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicNam return 0; } -static int32_t mndProcessCommitOffsetReq(SMndMsg *pMsg) { +static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { char key[TSDB_PARTITION_KEY_LEN]; - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pNode; char *msgStr = pMsg->rpcMsg.pCont; SMqCMCommitOffsetReq commitOffsetReq; SCoder decoder; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6a531ffa78..7ca1984457 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -50,14 +50,14 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); -static int32_t mndProcessHeartBeatReq(SMndMsg *pReq); -static int32_t mndProcessConnectReq(SMndMsg *pReq); -static int32_t mndProcessKillQueryReq(SMndMsg *pReq); -static int32_t mndProcessKillConnReq(SMndMsg *pReq); -static int32_t mndGetConnsMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConns(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); -static int32_t mndGetQueryMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveQueries(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq); +static int32_t mndProcessConnectReq(SNodeMsg *pReq); +static int32_t mndProcessKillQueryReq(SNodeMsg *pReq); +static int32_t mndProcessKillConnReq(SNodeMsg *pReq); +static int32_t mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); int32_t mndInitProfile(SMnode *pMnode) { @@ -177,8 +177,8 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { } } -static int32_t mndProcessConnectReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessConnectReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SUserObj *pUser = NULL; SDbObj *pDb = NULL; SConnObj *pConn = NULL; @@ -206,8 +206,9 @@ static int32_t mndProcessConnectReq(SMndMsg *pReq) { } if (connReq.db[0]) { - snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); - pDb = mndAcquireDb(pMnode, pReq->db); + char db[TSDB_DB_FNAME_LEN]; + snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); + pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr()); @@ -237,8 +238,8 @@ static int32_t mndProcessConnectReq(SMndMsg *pReq) { if (pRsp == NULL) goto CONN_OVER; tSerializeSConnectRsp(pRsp, contLen, &connectRsp); - pReq->contLen = contLen; - pReq->pCont = pRsp; + pReq->rspLen = contLen; + pReq->pRsp = pRsp; mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app); @@ -338,8 +339,8 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { return NULL; } -static int32_t mndProcessHeartBeatReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SClientHbBatchReq batchReq = {0}; if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) { @@ -423,12 +424,12 @@ static int32_t mndProcessHeartBeatReq(SMndMsg *pReq) { } taosArrayDestroy(batchRsp.rsps); - pReq->contLen = tlen; - pReq->pCont = buf; + pReq->rspLen = tlen; + pReq->pRsp = buf; return 0; #if 0 - SMnode *pMnode = pReq->pMnode; + SMnode *pMnode = pReq->pNode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont; @@ -495,13 +496,13 @@ static int32_t mndProcessHeartBeatReq(SMndMsg *pReq) { mndReleaseConn(pMnode, pConn); pReq->contLen = sizeof(SConnectRsp); - pReq->pCont = pRsp; + pReq->pRsp = pRsp; return 0; #endif } -static int32_t mndProcessKillQueryReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); @@ -534,8 +535,8 @@ static int32_t mndProcessKillQueryReq(SMndMsg *pReq) { } } -static int32_t mndProcessKillConnReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessKillConnReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); @@ -566,8 +567,8 @@ static int32_t mndProcessKillConnReq(SMndMsg *pReq) { } } -static int32_t mndGetConnsMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetConnsMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); @@ -641,8 +642,8 @@ static int32_t mndGetConnsMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveConns(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; int32_t numOfRows = 0; SConnObj *pConn = NULL; int32_t cols = 0; @@ -700,8 +701,8 @@ static int32_t mndRetrieveConns(SMndMsg *pReq, SShowObj *pShow, char *data, int3 return numOfRows; } -static int32_t mndGetQueryMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); @@ -815,8 +816,8 @@ static int32_t mndGetQueryMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveQueries(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; int32_t numOfRows = 0; SConnObj *pConn = NULL; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 3e74da00be..f8d1c892f8 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -29,14 +29,14 @@ static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw); static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew); -static int32_t mndProcessCreateQnodeReq(SMndMsg *pReq); -static int32_t mndProcessDropQnodeReq(SMndMsg *pReq); -static int32_t mndProcessCreateQnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessDropQnodeRsp(SMndMsg *pRsp); -static int32_t mndGetQnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveQnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateQnodeReq(SNodeMsg *pReq); +static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq); +static int32_t mndProcessCreateQnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp); +static int32_t mndGetQnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter); -static int32_t mndProcessQnodeListReq(SMndMsg *pReq); +static int32_t mndProcessQnodeListReq(SNodeMsg *pReq); int32_t mndInitQnode(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_QNODE, @@ -242,7 +242,7 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } -static int32_t mndCreateQnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { +static int32_t mndCreateQnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { int32_t code = -1; SQnodeObj qnodeObj = {0}; @@ -268,8 +268,8 @@ CREATE_QNODE_OVER: return code; } -static int32_t mndProcessCreateQnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateQnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SQnodeObj *pObj = NULL; SDnodeObj *pDnode = NULL; @@ -365,7 +365,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn return 0; } -static int32_t mndDropQnode(SMnode *pMnode, SMndMsg *pReq, SQnodeObj *pObj) { +static int32_t mndDropQnode(SMnode *pMnode, SNodeMsg *pReq, SQnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, &pReq->rpcMsg); @@ -384,8 +384,8 @@ DROP_QNODE_OVER: return code; } -static int32_t mndProcessDropQnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SQnodeObj *pObj = NULL; @@ -432,11 +432,11 @@ DROP_QNODE_OVER: return code; } -static int32_t mndProcessQnodeListReq(SMndMsg *pReq) { +static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) { int32_t code = -1; SQnodeListReq qlistReq = {0}; int32_t numOfRows = 0; - SMnode *pMnode = pReq->pMnode; + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; SQnodeObj *pObj = NULL; SQnodeListRsp qlistRsp = {0}; @@ -482,8 +482,8 @@ static int32_t mndProcessQnodeListReq(SMndMsg *pReq) { tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp); - pReq->contLen = rspLen; - pReq->pCont = pRsp; + pReq->rspLen = rspLen; + pReq->pRsp = pRsp; code = 0; QNODE_LIST_OVER: @@ -493,18 +493,18 @@ QNODE_LIST_OVER: return code; } -static int32_t mndProcessCreateQnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessCreateQnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropQnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetQnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetQnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -543,8 +543,8 @@ static int32_t mndGetQnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveQnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index e580b1e0cd..33a8b7fbe5 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -22,10 +22,10 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); -static int32_t mndProcessShowReq(SMndMsg *pReq); -static int32_t mndProcessRetrieveReq(SMndMsg *pReq); +static int32_t mndProcessShowReq(SNodeMsg *pReq); +static int32_t mndProcessRetrieveReq(SNodeMsg *pReq); static bool mndCheckRetrieveFinished(SShowObj *pShow); -static int32_t mndProcessRetrieveSysTableReq(SMndMsg *pReq); +static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -117,8 +117,8 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); } -static int32_t mndProcessShowReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessShowReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t code = -1; SShowReq showReq = {0}; @@ -161,8 +161,8 @@ static int32_t mndProcessShowReq(SMndMsg *pReq) { int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp); void *pBuf = rpcMallocCont(bufLen); tSerializeSShowRsp(pBuf, bufLen, &showRsp); - pReq->contLen = bufLen; - pReq->pCont = pBuf; + pReq->rspLen = bufLen; + pReq->pRsp = pBuf; mndReleaseShowObj(pShow, false); } else { mndReleaseShowObj(pShow, true); @@ -178,8 +178,8 @@ SHOW_OVER: return code; } -static int32_t mndProcessRetrieveReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessRetrieveReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t rowsToRead = 0; int32_t size = 0; @@ -248,8 +248,8 @@ static int32_t mndProcessRetrieveReq(SMndMsg *pReq) { pRsp->numOfRows = htonl(rowsRead); pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision - pReq->pCont = pRsp; - pReq->contLen = size; + pReq->pRsp = pRsp; + pReq->rspLen = size; if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { pRsp->completed = 1; @@ -263,8 +263,8 @@ static int32_t mndProcessRetrieveReq(SMndMsg *pReq) { return TSDB_CODE_SUCCESS; } -static int32_t mndProcessRetrieveSysTableReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t rowsToRead = 0; int32_t size = 0; @@ -348,8 +348,8 @@ static int32_t mndProcessRetrieveSysTableReq(SMndMsg *pReq) { pRsp->numOfRows = htonl(rowsRead); pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision - pReq->pCont = pRsp; - pReq->contLen = size; + pReq->pRsp = pRsp; + pReq->rspLen = size; if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { pRsp->completed = 1; diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index da26ef3e6a..5927afef29 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -29,12 +29,12 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw); static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew); -static int32_t mndProcessCreateSnodeReq(SMndMsg *pReq); -static int32_t mndProcessDropSnodeReq(SMndMsg *pReq); -static int32_t mndProcessCreateSnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessDropSnodeRsp(SMndMsg *pRsp); -static int32_t mndGetSnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveSnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq); +static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq); +static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp); +static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter); int32_t mndInitSnode(SMnode *pMnode) { @@ -240,7 +240,7 @@ static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } -static int32_t mndCreateSnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) { +static int32_t mndCreateSnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) { int32_t code = -1; SSnodeObj snodeObj = {0}; @@ -267,8 +267,8 @@ CREATE_SNODE_OVER: return code; } -static int32_t mndProcessCreateSnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SSnodeObj *pObj = NULL; SDnodeObj *pDnode = NULL; @@ -365,7 +365,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn return 0; } -static int32_t mndDropSnode(SMnode *pMnode, SMndMsg *pReq, SSnodeObj *pObj) { +static int32_t mndDropSnode(SMnode *pMnode, SNodeMsg *pReq, SSnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg); @@ -385,8 +385,8 @@ DROP_SNODE_OVER: return code; } -static int32_t mndProcessDropSnodeReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SSnodeObj *pObj = NULL; @@ -433,18 +433,18 @@ DROP_SNODE_OVER: return code; } -static int32_t mndProcessCreateSnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropSnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetSnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -483,8 +483,8 @@ static int32_t mndGetSnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveSnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index f24ba8c188..121720f48f 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -33,15 +33,15 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); -static int32_t mndProcessMCreateStbReq(SMndMsg *pReq); -static int32_t mndProcessMAlterStbReq(SMndMsg *pReq); -static int32_t mndProcessMDropStbReq(SMndMsg *pReq); -static int32_t mndProcessVCreateStbRsp(SMndMsg *pRsp); -static int32_t mndProcessVAlterStbRsp(SMndMsg *pRsp); -static int32_t mndProcessVDropStbRsp(SMndMsg *pRsp); -static int32_t mndProcessTableMetaReq(SMndMsg *pReq); -static int32_t mndGetStbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveStb(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq); +static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq); +static int32_t mndProcessMDropStbReq(SNodeMsg *pReq); +static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp); +static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp); +static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp); +static int32_t mndProcessTableMetaReq(SNodeMsg *pReq); +static int32_t mndGetStbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); int32_t mndInitStb(SMnode *pMnode) { @@ -490,7 +490,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateStb(SMnode *pMnode, SMndMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -551,8 +551,8 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcessMCreateStbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SStbObj *pTopicStb = NULL; SStbObj *pStb = NULL; @@ -623,7 +623,7 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcessVCreateStbRsp(SMndMsg *pRsp) { +static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -980,7 +980,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndAlterStb(SMnode *pMnode, SMndMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { +static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { SStbObj stbObj = {0}; taosRLockLatch(&pOld->lock); memcpy(&stbObj, pOld, sizeof(SStbObj)); @@ -1043,8 +1043,8 @@ ALTER_STB_OVER: return code; } -static int32_t mndProcessMAlterStbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SDbObj *pDb = NULL; SStbObj *pStb = NULL; @@ -1096,7 +1096,7 @@ ALTER_STB_OVER: return code; } -static int32_t mndProcessVAlterStbRsp(SMndMsg *pRsp) { +static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -1160,7 +1160,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * return 0; } -static int32_t mndDropStb(SMnode *pMnode, SMndMsg *pReq, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndDropStb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_STB_OVER; @@ -1180,8 +1180,8 @@ DROP_STB_OVER: return code; } -static int32_t mndProcessMDropStbReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessMDropStbReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SDbObj *pDb = NULL; @@ -1237,7 +1237,7 @@ DROP_STB_OVER: return code; } -static int32_t mndProcessVDropStbRsp(SMndMsg *pRsp) { +static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -1311,8 +1311,8 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char return code; } -static int32_t mndProcessTableMetaReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessTableMetaReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; STableInfoReq infoReq = {0}; STableMetaRsp metaRsp = {0}; @@ -1347,8 +1347,8 @@ static int32_t mndProcessTableMetaReq(SMndMsg *pReq) { } tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); - pReq->pCont = pRsp; - pReq->contLen = rspLen; + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; code = 0; mDebug("stb:%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName); @@ -1436,8 +1436,8 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs return 0; } -static int32_t mndGetStbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetStbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) { @@ -1499,8 +1499,8 @@ static void mndExtractTableName(char *tableId, char *name) { } } -static int32_t mndRetrieveStb(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStbObj *pStb = NULL; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cd05710f51..d7979ac5af 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -31,12 +31,12 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); -static int32_t mndProcessCreateStreamReq(SMndMsg *pReq); -/*static int32_t mndProcessDropStreamReq(SMndMsg *pReq);*/ -/*static int32_t mndProcessDropStreamInRsp(SMndMsg *pRsp);*/ -static int32_t mndProcessStreamMetaReq(SMndMsg *pReq); -static int32_t mndGetStreamMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveStream(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq); +/*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/ +/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/ +static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq); +static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { @@ -208,7 +208,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndCreateStream(SMnode *pMnode, SMndMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); @@ -247,8 +247,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SMndMsg *pReq, SCMCreateStreamReq return 0; } -static int32_t mndProcessCreateStreamReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SStreamObj *pStream = NULL; SDbObj *pDb = NULL; @@ -339,8 +339,8 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS return 0; } -static int32_t mndGetStreamMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfStreams(pMnode, pShow->db, &pShow->numOfRows) != 0) { @@ -383,8 +383,8 @@ static int32_t mndGetStreamMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *p return 0; } -static int32_t mndRetrieveStream(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 220900710c..d9fd2090e8 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -48,14 +48,14 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); -static int32_t mndProcessSubscribeReq(SMndMsg *pMsg); -static int32_t mndProcessSubscribeRsp(SMndMsg *pMsg); -static int32_t mndProcessSubscribeInternalReq(SMndMsg *pMsg); -static int32_t mndProcessSubscribeInternalRsp(SMndMsg *pMsg); -static int32_t mndProcessMqTimerMsg(SMndMsg *pMsg); -static int32_t mndProcessGetSubEpReq(SMndMsg *pMsg); -static int32_t mndProcessDoRebalanceMsg(SMndMsg *pMsg); -static int32_t mndProcessResetOffsetReq(SMndMsg *pMsg); +static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg); +static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalReq(SNodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); +static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg); +static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg); +static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg); +static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); @@ -211,8 +211,8 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq } #if 0 -static int32_t mndProcessResetOffsetReq(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg) { + SMnode *pMnode = pMsg->pNode; uint8_t *str = pMsg->rpcMsg.pCont; SMqCMResetOffsetReq req; @@ -249,14 +249,14 @@ static int32_t mndProcessResetOffsetReq(SMndMsg *pMsg) { } #endif -static int32_t mndProcessGetSubEpReq(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { + SMnode *pMnode = pMsg->pNode; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpRsp rsp = {0}; int64_t consumerId = be64toh(pReq->consumerId); int32_t epoch = ntohl(pReq->epoch); - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId); if (pConsumer == NULL) { terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; @@ -327,8 +327,8 @@ static int32_t mndProcessGetSubEpReq(SMndMsg *pMsg) { tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tDeleteSMqCMGetSubEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); - pMsg->pCont = buf; - pMsg->contLen = tlen; + pMsg->pRsp = buf; + pMsg->rspLen = tlen; return 0; } @@ -356,8 +356,8 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebSub; } -static int32_t mndProcessMqTimerMsg(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { + SMnode *pMnode = pMsg->pNode; SSdb *pSdb = pMnode->pSdb; SMqConsumerObj *pConsumer; void *pIter = NULL; @@ -428,8 +428,8 @@ static int32_t mndProcessMqTimerMsg(SMndMsg *pMsg) { return 0; } -static int32_t mndProcessDoRebalanceMsg(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { + SMnode *pMnode = pMsg->pNode; SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); void *pIter = NULL; @@ -994,8 +994,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pSdb, pSub); } -static int32_t mndProcessSubscribeReq(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { + SMnode *pMnode = pMsg->pNode; char *msgStr = pMsg->rpcMsg.pCont; SCMSubscribeReq subscribe; tDeserializeSCMSubscribeReq(msgStr, &subscribe); @@ -1156,7 +1156,7 @@ static int32_t mndProcessSubscribeReq(SMndMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessSubscribeInternalRsp(SMndMsg *pRsp) { +static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 0298a2fce1..7993d7df9d 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -79,8 +79,8 @@ static char* mndBuildTelemetryReport(SMnode* pMnode) { return pCont; } -static int32_t mndProcessTelemTimer(SMndMsg* pReq) { - SMnode* pMnode = pReq->pMnode; +static int32_t mndProcessTelemTimer(SNodeMsg* pReq) { + SMnode* pMnode = pReq->pNode; STelemMgmt* pMgmt = &pMnode->telemMgmt; if (!pMgmt->enable) return 0; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8982fa7061..83c83ad61c 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -31,12 +31,12 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); -static int32_t mndProcessCreateTopicReq(SMndMsg *pReq); -static int32_t mndProcessDropTopicReq(SMndMsg *pReq); -static int32_t mndProcessDropTopicInRsp(SMndMsg *pRsp); -static int32_t mndProcessTopicMetaReq(SMndMsg *pReq); -static int32_t mndGetTopicMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveTopic(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); +static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); +static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); +static int32_t mndProcessTopicMetaReq(SNodeMsg *pReq); +static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { @@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMndMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -276,8 +276,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMndMsg *pReq, SCMCreateTopicReq * return 0; } -static int32_t mndProcessCreateTopicReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SMqTopicObj *pTopic = NULL; SDbObj *pDb = NULL; @@ -341,7 +341,7 @@ CREATE_TOPIC_OVER: return code; } -static int32_t mndDropTopic(SMnode *pMnode, SMndMsg *pReq, SMqTopicObj *pTopic) { +static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) { // TODO: cannot drop when subscribed STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { @@ -368,8 +368,8 @@ static int32_t mndDropTopic(SMnode *pMnode, SMndMsg *pReq, SMqTopicObj *pTopic) return 0; } -static int32_t mndProcessDropTopicReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SMDropTopicReq dropReq = {0}; if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { @@ -403,7 +403,7 @@ static int32_t mndProcessDropTopicReq(SMndMsg *pReq) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropTopicInRsp(SMndMsg *pRsp) { +static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -435,8 +435,8 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndGetTopicMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) { @@ -479,8 +479,8 @@ static int32_t mndGetTopicMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveTopic(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SMqTopicObj *pTopic = NULL; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c5e872f22d..14d08b36c8 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -55,11 +55,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); -static int32_t mndProcessTransReq(SMndMsg *pReq); -static int32_t mndProcessKillTransReq(SMndMsg *pReq); +static int32_t mndProcessTransReq(SNodeMsg *pReq); +static int32_t mndProcessKillTransReq(SNodeMsg *pReq); -static int32_t mndGetTransMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveTrans(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetTransMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); int32_t mndInitTrans(SMnode *pMnode) { @@ -774,8 +774,8 @@ static void mndTransSendRpcRsp(STrans *pTrans) { } } -void mndTransProcessRsp(SMndMsg *pRsp) { - SMnode *pMnode = pRsp->pMnode; +void mndTransProcessRsp(SNodeMsg *pRsp) { + SMnode *pMnode = pRsp->pNode; int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle); int32_t transId = (int32_t)(signature >> 32); int32_t action = (int32_t)((signature << 32) >> 32); @@ -1157,8 +1157,8 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { mndTransSendRpcRsp(pTrans); } -static int32_t mndProcessTransReq(SMndMsg *pReq) { - mndTransPullup(pReq->pMnode); +static int32_t mndProcessTransReq(SNodeMsg *pReq) { + mndTransPullup(pReq->pNode); return 0; } @@ -1199,8 +1199,8 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { return 0; } -static int32_t mndProcessKillTransReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessKillTransReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; SKillTransReq killReq = {0}; int32_t code = -1; SUserObj *pUser = NULL; @@ -1257,8 +1257,8 @@ void mndTransPullup(SMnode *pMnode) { sdbWriteFile(pMnode->pSdb); } -static int32_t mndGetTransMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetTransMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -1320,8 +1320,8 @@ static int32_t mndGetTransMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveTrans(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; STrans *pTrans = NULL; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 38af07d46b..b7d32c01d9 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -30,13 +30,13 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw); static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser); static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser); static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew); -static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SMndMsg *pReq); -static int32_t mndProcessCreateUserReq(SMndMsg *pReq); -static int32_t mndProcessAlterUserReq(SMndMsg *pReq); -static int32_t mndProcessDropUserReq(SMndMsg *pReq); -static int32_t mndProcessGetUserAuthReq(SMndMsg *pReq); -static int32_t mndGetUserMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveUsers(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SNodeMsg *pReq); +static int32_t mndProcessCreateUserReq(SNodeMsg *pReq); +static int32_t mndProcessAlterUserReq(SNodeMsg *pReq); +static int32_t mndProcessDropUserReq(SNodeMsg *pReq); +static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq); +static int32_t mndGetUserMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); int32_t mndInitUser(SMnode *pMnode) { @@ -261,7 +261,7 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) { sdbRelease(pSdb, pUser); } -static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SMndMsg *pReq) { +static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SNodeMsg *pReq) { SUserObj userObj = {0}; taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass); tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN); @@ -295,8 +295,8 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate return 0; } -static int32_t mndProcessCreateUserReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessCreateUserReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SUserObj *pOperUser = NULL; @@ -349,7 +349,7 @@ CREATE_USER_OVER: return code; } -static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMndMsg *pReq) { +static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SNodeMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to update since %s", pOld->user, terrstr()); @@ -397,8 +397,8 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) { return pNew; } -static int32_t mndProcessAlterUserReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SUserObj *pOperUser = NULL; @@ -510,7 +510,7 @@ ALTER_USER_OVER: return code; } -static int32_t mndDropUser(SMnode *pMnode, SMndMsg *pReq, SUserObj *pUser) { +static int32_t mndDropUser(SMnode *pMnode, SNodeMsg *pReq, SUserObj *pUser) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to drop since %s", pUser->user, terrstr()); @@ -536,8 +536,8 @@ static int32_t mndDropUser(SMnode *pMnode, SMndMsg *pReq, SUserObj *pUser) { return 0; } -static int32_t mndProcessDropUserReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessDropUserReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SUserObj *pOperUser = NULL; @@ -585,8 +585,8 @@ DROP_USER_OVER: return code; } -static int32_t mndProcessGetUserAuthReq(SMndMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; int32_t code = -1; SUserObj *pUser = NULL; SGetUserAuthReq authReq = {0}; @@ -635,8 +635,8 @@ static int32_t mndProcessGetUserAuthReq(SMndMsg *pReq) { tSerializeSGetUserAuthRsp(pRsp, contLen, &authRsp); - pReq->pCont = pRsp; - pReq->contLen = contLen; + pReq->pRsp = pRsp; + pReq->rspLen = contLen; code = 0; GET_AUTH_OVER: @@ -647,8 +647,8 @@ GET_AUTH_OVER: return code; } -static int32_t mndGetUserMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetUserMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -693,8 +693,8 @@ static int32_t mndGetUserMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe return 0; } -static int32_t mndRetrieveUsers(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SUserObj *pUser = NULL; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 53deff8967..6aee662675 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -29,17 +29,17 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); -static int32_t mndProcessCreateVnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessAlterVnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessDropVnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessSyncVnodeRsp(SMndMsg *pRsp); -static int32_t mndProcessCompactVnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessCreateVnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp); -static int32_t mndGetVgroupMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveVgroups(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); -static int32_t mndGetVnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveVnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); int32_t mndInitVgroup(SMnode *pMnode) { @@ -452,24 +452,24 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { return epset; } -static int32_t mndProcessCreateVnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessCreateVnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterVnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropVnodeRsp(SMndMsg *pRsp) { +static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessSyncVnodeRsp(SMndMsg *pRsp) { return 0; } +static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp) { return 0; } -static int32_t mndProcessCompactVnodeRsp(SMndMsg *pRsp) { return 0; } +static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp) { return 0; } static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SVgObj *pVgroup = pObj; @@ -500,8 +500,8 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return 0; } -static int32_t mndGetVgroupMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) { @@ -551,8 +551,8 @@ static int32_t mndGetVgroupMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *p return 0; } -static int32_t mndRetrieveVgroups(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SVgObj *pVgroup = NULL; @@ -624,8 +624,8 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { return numOfVnodes; } -static int32_t mndGetVnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -664,8 +664,8 @@ static int32_t mndGetVnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pM return 0; } -static int32_t mndRetrieveVnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SVgObj *pVgroup = NULL; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 729c09bce5..ceb86d0a20 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -386,8 +386,8 @@ void mndDestroy(const char *path) { mDebug("mnode is destroyed"); } -int32_t mndProcessMsg(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +int32_t mndProcessMsg(SNodeMsg *pMsg) { + SMnode *pMnode = pMsg->pNode; SRpcMsg *pRpc = &pMsg->rpcMsg; tmsg_t msgType = pMsg->rpcMsg.msgType; void *ahandle = pMsg->rpcMsg.ahandle;