From ca10ef4490f341fe3cbcad0de4d599cee6e5e4b0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 16 Jan 2022 21:43:41 -0800 Subject: [PATCH 1/3] fix invalid db err while process connect req --- source/dnode/mnode/impl/src/mndProfile.c | 70 +++++++++++++----------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 79e5d9eae5..b458403dbf 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -179,7 +179,12 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { } static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; + SMnode *pMnode = pReq->pMnode; + SUserObj *pUser = NULL; + SDbObj *pDb = NULL; + SConnObj *pConn = NULL; + int32_t code = -1; + SConnectReq *pConnReq = pReq->rpcMsg.pCont; pConnReq->pid = htonl(pConnReq->pid); pConnReq->startTime = htobe64(pConnReq->startTime); @@ -187,54 +192,61 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { SRpcConnInfo info = {0}; if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr()); - return -1; + goto CONN_OVER; } char ip[30]; taosIp2String(info.clientIp, ip); + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr()); + goto CONN_OVER; + } + if (pConnReq->db[0]) { - snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pReq->acctId, TS_PATH_DELIMITER, pConnReq->db); - SDbObj *pDb = mndAcquireDb(pMnode, pReq->db); + snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db); + pDb = mndAcquireDb(pMnode, pReq->db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr()); - return -1; + goto CONN_OVER; } - mndReleaseDb(pMnode, pDb); } - SConnObj *pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); + pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); if (pConn == NULL) { mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); - return -1; + goto CONN_OVER; } SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); if (pRsp == NULL) { - mndReleaseConn(pMnode, pConn); terrno = TSDB_CODE_OUT_OF_MEMORY; mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr()); - return -1; - } - - SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); - if (pUser != NULL) { - pRsp->acctId = htonl(pUser->acctId); - pRsp->superUser = pUser->superUser; - mndReleaseUser(pMnode, pUser); + goto CONN_OVER; } + pRsp->acctId = htonl(pUser->acctId); + pRsp->superUser = pUser->superUser; pRsp->clusterId = htobe64(pMnode->clusterId); pRsp->connId = htonl(pConn->id); mndGetMnodeEpSet(pMnode, &pRsp->epSet); - mndReleaseConn(pMnode, pConn); pReq->contLen = sizeof(SConnectRsp); pReq->pCont = pRsp; mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app); - return 0; + + code = 0; + +CONN_OVER: + + mndReleaseUser(pMnode, pUser); + mndReleaseDb(pMnode, pDb); + mndReleaseConn(pMnode, pConn); + + return code; } static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { @@ -258,33 +270,27 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { } static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - char *batchReqStr = pReq->rpcMsg.pCont; + SMnode *pMnode = pReq->pMnode; + char *batchReqStr = pReq->rpcMsg.pCont; SClientHbBatchReq batchReq = {0}; tDeserializeSClientHbBatchReq(batchReqStr, &batchReq); SArray *pArray = batchReq.reqs; - int sz = taosArrayGetSize(pArray); + int sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { - SClientHbReq* pHbReq = taosArrayGet(pArray, i); + SClientHbReq *pHbReq = taosArrayGet(pArray, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { - } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { - SClientHbRsp rsp = { - .status = 0, - .connKey = pHbReq->connKey, - .bodyLen = 0, - .body = NULL - }; + SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL}; taosArrayPush(batchRsp.rsps, &rsp); } } int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); - void* buf = rpcMallocCont(tlen); - void* bufCopy = buf; + void *buf = rpcMallocCont(tlen); + void *bufCopy = buf; tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp); pReq->contLen = tlen; pReq->pCont = buf; From a15551d6f9e6eff5d68a141242a5fd017569ff42 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 16 Jan 2022 23:38:50 -0800 Subject: [PATCH 2/3] minor changes --- include/common/tmsg.h | 2 +- source/dnode/mgmt/impl/src/dndVnodes.c | 3 + source/dnode/mnode/impl/src/mndStb.c | 100 ++++++++++++------------- 3 files changed, 54 insertions(+), 51 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 73bc748705..e6defca724 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1332,7 +1332,7 @@ typedef struct { typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; - int8_t ignoreNotExists; + int64_t suid; } SVDropTbReq; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 156009fa68..61c6041dd6 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -422,6 +422,9 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { int32_t threadNum = pDnode->env.numOfCores; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; +#if 1 + vnodesPerThread = 1; +#endif SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); for (int32_t t = 0; t < threadNum; ++t) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ad8c16f826..fde9bc7865 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -31,16 +31,16 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb); static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); -static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb); -static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg); -static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg); -static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg); -static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); +static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq); +static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq); +static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq); +static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp); +static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp); +static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp); +static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq); +static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); int32_t mndInitStb(SMnode *pMnode) { @@ -52,13 +52,13 @@ int32_t mndInitStb(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndStbActionUpdate, .deleteFp = (SdbDeleteFp)mndStbActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp); - mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); + mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); @@ -177,27 +177,27 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { return 0; } -static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) { - mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb); - atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime); - atomic_exchange_32(&pOldStb->version, pNewStb->version); +static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { + mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); + atomic_exchange_32(&pOld->updateTime, pNew->updateTime); + atomic_exchange_32(&pOld->version, pNew->version); - taosWLockLatch(&pOldStb->lock); - pOldStb->numOfColumns = pNewStb->numOfColumns; - pOldStb->numOfTags = pNewStb->numOfTags; - int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns; + taosWLockLatch(&pOld->lock); + pOld->numOfColumns = pNew->numOfColumns; + pOld->numOfTags = pNew->numOfTags; + int32_t totalCols = pNew->numOfTags + pNew->numOfColumns; int32_t totalSize = totalCols * sizeof(SSchema); - if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) { + if (pOld->numOfTags + pOld->numOfColumns < totalCols) { void *pSchema = malloc(totalSize); if (pSchema != NULL) { - free(pOldStb->pSchema); - pOldStb->pSchema = pSchema; + free(pOld->pSchema); + pOld->pSchema = pSchema; } } - memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize); - taosWUnLockLatch(&pOldStb->lock); + memcpy(pOld->pSchema, pNew->pSchema, totalSize); + taosWUnLockLatch(&pOld->lock); return 0; } @@ -215,7 +215,7 @@ void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) { sdbRelease(pSdb, pStb); } -static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { +static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { SName name = {0}; tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -225,17 +225,17 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { return mndAcquireDb(pMnode, db); } -static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) { +static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { SVCreateTbReq req; void *buf; - int bsize; + int32_t bsize; SMsgHead *pMsgHead; req.ver = 0; SName name = {0}; - tNameFromString(&name, pStb->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE); + tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - req.name = (char*) tNameGetTableName(&name); + req.name = (char *)tNameGetTableName(&name); req.ttl = 0; req.keep = 0; req.type = TD_SUPER_TABLE; @@ -264,7 +264,7 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb return buf; } -static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { +static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { int32_t contLen = sizeof(SVDropTbReq); SVDropTbReq *pDrop = calloc(1, contLen); @@ -356,14 +356,14 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - int contLen; + int32_t contLen; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - void *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb, &contLen); + void *pMsg = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -398,7 +398,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); + SVDropTbReq *pMsg = mndBuildDropStbReq(pMnode, pVgroup, pStb); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -494,8 +494,8 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to create", pCreate->name); @@ -548,7 +548,7 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { +static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pMsg) { mndTransProcessRsp(pMsg); return 0; } @@ -578,10 +578,10 @@ static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) { return 0; } -static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; } +static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOld, SStbObj *pNew) { return 0; } -static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to alter", pAlter->name); @@ -612,7 +612,7 @@ static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { +static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pMsg) { mndTransProcessRsp(pMsg); return 0; } @@ -694,8 +694,8 @@ DROP_STB_OVER: return 0; } -static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SMDropStbReq *pDrop = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to drop", pDrop->name); @@ -724,12 +724,12 @@ static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { +static int32_t mndProcessVDropStbRsp(SMnodeMsg *pMsg) { mndTransProcessRsp(pMsg); return 0; } -static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { +static int32_t mndProcessStbMetaReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; STableInfoReq *pInfo = pMsg->rpcMsg.pCont; From 2158727db35dec11b2111f0de905386ec11ec980 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 16 Jan 2022 23:50:22 -0800 Subject: [PATCH 3/3] minor changes --- source/dnode/mgmt/impl/src/dndVnodes.c | 5 +- source/dnode/mnode/impl/src/mndStb.c | 173 +++++++++---------------- 2 files changed, 64 insertions(+), 114 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 61c6041dd6..c174de9893 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -421,11 +421,12 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { pMgmt->totalVnodes = numOfVnodes; int32_t threadNum = pDnode->env.numOfCores; - int32_t vnodesPerThread = numOfVnodes / threadNum + 1; #if 1 - vnodesPerThread = 1; + threadNum = 1; #endif + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); for (int32_t t = 0; t < threadNum; ++t) { threads[t].threadIndex = t; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index fde9bc7865..a6fd2a3c58 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -276,12 +276,12 @@ static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj pDrop->head.contLen = htonl(contLen); pDrop->head.vgId = htonl(pVgroup->vgId); memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN); - // pDrop->suid = htobe64(pStb->uid); + pDrop->suid = htobe64(pStb->uid); return pDrop; } -static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) { +static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { pCreate->numOfColumns = htonl(pCreate->numOfColumns); pCreate->numOfTags = htonl(pCreate->numOfTags); int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags; @@ -363,8 +363,8 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - void *pMsg = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); - if (pMsg == NULL) { + void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); + if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -373,11 +373,11 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pMsg; + action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_VND_CREATE_STB; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); return -1; @@ -398,8 +398,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - SVDropTbReq *pMsg = mndBuildDropStbReq(pMnode, pVgroup, pStb); - if (pMsg == NULL) { + SVDropTbReq *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb); + if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -408,11 +408,11 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SVDropTbReq); action.msgType = TDMT_VND_DROP_STB; if (mndTransAppendUndoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); return -1; @@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -449,43 +449,17 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCr } int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + if (pTrans == NULL) goto CREATE_STB_OVER; + mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); - if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_STB_OVER; - } - - if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto CREATE_STB_OVER; - } - - if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_STB_OVER; - } - - if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_STB_OVER; - } - - if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_STB_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } + if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; + if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; + if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; + if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; + if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_STB_OVER; code = 0; @@ -494,13 +468,13 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont; +static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMCreateStbReq *pCreate = pReq->rpcMsg.pCont; mDebug("stb:%s, start to create", pCreate->name); - if (mndCheckCreateStbMsg(pCreate) != 0) { + if (mndCheckCreateStbReq(pCreate) != 0) { mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } @@ -536,7 +510,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) { return -1; } - int32_t code = mndCreateStb(pMnode, pMsg, pCreate, pDb); + int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb); mndReleaseDb(pMnode, pDb); if (code != 0) { @@ -548,12 +522,12 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) { +static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { SSchema *pSchema = &pAlter->schema; pSchema->colId = htonl(pSchema->colId); pSchema->bytes = htonl(pSchema->bytes); @@ -578,15 +552,15 @@ static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) { return 0; } -static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOld, SStbObj *pNew) { return 0; } +static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; } -static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont; +static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMAlterStbReq *pAlter = pReq->rpcMsg.pCont; mDebug("stb:%s, start to alter", pAlter->name); - if (mndCheckAlterStbMsg(pAlter) != 0) { + if (mndCheckAlterStbReq(pAlter) != 0) { mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); return -1; } @@ -601,7 +575,7 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) { SStbObj stbObj = {0}; memcpy(&stbObj, pStb, sizeof(SStbObj)); - int32_t code = mndUpdateStb(pMnode, pMsg, pStb, &stbObj); + int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj); mndReleaseStb(pMnode, pStb); if (code != 0) { @@ -612,8 +586,8 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } @@ -648,44 +622,19 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; } -static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { +static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("stb:%s, failed to drop since %s", pStb->name, terrstr()); - return -1; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + if (pTrans == NULL)goto DROP_STB_OVER; + mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); - if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_STB_OVER; - } - - if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto DROP_STB_OVER; - } - - if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_STB_OVER; - } - - if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_STB_OVER; - } - - if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_STB_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_STB_OVER; - } + if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; + if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; + if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; + if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; + if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER; code = 0; @@ -694,9 +643,9 @@ DROP_STB_OVER: return 0; } -static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMDropStbReq *pDrop = pMsg->rpcMsg.pCont; +static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMDropStbReq *pDrop = pReq->rpcMsg.pCont; mDebug("stb:%s, start to drop", pDrop->name); @@ -712,7 +661,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) { } } - int32_t code = mndDropStb(pMnode, pMsg, pStb); + int32_t code = mndDropStb(pMnode, pReq, pStb); mndReleaseStb(pMnode, pStb); if (code != 0) { @@ -724,14 +673,14 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessVDropStbRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessStbMetaReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - STableInfoReq *pInfo = pMsg->rpcMsg.pCont; +static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + STableInfoReq *pInfo = pReq->rpcMsg.pCont; mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); @@ -786,8 +735,8 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pMsg) { mndReleaseDb(pMnode, pDb); mndReleaseStb(pMnode, pStb); - pMsg->pCont = pMeta; - pMsg->contLen = contLen; + pReq->pCont = pMeta; + pReq->contLen = contLen; mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags); return 0; @@ -820,8 +769,8 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs return 0; } -static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) { @@ -883,8 +832,8 @@ static void mndExtractTableName(char *tableId, char *name) { } } -static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStbObj *pStb = NULL;