diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 0e7d486eab..a7d1522d12 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha typedef bool (*RpcRfp)(int32_t code); typedef struct SRpcInit { + char localFqdn[TSDB_FQDN_LEN]; uint16_t localPort; // local port char * label; // for debug purpose int numOfThreads; // number of threads to handle connections diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 62c3771669..213a6930ee 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec); TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); +bool taosValidIpAndPort(uint32_t ip, uint16_t port); TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port); int32_t taosKeepTcpAlive(TdSocketPtr pSocket); TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen); diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 446894556e..114d7b6dfc 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "dmImp.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { @@ -130,10 +130,10 @@ _OVER: } static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SDnodeTrans *pTrans = &pDnode->trans; + SDnodeTrans * pTrans = &pDnode->trans; tmsg_t msgType = pMsg->msgType; bool isReq = msgType & 1u; - SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; + SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; SMgmtWrapper *pWrapper = pHandle->pNdWrapper; if (msgType == TDMT_DND_SERVER_STATUS) { @@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); + void * pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; @@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; + + strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn)); rpcInit.localPort = pDnode->data.serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index bea0bfbe87..89638524ae 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -258,6 +258,7 @@ typedef struct { int32_t authVersion; SHashObj* readDbs; SHashObj* writeDbs; + SRWLatch lock; } SUserObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h index 91c6e24e28..4a80f619d3 100644 --- a/source/dnode/mnode/impl/inc/mndSma.h +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -26,7 +26,6 @@ int32_t mndInitSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); -int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index f42829eddf..8aecc22454 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -33,6 +33,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); +void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index dfad99bdfb..af7efb5543 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -44,16 +44,17 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq); static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); -static int32_t mndProcessGetIndexReq(SNodeMsg *pReq); int32_t mndInitDb(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_DB, - .keyType = SDB_KEY_BINARY, - .encodeFp = (SdbEncodeFp)mndDbActionEncode, - .decodeFp = (SdbDecodeFp)mndDbActionDecode, - .insertFp = (SdbInsertFp)mndDbActionInsert, - .updateFp = (SdbUpdateFp)mndDbActionUpdate, - .deleteFp = (SdbDeleteFp)mndDbActionDelete}; + SSdbTable table = { + .sdbType = SDB_DB, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndDbActionEncode, + .decodeFp = (SdbDecodeFp)mndDbActionDecode, + .insertFp = (SdbInsertFp)mndDbActionInsert, + .updateFp = (SdbUpdateFp)mndDbActionUpdate, + .deleteFp = (SdbDeleteFp)mndDbActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_DB, mndProcessAlterDbReq); @@ -61,7 +62,6 @@ int32_t mndInitDb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq); mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq); - mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetIndexReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb); @@ -194,6 +194,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { } SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) + taosInitRWLatch(&pDb->lock); terrno = 0; @@ -222,17 +223,29 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); taosWLockLatch(&pOld->lock); - SArray *pOldRetensions = pOld->cfg.pRetensions; pOld->updateTime = pNew->updateTime; pOld->cfgVersion = pNew->cfgVersion; pOld->vgVersion = pNew->vgVersion; - memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg)); - pNew->cfg.pRetensions = pOldRetensions; + pOld->cfg.buffer = pNew->cfg.buffer; + pOld->cfg.pages = pNew->cfg.pages; + pOld->cfg.pageSize = pNew->cfg.pageSize; + pOld->cfg.daysPerFile = pNew->cfg.daysPerFile; + pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0; + pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1; + pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2; + pOld->cfg.fsyncPeriod = pNew->cfg.fsyncPeriod; + pOld->cfg.walLevel = pNew->cfg.walLevel; + pOld->cfg.strict = pNew->cfg.strict; + pOld->cfg.cacheLastRow = pNew->cfg.cacheLastRow; + pOld->cfg.replications = pNew->cfg.replications; taosWUnLockLatch(&pOld->lock); return 0; } -static int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) { return sdbGetTableVer(pMnode->pSdb, SDB_VGROUP); } +static inline int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + return sdbGetTableVer(pSdb, SDB_VGROUP); +} SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) { SSdb *pSdb = pMnode->pSdb; @@ -638,69 +651,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p return 0; } -void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { - SAlterVnodeReq alterReq = {0}; - alterReq.vgVersion = pVgroup->version; - alterReq.buffer = pDb->cfg.buffer; - alterReq.pages = pDb->cfg.pages; - alterReq.pageSize = pDb->cfg.pageSize; - alterReq.daysPerFile = pDb->cfg.daysPerFile; - alterReq.daysToKeep0 = pDb->cfg.daysToKeep0; - alterReq.daysToKeep1 = pDb->cfg.daysToKeep1; - alterReq.daysToKeep2 = pDb->cfg.daysToKeep2; - alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod; - alterReq.walLevel = pDb->cfg.walLevel; - alterReq.strict = pDb->cfg.strict; - alterReq.cacheLastRow = pDb->cfg.cacheLastRow; - alterReq.replica = pVgroup->replica; - alterReq.selfIndex = -1; - - for (int32_t v = 0; v < pVgroup->replica; ++v) { - SReplica *pReplica = &alterReq.replicas[v]; - SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; - SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pVgidDnode == NULL) { - return NULL; - } - - pReplica->id = pVgidDnode->id; - pReplica->port = pVgidDnode->port; - memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); - mndReleaseDnode(pMnode, pVgidDnode); - - if (pDnode->id == pVgid->dnodeId) { - alterReq.selfIndex = v; - } - } - - if (alterReq.selfIndex == -1) { - terrno = TSDB_CODE_MND_APP_ERROR; - return NULL; - } - - int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq); - if (contLen < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - contLen += +sizeof(SMsgHead); - - void *pReq = taosMemoryMalloc(contLen); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - SMsgHead *pHead = pReq; - pHead->contLen = htonl(contLen); - pHead->vgId = htonl(pVgroup->vgId); - - tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq); - *pContLen = contLen; - return pReq; -} - -static int32_t mndBuilAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { +static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { STransAction action = {0}; SVnodeGid *pVgid = pVgroup->vnodeGid + vn; @@ -736,7 +687,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * if (pIter == NULL) break; if (pVgroup->dbUid == pNew->uid) { - if (mndBuilAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) { + if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); return -1; @@ -752,19 +703,19 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * static int32_t mndAlterDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg); - if (pTrans == NULL) goto UPDATE_DB_OVER; + if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name); mndTransSetDbInfo(pTrans, pOld); - if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; - if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; - if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_DB_OVER; + if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; + if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; + if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; -UPDATE_DB_OVER: +_OVER: mndTransDrop(pTrans); return code; } @@ -778,7 +729,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto ALTER_DB_OVER; + goto _OVER; } mDebug("db:%s, start to alter", alterReq.db); @@ -786,24 +737,26 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { pDb = mndAcquireDb(pMnode, alterReq.db); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto ALTER_DB_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { - goto ALTER_DB_OVER; + goto _OVER; } if (mndCheckAlterDropCompactDbAuth(pUser, pDb) != 0) { - goto ALTER_DB_OVER; + goto _OVER; } SDbObj dbObj = {0}; memcpy(&dbObj, pDb, sizeof(SDbObj)); + dbObj.cfg.numOfRetensions = 0; + dbObj.cfg.pRetensions = NULL; code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq); if (code != 0) { - goto ALTER_DB_OVER; + goto _OVER; } dbObj.cfgVersion++; @@ -811,7 +764,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { code = mndAlterDb(pMnode, pReq, pDb, &dbObj); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -ALTER_DB_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); } @@ -831,13 +784,13 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto GET_DB_CFG_OVER; + goto _OVER; } pDb = mndAcquireDb(pMnode, cfgReq.db); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto GET_DB_CFG_OVER; + goto _OVER; } cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; @@ -866,7 +819,7 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; - goto GET_DB_CFG_OVER; + goto _OVER; } tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp); @@ -876,9 +829,9 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { code = 0; -GET_DB_CFG_OVER: +_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr()); } @@ -1097,7 +1050,8 @@ _OVER: return code; } -void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { +static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) { + int32_t numOfTables = 0; int32_t vindex = 0; SSdb *pSdb = pMnode->pSdb; @@ -1108,8 +1062,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { if (pIter == NULL) break; if (pVgroup->dbUid == pDb->uid) { - *num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; - + numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; vindex++; } @@ -1117,6 +1070,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) { } sdbCancelFetch(pSdb, pIter); + return numOfTables; } static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { @@ -1170,8 +1124,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs return -1; } - int32_t numOfTable = 0; - mndGetDBTableNum(pDb, pMnode, &numOfTable); + int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) { mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos); @@ -1195,7 +1148,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { if (tDeserializeSUseDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &usedbReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto USE_DB_OVER; + goto _OVER; } char *p = strchr(usedbReq.db, '.'); @@ -1206,12 +1159,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); if (usedbRsp.pVgroupInfos == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto USE_DB_OVER; + goto _OVER; } mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); usedbRsp.vgVersion = vgVersion++; - } else { usedbRsp.vgVersion = usedbReq.vgVersion; } @@ -1232,15 +1184,15 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { } else { pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { - goto USE_DB_OVER; + goto _OVER; } if (mndCheckUseDbAuth(pUser, pDb) != 0) { - goto USE_DB_OVER; + goto _OVER; } if (mndExtractDbInfo(pMnode, pDb, &usedbRsp, &usedbReq) < 0) { - goto USE_DB_OVER; + goto _OVER; } code = 0; @@ -1252,7 +1204,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; - goto USE_DB_OVER; + goto _OVER; } tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp); @@ -1260,7 +1212,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { pReq->pRsp = pRsp; pReq->rspLen = contLen; -USE_DB_OVER: +_OVER: if (code != 0) { mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); } @@ -1298,8 +1250,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, continue; } - int32_t numOfTable = 0; - mndGetDBTableNum(pDb, pMnode, &numOfTable); + int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName); @@ -1514,9 +1465,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols); colDataAppend(pColInfo, rows, (const char *)b, false); } - - // pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); - // *(int8_t *)pWrite = pDb->cfg.update; } static void setInformationSchemaDbCfg(SDbObj *pDbObj) { @@ -1544,7 +1492,6 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) { static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SVgObj *pVgroup = pObj; int32_t *numOfTables = p1; - *numOfTables += pVgroup->numOfTables; return true; } @@ -1594,49 +1541,3 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } - -static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) { - SUserIndexReq indexReq = {0}; - SMnode *pMnode = pReq->pNode; - int32_t code = -1; - SUserIndexRsp rsp = {0}; - bool exist = false; - - if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } - - code = mndProcessGetSmaReq(pMnode, &indexReq, &rsp, &exist); - if (code) { - goto _OVER; - } - - if (!exist) { - // TODO GET INDEX FROM FULLTEXT - code = -1; - terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST; - } else { - int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp); - void *pRsp = rpcMallocCont(contLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; - goto _OVER; - } - - tSerializeSUserIndexRsp(pRsp, contLen, &rsp); - - pReq->pRsp = pRsp; - pReq->rspLen = contLen; - - code = 0; - } - -_OVER: - if (code != 0) { - mError("failed to get index %s since %s", indexReq.indexFName, terrstr()); - } - - return code; -} diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 0e3e8b68cc..8619df978b 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -40,6 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq); static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq); static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp); static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp); +static int32_t mndProcessGetSmaReq(SNodeMsg *pReq); static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); @@ -56,6 +57,7 @@ int32_t mndInitSma(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp); + mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma); @@ -686,7 +688,7 @@ _OVER: return code; } -int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { +static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { int32_t code = -1; SSmaObj *pSma = NULL; @@ -715,6 +717,51 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexR } mndReleaseSma(pMnode, pSma); + return code; +} + +static int32_t mndProcessGetSmaReq(SNodeMsg *pReq) { + SUserIndexReq indexReq = {0}; + SMnode *pMnode = pReq->pNode; + int32_t code = -1; + SUserIndexRsp rsp = {0}; + bool exist = false; + + if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + code = mndGetSma(pMnode, &indexReq, &rsp, &exist); + if (code) { + goto _OVER; + } + + if (!exist) { + // TODO GET INDEX FROM FULLTEXT + code = -1; + terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST; + } else { + int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp); + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + + tSerializeSUserIndexRsp(pRsp, contLen, &rsp); + + pReq->pRsp = pRsp; + pReq->rspLen = contLen; + + code = 0; + } + +_OVER: + if (code != 0) { + mError("failed to get index %s since %s", indexReq.indexFName, terrstr()); + } return code; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index f3438bd0dc..43e2735984 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1050,7 +1050,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); - if (code != 0) { + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("failed to execute redoActions since %s", terrstr()); } return code; @@ -1058,7 +1058,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); - if (code != 0) { + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("failed to execute undoActions since %s", terrstr()); } return code; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d2a9151167..8b5fa06faf 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -186,6 +186,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { } SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) + taosInitRWLatch(&pUser->lock); terrno = 0; @@ -228,11 +229,12 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew); - memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); + taosWLockLatch(&pOld->lock); pOld->updateTime = pNew->updateTime; - + memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); TSWAP(pOld->readDbs, pNew->readDbs); TSWAP(pOld->writeDbs, pNew->writeDbs); + taosWUnLockLatch(&pOld->lock); return 0; } @@ -426,8 +428,12 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { } memcpy(&newUser, pUser, sizeof(SUserObj)); + + taosRLockLatch(&pUser->lock); newUser.readDbs = mndDupDbHash(pUser->readDbs); newUser.writeDbs = mndDupDbHash(pUser->writeDbs); + taosRUnLockLatch(&pUser->lock); + if (newUser.readDbs == NULL || newUser.writeDbs == NULL) { goto _OVER; } @@ -586,8 +592,10 @@ static int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUser memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); pRsp->superAuth = pUser->superUser; pRsp->version = pUser->authVersion; + taosRLockLatch(&pUser->lock); pRsp->readDbs = mndDupDbHash(pUser->readDbs); pRsp->writeDbs = mndDupDbHash(pUser->writeDbs); + taosRUnLockLatch(&pUser->lock); pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (NULL == pRsp->createdDbs) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 31e4a8ea7d..d1e4be1161 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -21,8 +21,8 @@ #include "mndShow.h" #include "mndTrans.h" -#define TSDB_VGROUP_VER_NUMBER 1 -#define TSDB_VGROUP_RESERVE_SIZE 64 +#define VGROUP_VER_NUMBER 1 +#define VGROUP_RESERVE_SIZE 64 static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); @@ -34,19 +34,21 @@ static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp); static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp); static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp); -static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); +static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); -static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows); +static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); int32_t mndInitVgroup(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_VGROUP, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndVgroupActionEncode, - .decodeFp = (SdbDecodeFp)mndVgroupActionDecode, - .insertFp = (SdbInsertFp)mndVgroupActionInsert, - .updateFp = (SdbUpdateFp)mndVgroupActionUpdate, - .deleteFp = (SdbDeleteFp)mndVgroupActionDelete}; + SSdbTable table = { + .sdbType = SDB_VGROUP, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndVgroupActionEncode, + .decodeFp = (SdbDecodeFp)mndVgroupActionDecode, + .insertFp = (SdbInsertFp)mndVgroupActionInsert, + .updateFp = (SdbUpdateFp)mndVgroupActionUpdate, + .deleteFp = (SdbDeleteFp)mndVgroupActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp); @@ -66,29 +68,29 @@ void mndCleanupVgroup(SMnode *pMnode) {} SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); - if (pRaw == NULL) goto VG_ENCODE_OVER; + SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE); + if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, VG_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, VG_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, VG_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pVgroup->version, VG_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, VG_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, VG_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, VG_ENCODE_OVER) - SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, VG_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER) + SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; - SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, VG_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER) } - SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_ENCODE_OVER) - SDB_SET_DATALEN(pRaw, dataPos, VG_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) + SDB_SET_DATALEN(pRaw, dataPos, _OVER) terrno = 0; -VG_ENCODE_OVER: +_OVER: if (terrno != 0) { mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -103,41 +105,41 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != TSDB_VGROUP_VER_NUMBER) { + if (sver != VGROUP_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto VG_DECODE_OVER; + goto _OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj)); - if (pRow == NULL) goto VG_DECODE_OVER; + if (pRow == NULL) goto _OVER; SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto VG_DECODE_OVER; + if (pVgroup == NULL) goto _OVER; int32_t dataPos = 0; - SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER) - SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER) - SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; - SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER) if (pVgroup->replica == 1) { pVgid->role = TAOS_SYNC_STATE_LEADER; } } - SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) terrno = 0; -VG_DECODE_OVER: +_OVER: if (terrno != 0) { mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); taosMemoryFreeClear(pRow); @@ -254,6 +256,68 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg return pReq; } +void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { + SAlterVnodeReq alterReq = {0}; + alterReq.vgVersion = pVgroup->version; + alterReq.buffer = pDb->cfg.buffer; + alterReq.pages = pDb->cfg.pages; + alterReq.pageSize = pDb->cfg.pageSize; + alterReq.daysPerFile = pDb->cfg.daysPerFile; + alterReq.daysToKeep0 = pDb->cfg.daysToKeep0; + alterReq.daysToKeep1 = pDb->cfg.daysToKeep1; + alterReq.daysToKeep2 = pDb->cfg.daysToKeep2; + alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod; + alterReq.walLevel = pDb->cfg.walLevel; + alterReq.strict = pDb->cfg.strict; + alterReq.cacheLastRow = pDb->cfg.cacheLastRow; + alterReq.replica = pVgroup->replica; + alterReq.selfIndex = -1; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SReplica *pReplica = &alterReq.replicas[v]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pVgidDnode == NULL) { + return NULL; + } + + pReplica->id = pVgidDnode->id; + pReplica->port = pVgidDnode->port; + memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); + mndReleaseDnode(pMnode, pVgidDnode); + + if (pDnode->id == pVgid->dnodeId) { + alterReq.selfIndex = v; + } + } + + if (alterReq.selfIndex == -1) { + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + contLen += +sizeof(SMsgHead); + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + SMsgHead *pHead = pReq; + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq); + *pContLen = contLen; + return pReq; +} + void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SDropVnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; @@ -372,12 +436,12 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj)); if (pVgroups == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto ALLOC_VGROUP_OVER; + goto _OVER; } pArray = mndBuildDnodesArray(pMnode); if (pArray == NULL) { - goto ALLOC_VGROUP_OVER; + goto _OVER; } mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray), @@ -410,7 +474,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; - goto ALLOC_VGROUP_OVER; + goto _OVER; } allocedVgroups++; @@ -421,7 +485,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); -ALLOC_VGROUP_OVER: +_OVER: if (code != 0) taosMemoryFree(pVgroups); taosArrayDestroy(pArray); return code; @@ -492,7 +556,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return 0; } -static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) { +static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; @@ -533,14 +597,13 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* // default 3 replica for (int32_t i = 0; i < 3; ++i) { - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (i < pVgroup->replica) { colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false); char buf1[20] = {0}; const char *role = syncStr(pVgroup->vnodeGid[i].role); - STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); + STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)buf1, false); @@ -597,13 +660,12 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { return numOfVnodes; } -static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) { +static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SVgObj *pVgroup = NULL; int32_t cols = 0; -// int32_t dnodeId = pShow->replica; while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f776fb3764..846cf6f967 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -46,9 +46,21 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } + uint32_t ip = 0; + if (pInit->connType == TAOS_CONN_SERVER) { + ip = taosGetIpv4FromFqdn(pInit->localFqdn); + if (ip == 0xFFFFFFFF) { + tError("invalid fqdn: %s", pInit->localFqdn); + terrno = TSDB_CODE_RPC_FQDN_ERROR; + taosMemoryFree(pRpc); + return NULL; + } + } + pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; - pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + pRpc->tcphandle = + (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); if (pRpc->tcphandle == NULL) { taosMemoryFree(pRpc); return NULL; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7378ca3241..ad3f520210 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - uv_os_sock_t fds[2]; if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { goto End; @@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } + if (false == taosValidIpAndPort(srv->ip, srv->port)) { + tError("failed to bind, reason: %s", terrstr()); + goto End; + } if (false == addHandleToAcceptloop(srv)) { goto End; } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 6aa8520082..8cac660039 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { return 0; } +bool taosValidIpAndPort(uint32_t ip, uint16_t port) { + struct sockaddr_in serverAdd; + SocketFd fd; + int32_t reuse; + + // printf("open tcp server socket:0x%x:%hu", ip, port); + + bzero((char *)&serverAdd, sizeof(serverAdd)); + serverAdd.sin_family = AF_INET; + serverAdd.sin_addr.s_addr = ip; + serverAdd.sin_port = (uint16_t)htons(port); + + if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { + // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck1(fd); + return false; + } + + TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(fd); + return false; + } + pSocket->refId = 0; + pSocket->fd = fd; + + /* set REUSEADDR option, so the portnumber can be re-used */ + reuse = 1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; + } + /* bind socket to server address */ + if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { + // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(&pSocket); + return false; + } + taosCloseSocket(&pSocket); + return true; +} TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; SocketFd fd; diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py new file mode 100644 index 0000000000..a7d17bc41e --- /dev/null +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -0,0 +1,291 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import threading +import multiprocessing as mp +from numpy.lib.function_base import insert +import taos +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np +import datetime as dt +import time +# constant define +WAITS = 5 # wait seconds + +class TDTestCase: + # + # --------------- main frame ------------------- + # + + def caseDescription(self): + ''' + limit and offset keyword function test cases; + case1: limit offset base function test + case2: offset return valid + ''' + return + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + # init + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + # tdSql.prepare() + # self.create_tables(); + self.ts = 1500000000000 + + + # run case + def run(self): + + # test base case + self.test_case1() + tdLog.debug(" LIMIT test_case1 ............ [OK]") + + # test advance case + # self.test_case2() + # tdLog.debug(" LIMIT test_case2 ............ [OK]") + + # stop + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + # --------------- case ------------------- + + # create tables + def create_tables(self,dbname,stbname,count): + tdSql.execute("use %s" %dbname) + tdSql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) + pre_create = "create table" + sql = pre_create + tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + # print(time.time()) + exeStartTime=time.time() + for i in range(count): + sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) + if i >0 and i%3000 == 0: + tdSql.execute(sql) + sql = pre_create + # print(time.time()) + # end sql + if sql != pre_create: + tdSql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) + return + + def newcur(self,host,cfg): + user = "root" + password = "taosdata" + port =6030 + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop): + host = "chenhaoran02" + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + + tsql=self.newcur(host,config) + tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) + tsql.execute("use %s" %dbname) + tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) + + pre_create = "create table" + sql = pre_create + tcountStop=int(tcountStop) + tcountStart=int(tcountStart) + count=tcountStop-tcountStart + + tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + # print(time.time()) + exeStartTime=time.time() + # print(type(tcountStop),type(tcountStart)) + for i in range(tcountStart,tcountStop): + sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) + if i >0 and i%20000 == 0: + # print(sql) + tsql.execute(sql) + sql = pre_create + # print(time.time()) + # end sql + if sql != pre_create: + # print(sql) + tsql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedCreate=count/spendTime + # tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) + return + + + + # insert data + def insert_data(self, dbname, stbname, ts_start, tcountStart,tcountStop,rowCount): + tdSql.execute("use %s" %dbname) + pre_insert = "insert into " + sql = pre_insert + tcount=tcountStop-tcountStart + allRows=tcount*rowCount + tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbname, allRows)) + exeStartTime=time.time() + for i in range(tcountStart,tcountStop): + sql += " %s_%d values "%(stbname,i) + for j in range(rowCount): + sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j) + if j >0 and j%5000 == 0: + # print(sql) + tdSql.execute(sql) + sql = "insert into %s_%d values " %(stbname,i) + # end sql + if sql != pre_insert: + # print(sql) + tdSql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedInsert=allRows/spendTime + # tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert)) + + tdLog.debug("INSERT TABLE DATA ............ [OK]") + return + + + # test case1 base + def test_case1(self): + tdLog.debug("-----create database and tables test------- ") + tdSql.execute("drop database if exists db1") + tdSql.execute("drop database if exists db4") + tdSql.execute("drop database if exists db6") + tdSql.execute("drop database if exists db8") + tdSql.execute("drop database if exists db12") + tdSql.execute("drop database if exists db16") + + #create database and tables; + + # tdSql.execute("create database db11 vgroups 1") + # # self.create_tables("db1", "stb1", 30*10000) + # tdSql.execute("use db1") + # tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)") + + # tdSql.execute("create database db12 vgroups 1") + # # self.create_tables("db1", "stb1", 30*10000) + # tdSql.execute("use db1") + + # t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,)) + # t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,)) + # t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,)) + # t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,)) + + count=50000 + vgroups=1 + threads = [] + threadNumbers=2 + for i in range(threadNumbers): + threads.append(mp.Process(target=self.new_create_tables, args=("db1%d"%i, vgroups, "stb1", 0,count,))) + start_time = time.time() + for tr in threads: + tr.start() + for tr in threads: + tr.join() + end_time = time.time() + spendTime=end_time-start_time + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) + # self.new_create_tables("db1", "stb1", 15*10000) + # self.new_create_tables("db1", "stb1", 15*10000) + + # tdSql.execute("create database db4 vgroups 4") + # self.create_tables("db4", "stb4", 30*10000) + + # tdSql.execute("create database db6 vgroups 6") + # self.create_tables("db6", "stb6", 30*10000) + + # tdSql.execute("create database db8 vgroups 8") + # self.create_tables("db8", "stb8", 30*10000) + + # tdSql.execute("create database db12 vgroups 12") + # self.create_tables("db12", "stb12", 30*10000) + + # tdSql.execute("create database db16 vgroups 16") + # self.create_tables("db16", "stb16", 30*10000) + return + + # test case2 base:insert data + def test_case2(self): + + tdLog.debug("-----insert data test------- ") + # drop database + tdSql.execute("drop database if exists db1") + tdSql.execute("drop database if exists db4") + tdSql.execute("drop database if exists db6") + tdSql.execute("drop database if exists db8") + tdSql.execute("drop database if exists db12") + tdSql.execute("drop database if exists db16") + + #create database and tables; + + tdSql.execute("create database db1 vgroups 1") + self.create_tables("db1", "stb1", 1*100) + self.insert_data("db1", "stb1", self.ts, 1*50,1*10000) + + + tdSql.execute("create database db4 vgroups 4") + self.create_tables("db4", "stb4", 1*100) + self.insert_data("db4", "stb4", self.ts, 1*100,1*10000) + + tdSql.execute("create database db6 vgroups 6") + self.create_tables("db6", "stb6", 1*100) + self.insert_data("db6", "stb6", self.ts, 1*100,1*10000) + + tdSql.execute("create database db8 vgroups 8") + self.create_tables("db8", "stb8", 1*100) + self.insert_data("db8", "stb8", self.ts, 1*100,1*10000) + + tdSql.execute("create database db12 vgroups 12") + self.create_tables("db12", "stb12", 1*100) + self.insert_data("db12", "stb12", self.ts, 1*100,1*10000) + + tdSql.execute("create database db16 vgroups 16") + self.create_tables("db16", "stb16", 1*100) + self.insert_data("db16", "stb16", self.ts, 1*100,1*10000) + + return + +# +# add case with filename +# +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/2-query/smaTest.py b/tests/system-test/2-query/smaTest.py new file mode 100644 index 0000000000..67824cc3a3 --- /dev/null +++ b/tests/system-test/2-query/smaTest.py @@ -0,0 +1,131 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys + +from numpy.lib.function_base import insert +import taos +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np + +# constant define +WAITS = 5 # wait seconds + +class TDTestCase: + # + # --------------- main frame ------------------- + # + # updatecfgDict = {'debugFlag': 135} + # updatecfgDict = {'fqdn': 135} + + def caseDescription(self): + ''' + limit and offset keyword function test cases; + case1: limit offset base function test + case2: offset return valid + ''' + return + + # init + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + tdSql.prepare() + self.create_tables(); + self.ts = 1500000000000 + + + # run case + def run(self): + # insert data + self.insert_data1("t1", self.ts, 1000*10000) + self.insert_data1("t4", self.ts, 1000*10000) + # test base case + # self.test_case1() + tdLog.debug(" LIMIT test_case1 ............ [OK]") + # test advance case + # self.test_case2() + tdLog.debug(" LIMIT test_case2 ............ [OK]") + + + # stop + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + # + # --------------- case ------------------- + # + + # create table + def create_tables(self): + # super table + tdSql.execute("create table st(ts timestamp, i1 int,i2 int) tags(area int)"); + # child table + tdSql.execute("create table t1 using st tags(1)"); + + tdSql.execute("create table st1(ts timestamp, i1 int ,i2 int) tags(area int) sma(i2) "); + tdSql.execute("create table t4 using st1 tags(1)"); + + return + + # insert data1 + def insert_data(self, tbname, ts_start, count): + pre_insert = "insert into %s values"%tbname + sql = pre_insert + tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count)) + for i in range(count): + sql += " (%d,%d)"%(ts_start + i*1000, i ) + if i >0 and i%30000 == 0: + tdSql.execute(sql) + sql = pre_insert + # end sql + if sql != pre_insert: + tdSql.execute(sql) + + tdLog.debug("INSERT TABLE DATA ............ [OK]") + return + + def insert_data1(self, tbname, ts_start, count): + pre_insert = "insert into %s values"%tbname + sql = pre_insert + tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count)) + for i in range(count): + sql += " (%d,%d,%d)"%(ts_start + i*1000, i , i+1) + if i >0 and i%30000 == 0: + tdSql.execute(sql) + sql = pre_insert + # end sql + if sql != pre_insert: + tdSql.execute(sql) + + tdLog.debug("INSERT TABLE DATA ............ [OK]") + return + + # test case1 base + # def test_case1(self): + # # + # # limit base function + # # + # # base no where + # sql = "select * from t1 limit 10" + # tdSql.waitedQuery(sql, 10, WAITS) + + +# +# add case with filename +# +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index befcfc0731..08fe2cbc32 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -141,6 +141,9 @@ int32_t shellRunCommand(char *command) { *p++ = '\\'; } break; + default: + *p++ = '\\'; + break; } *p++ = c; esc = false; diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index c8ec31c48b..345b85d896 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -21,7 +21,7 @@ static void shellWorkAsClient() { SRpcInit rpcInit = {0}; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SRpcMsg rpcRsp = {0}; - void *clientRpc = NULL; + void * clientRpc = NULL; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass); @@ -116,6 +116,7 @@ static void shellWorkAsServer() { } SRpcInit rpcInit = {0}; + memcpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn)); rpcInit.localPort = pArgs->port; rpcInit.label = "CHK"; rpcInit.numOfThreads = tsNumOfRpcThreads; @@ -126,7 +127,7 @@ static void shellWorkAsServer() { void *serverRpc = rpcOpen(&rpcInit); if (serverRpc == NULL) { - printf("failed to init net test server since %s", terrstr()); + printf("failed to init net test server since %s\n", terrstr()); } else { printf("network test server is initialized, port:%u\n", pArgs->port); taosSetSignal(SIGTERM, shellNettestHandler);