minor changes
This commit is contained in:
parent
11a07e7dc3
commit
d10df1e3a8
|
@ -302,7 +302,7 @@ typedef struct {
|
||||||
char app[TSDB_APP_NAME_LEN];
|
char app[TSDB_APP_NAME_LEN];
|
||||||
char db[TSDB_DB_NAME_LEN];
|
char db[TSDB_DB_NAME_LEN];
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
} SConnectMsg;
|
} SConnectReq;
|
||||||
|
|
||||||
typedef struct SEpSet {
|
typedef struct SEpSet {
|
||||||
int8_t inUse;
|
int8_t inUse;
|
||||||
|
@ -898,7 +898,7 @@ typedef struct {
|
||||||
int32_t numOfStreams;
|
int32_t numOfStreams;
|
||||||
char app[TSDB_APP_NAME_LEN];
|
char app[TSDB_APP_NAME_LEN];
|
||||||
char pData[];
|
char pData[];
|
||||||
} SHeartBeatMsg;
|
} SHeartBeatReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t connId;
|
int32_t connId;
|
||||||
|
@ -911,19 +911,14 @@ typedef struct {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SHeartBeatRsp;
|
} SHeartBeatRsp;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t connId;
|
|
||||||
int32_t streamId;
|
|
||||||
} SKillStreamMsg;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t connId;
|
int32_t connId;
|
||||||
int32_t queryId;
|
int32_t queryId;
|
||||||
} SKillQueryMsg;
|
} SKillQueryReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t connId;
|
int32_t connId;
|
||||||
} SKillConnMsg;
|
} SKillConnReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
|
|
|
@ -395,13 +395,13 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
|
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
|
||||||
pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg);
|
pMsgSendInfo->msgInfo.len = sizeof(SConnectReq);
|
||||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||||
pMsgSendInfo->requestId = pRequest->requestId;
|
pMsgSendInfo->requestId = pRequest->requestId;
|
||||||
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
|
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
|
||||||
pMsgSendInfo->param = pRequest;
|
pMsgSendInfo->param = pRequest;
|
||||||
|
|
||||||
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
|
SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
|
||||||
if (pConnect == NULL) {
|
if (pConnect == NULL) {
|
||||||
tfree(pMsgSendInfo);
|
tfree(pMsgSendInfo);
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -10,7 +10,6 @@ add_subdirectory(db)
|
||||||
add_subdirectory(dnode)
|
add_subdirectory(dnode)
|
||||||
# add_subdirectory(func)
|
# add_subdirectory(func)
|
||||||
add_subdirectory(mnode)
|
add_subdirectory(mnode)
|
||||||
add_subdirectory(profile)
|
|
||||||
add_subdirectory(stb)
|
add_subdirectory(stb)
|
||||||
# add_subdirectory(sync)
|
# add_subdirectory(sync)
|
||||||
# add_subdirectory(telem)
|
# add_subdirectory(telem)
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
aux_source_directory(. PROFILE_SRC)
|
|
||||||
add_executable(dnode_test_profile ${PROFILE_SRC})
|
|
||||||
target_link_libraries(
|
|
||||||
dnode_test_profile
|
|
||||||
PUBLIC sut
|
|
||||||
)
|
|
||||||
|
|
||||||
add_test(
|
|
||||||
NAME dnode_test_profile
|
|
||||||
COMMAND dnode_test_profile
|
|
||||||
)
|
|
|
@ -47,14 +47,14 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
|
||||||
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
|
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
|
||||||
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
|
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
|
||||||
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessConnectReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
|
||||||
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
|
||||||
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitProfile(SMnode *pMnode) {
|
int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
|
@ -68,10 +68,10 @@ int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnectionMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
||||||
|
@ -178,35 +178,35 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
|
||||||
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
|
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SConnectMsg *pReq = pMsg->rpcMsg.pCont;
|
SConnectReq *pConnReq = pReq->rpcMsg.pCont;
|
||||||
pReq->pid = htonl(pReq->pid);
|
pConnReq->pid = htonl(pConnReq->pid);
|
||||||
pReq->startTime = htobe64(pReq->startTime);
|
pConnReq->startTime = htobe64(pConnReq->startTime);
|
||||||
|
|
||||||
SRpcConnInfo info = {0};
|
SRpcConnInfo info = {0};
|
||||||
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) {
|
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
|
||||||
mError("user:%s, failed to login while get connection info since %s", pMsg->user, terrstr());
|
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char ip[30];
|
char ip[30];
|
||||||
taosIp2String(info.clientIp, ip);
|
taosIp2String(info.clientIp, ip);
|
||||||
|
|
||||||
if (pReq->db[0]) {
|
if (pConnReq->db[0]) {
|
||||||
snprintf(pMsg->db, TSDB_DB_FNAME_LEN, "%d%s%s", pMsg->acctId, TS_PATH_DELIMITER, pReq->db);
|
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pReq->acctId, TS_PATH_DELIMITER, pConnReq->db);
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
|
SDbObj *pDb = mndAcquireDb(pMnode, pReq->db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||||
mError("user:%s, failed to login from %s while use db:%s since %s", pMsg->user, ip, pReq->db, terrstr());
|
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnObj *pConn = mndCreateConn(pMnode, &info, pReq->pid, pReq->app, pReq->startTime);
|
SConnObj *pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr());
|
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,11 +214,11 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseConn(pMnode, pConn);
|
mndReleaseConn(pMnode, pConn);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("user:%s, failed to login from %s while create rsp since %s", pMsg->user, ip, terrstr());
|
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
if (pUser != NULL) {
|
if (pUser != NULL) {
|
||||||
pRsp->acctId = htonl(pUser->acctId);
|
pRsp->acctId = htonl(pUser->acctId);
|
||||||
pRsp->superUser = pUser->superUser;
|
pRsp->superUser = pUser->superUser;
|
||||||
|
@ -230,16 +230,16 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
|
||||||
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
||||||
mndReleaseConn(pMnode, pConn);
|
mndReleaseConn(pMnode, pConn);
|
||||||
|
|
||||||
pMsg->contLen = sizeof(SConnectRsp);
|
pReq->contLen = sizeof(SConnectRsp);
|
||||||
pMsg->pCont = pRsp;
|
pReq->pCont = pRsp;
|
||||||
|
|
||||||
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pReq->app);
|
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
|
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
|
||||||
pConn->numOfQueries = 0;
|
pConn->numOfQueries = 0;
|
||||||
int32_t numOfQueries = htonl(pMsg->numOfQueries);
|
int32_t numOfQueries = htonl(pReq->numOfQueries);
|
||||||
|
|
||||||
if (numOfQueries > 0) {
|
if (numOfQueries > 0) {
|
||||||
if (pConn->pQueries == NULL) {
|
if (pConn->pQueries == NULL) {
|
||||||
|
@ -250,38 +250,38 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
|
||||||
|
|
||||||
int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
|
int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
|
||||||
if (saveSize > 0 && pConn->pQueries != NULL) {
|
if (saveSize > 0 && pConn->pQueries != NULL) {
|
||||||
memcpy(pConn->pQueries, pMsg->pData, saveSize);
|
memcpy(pConn->pQueries, pReq->pData, saveSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont;
|
SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
|
||||||
pReq->connId = htonl(pReq->connId);
|
pHeartbeat->connId = htonl(pHeartbeat->connId);
|
||||||
pReq->pid = htonl(pReq->pid);
|
pHeartbeat->pid = htonl(pHeartbeat->pid);
|
||||||
|
|
||||||
SRpcConnInfo info = {0};
|
SRpcConnInfo info = {0};
|
||||||
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) {
|
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
|
||||||
mError("user:%s, connId:%d failed to process hb since %s", pMsg->user, pReq->connId, terrstr());
|
mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId);
|
SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
pConn = mndCreateConn(pMnode, &info, pReq->pid, pReq->app, 0);
|
pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr());
|
mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->id);
|
mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
|
||||||
}
|
}
|
||||||
} else if (pConn->killed) {
|
} else if (pConn->killed) {
|
||||||
mError("user:%s, conn:%d is already killed", pMsg->user, pConn->id);
|
mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
|
||||||
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
|
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -304,11 +304,11 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseConn(pMnode, pConn);
|
mndReleaseConn(pMnode, pConn);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("user:%s, conn:%d failed to process hb while create rsp since %s", pMsg->user, pReq->connId, terrstr());
|
mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndSaveQueryStreamList(pConn, pReq);
|
mndSaveQueryStreamList(pConn, pHeartbeat);
|
||||||
if (pConn->killed != 0) {
|
if (pConn->killed != 0) {
|
||||||
pRsp->killConnection = 1;
|
pRsp->killConnection = 1;
|
||||||
}
|
}
|
||||||
|
@ -324,16 +324,16 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||||
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
||||||
mndReleaseConn(pMnode, pConn);
|
mndReleaseConn(pMnode, pConn);
|
||||||
|
|
||||||
pMsg->contLen = sizeof(SConnectRsp);
|
pReq->contLen = sizeof(SConnectRsp);
|
||||||
pMsg->pCont = pRsp;
|
pReq->pCont = pRsp;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
if (!pUser->superUser) {
|
if (!pUser->superUser) {
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
@ -342,7 +342,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
|
||||||
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
|
SKillQueryReq *pKill = pReq->rpcMsg.pCont;
|
||||||
int32_t connId = htonl(pKill->connId);
|
int32_t connId = htonl(pKill->connId);
|
||||||
int32_t queryId = htonl(pKill->queryId);
|
int32_t queryId = htonl(pKill->queryId);
|
||||||
mInfo("kill query msg is received, queryId:%d", pKill->queryId);
|
mInfo("kill query msg is received, queryId:%d", pKill->queryId);
|
||||||
|
@ -353,18 +353,18 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pMsg->user);
|
mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user);
|
||||||
pConn->queryId = queryId;
|
pConn->queryId = queryId;
|
||||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
if (!pUser->superUser) {
|
if (!pUser->superUser) {
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
@ -373,7 +373,7 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
|
||||||
SKillConnMsg *pKill = pMsg->rpcMsg.pCont;
|
SKillConnReq *pKill = pReq->rpcMsg.pCont;
|
||||||
int32_t connId = htonl(pKill->connId);
|
int32_t connId = htonl(pKill->connId);
|
||||||
|
|
||||||
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
|
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
|
||||||
|
@ -382,18 +382,18 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
mInfo("connId:%d, is killed by user:%s", connId, pMsg->user);
|
mInfo("connId:%d, is killed by user:%s", connId, pReq->user);
|
||||||
pConn->killed = 1;
|
pConn->killed = 1;
|
||||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
|
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
if (!pUser->superUser) {
|
if (!pUser->superUser) {
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
@ -464,8 +464,8 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SConnObj *pConn = NULL;
|
SConnObj *pConn = NULL;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
@ -518,11 +518,11 @@ static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
|
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
if (pUser == NULL) return 0;
|
if (pUser == NULL) return 0;
|
||||||
if (!pUser->superUser) {
|
if (!pUser->superUser) {
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
@ -633,8 +633,8 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SConnObj *pConn = NULL;
|
SConnObj *pConn = NULL;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
|
@ -124,20 +124,20 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
|
||||||
|
|
||||||
if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
|
if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
|
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
|
||||||
mError("failed to process show msg since %s", terrstr());
|
mError("failed to process show-meta req since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ShowMetaFp metaFp = pMgmt->metaFps[type];
|
ShowMetaFp metaFp = pMgmt->metaFps[type];
|
||||||
if (metaFp == NULL) {
|
if (metaFp == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
|
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
|
||||||
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
|
mError("failed to process show-meta req:%s since %s", mndShowStr(type), terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SShowObj *pShow = mndCreateShowObj(pMnode, pShowReq);
|
SShowObj *pShow = mndCreateShowObj(pMnode, pShowReq);
|
||||||
if (pShow == NULL) {
|
if (pShow == NULL) {
|
||||||
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
|
mError("failed to process show-meta req:%s since %s", mndShowStr(type), terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseShowObj(pShow, true);
|
mndReleaseShowObj(pShow, true);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("show:0x%" PRIx64 ", failed to process show-meta msg:%s since malloc rsp error", pShow->id,
|
mError("show:0x%" PRIx64 ", failed to process show-meta req:%s since malloc rsp error", pShow->id,
|
||||||
mndShowStr(type));
|
mndShowStr(type));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -181,7 +181,7 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
|
||||||
SShowObj *pShow = mndAcquireShowObj(pMnode, showId);
|
SShowObj *pShow = mndAcquireShowObj(pMnode, showId);
|
||||||
if (pShow == NULL) {
|
if (pShow == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
|
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
|
||||||
mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr());
|
mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,3 +7,4 @@ add_subdirectory(qnode)
|
||||||
add_subdirectory(snode)
|
add_subdirectory(snode)
|
||||||
add_subdirectory(bnode)
|
add_subdirectory(bnode)
|
||||||
add_subdirectory(show)
|
add_subdirectory(show)
|
||||||
|
add_subdirectory(profile)
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
aux_source_directory(. PROFILE_SRC)
|
||||||
|
add_executable(mnode_test_profile ${PROFILE_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
mnode_test_profile
|
||||||
|
PUBLIC sut
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME mnode_test_profile
|
||||||
|
COMMAND mnode_test_profile
|
||||||
|
)
|
|
@ -1,19 +1,19 @@
|
||||||
/**
|
/**
|
||||||
* @file profile.cpp
|
* @file profile.cpp
|
||||||
* @author slguan (slguan@taosdata.com)
|
* @author slguan (slguan@taosdata.com)
|
||||||
* @brief DNODE module profile-msg tests
|
* @brief MNODE module profile tests
|
||||||
* @version 0.1
|
* @version 1.0
|
||||||
* @date 2021-12-15
|
* @date 2022-01-06
|
||||||
*
|
*
|
||||||
* @copyright Copyright (c) 2021
|
* @copyright Copyright (c) 2022
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sut.h"
|
#include "sut.h"
|
||||||
|
|
||||||
class DndTestProfile : public ::testing::Test {
|
class MndTestProfile : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_profile", 9080); }
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9022); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
|
@ -24,15 +24,15 @@ class DndTestProfile : public ::testing::Test {
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase DndTestProfile::test;
|
Testbase MndTestProfile::test;
|
||||||
int32_t DndTestProfile::connId;
|
int32_t MndTestProfile::connId;
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 01_ConnectMsg) {
|
TEST_F(MndTestProfile, 01_ConnectMsg) {
|
||||||
int32_t contLen = sizeof(SConnectMsg);
|
int32_t contLen = sizeof(SConnectReq);
|
||||||
|
|
||||||
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen);
|
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
strcpy(pReq->app, "dnode_test_profile");
|
strcpy(pReq->app, "mnode_test_profile");
|
||||||
strcpy(pReq->db, "");
|
strcpy(pReq->db, "");
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
|
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
|
||||||
|
@ -53,18 +53,18 @@ TEST_F(DndTestProfile, 01_ConnectMsg) {
|
||||||
|
|
||||||
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
||||||
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
||||||
EXPECT_EQ(pRsp->epSet.port[0], 9080);
|
EXPECT_EQ(pRsp->epSet.port[0], 9022);
|
||||||
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
||||||
|
|
||||||
connId = pRsp->connId;
|
connId = pRsp->connId;
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 02_ConnectMsg_InvalidDB) {
|
TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
|
||||||
int32_t contLen = sizeof(SConnectMsg);
|
int32_t contLen = sizeof(SConnectReq);
|
||||||
|
|
||||||
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen);
|
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
strcpy(pReq->app, "dnode_test_profile");
|
strcpy(pReq->app, "mnode_test_profile");
|
||||||
strcpy(pReq->db, "invalid_db");
|
strcpy(pReq->db, "invalid_db");
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
|
||||||
|
@ -73,7 +73,7 @@ TEST_F(DndTestProfile, 02_ConnectMsg_InvalidDB) {
|
||||||
ASSERT_EQ(pRsp->contLen, 0);
|
ASSERT_EQ(pRsp->contLen, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 03_ConnectMsg_Show) {
|
TEST_F(MndTestProfile, 03_ConnectMsg_Show) {
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
|
||||||
CHECK_META("show connections", 7);
|
CHECK_META("show connections", 7);
|
||||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "connId");
|
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "connId");
|
||||||
|
@ -88,22 +88,22 @@ TEST_F(DndTestProfile, 03_ConnectMsg_Show) {
|
||||||
EXPECT_EQ(test.GetShowRows(), 1);
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
CheckInt32(1);
|
CheckInt32(1);
|
||||||
CheckBinary("root", TSDB_USER_LEN);
|
CheckBinary("root", TSDB_USER_LEN);
|
||||||
CheckBinary("dnode_test_profile", TSDB_APP_NAME_LEN);
|
CheckBinary("mnode_test_profile", TSDB_APP_NAME_LEN);
|
||||||
CheckInt32(1234);
|
CheckInt32(1234);
|
||||||
IgnoreBinary(TSDB_IPv4ADDR_LEN + 6);
|
IgnoreBinary(TSDB_IPv4ADDR_LEN + 6);
|
||||||
CheckTimestamp();
|
CheckTimestamp();
|
||||||
CheckTimestamp();
|
CheckTimestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 04_HeartBeatMsg) {
|
TEST_F(MndTestProfile, 04_HeartBeatMsg) {
|
||||||
int32_t contLen = sizeof(SHeartBeatMsg);
|
int32_t contLen = sizeof(SHeartBeatReq);
|
||||||
|
|
||||||
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(contLen);
|
SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(connId);
|
pReq->connId = htonl(connId);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
pReq->numOfQueries = htonl(0);
|
pReq->numOfQueries = htonl(0);
|
||||||
pReq->numOfStreams = htonl(0);
|
pReq->numOfStreams = htonl(0);
|
||||||
strcpy(pReq->app, "dnode_test_profile");
|
strcpy(pReq->app, "mnode_test_profile");
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
|
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
@ -127,15 +127,15 @@ TEST_F(DndTestProfile, 04_HeartBeatMsg) {
|
||||||
|
|
||||||
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
||||||
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
||||||
EXPECT_EQ(pRsp->epSet.port[0], 9080);
|
EXPECT_EQ(pRsp->epSet.port[0], 9022);
|
||||||
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 05_KillConnMsg) {
|
TEST_F(MndTestProfile, 05_KillConnMsg) {
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SKillConnMsg);
|
int32_t contLen = sizeof(SKillConnReq);
|
||||||
|
|
||||||
SKillConnMsg* pReq = (SKillConnMsg*)rpcMallocCont(contLen);
|
SKillConnReq* pReq = (SKillConnReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(connId);
|
pReq->connId = htonl(connId);
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen);
|
||||||
|
@ -144,14 +144,14 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SHeartBeatMsg);
|
int32_t contLen = sizeof(SHeartBeatReq);
|
||||||
|
|
||||||
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(contLen);
|
SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(connId);
|
pReq->connId = htonl(connId);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
pReq->numOfQueries = htonl(0);
|
pReq->numOfQueries = htonl(0);
|
||||||
pReq->numOfStreams = htonl(0);
|
pReq->numOfStreams = htonl(0);
|
||||||
strcpy(pReq->app, "dnode_test_profile");
|
strcpy(pReq->app, "mnode_test_profile");
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
@ -160,11 +160,11 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SConnectMsg);
|
int32_t contLen = sizeof(SConnectReq);
|
||||||
|
|
||||||
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen);
|
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
strcpy(pReq->app, "dnode_test_profile");
|
strcpy(pReq->app, "mnode_test_profile");
|
||||||
strcpy(pReq->db, "");
|
strcpy(pReq->db, "");
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
|
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
|
||||||
|
@ -185,17 +185,17 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
|
||||||
|
|
||||||
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
||||||
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
||||||
EXPECT_EQ(pRsp->epSet.port[0], 9080);
|
EXPECT_EQ(pRsp->epSet.port[0], 9022);
|
||||||
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
||||||
|
|
||||||
connId = pRsp->connId;
|
connId = pRsp->connId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 06_KillConnMsg_InvalidConn) {
|
TEST_F(MndTestProfile, 06_KillConnMsg_InvalidConn) {
|
||||||
int32_t contLen = sizeof(SKillConnMsg);
|
int32_t contLen = sizeof(SKillConnReq);
|
||||||
|
|
||||||
SKillConnMsg* pReq = (SKillConnMsg*)rpcMallocCont(contLen);
|
SKillConnReq* pReq = (SKillConnReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(2345);
|
pReq->connId = htonl(2345);
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen);
|
||||||
|
@ -203,11 +203,11 @@ TEST_F(DndTestProfile, 06_KillConnMsg_InvalidConn) {
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 07_KillQueryMsg) {
|
TEST_F(MndTestProfile, 07_KillQueryMsg) {
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SKillQueryMsg);
|
int32_t contLen = sizeof(SKillQueryReq);
|
||||||
|
|
||||||
SKillQueryMsg* pReq = (SKillQueryMsg*)rpcMallocCont(contLen);
|
SKillQueryReq* pReq = (SKillQueryReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(connId);
|
pReq->connId = htonl(connId);
|
||||||
pReq->queryId = htonl(1234);
|
pReq->queryId = htonl(1234);
|
||||||
|
|
||||||
|
@ -218,14 +218,14 @@ TEST_F(DndTestProfile, 07_KillQueryMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SHeartBeatMsg);
|
int32_t contLen = sizeof(SHeartBeatReq);
|
||||||
|
|
||||||
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(contLen);
|
SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(connId);
|
pReq->connId = htonl(connId);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
pReq->numOfQueries = htonl(0);
|
pReq->numOfQueries = htonl(0);
|
||||||
pReq->numOfStreams = htonl(0);
|
pReq->numOfStreams = htonl(0);
|
||||||
strcpy(pReq->app, "dnode_test_profile");
|
strcpy(pReq->app, "mnode_test_profile");
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
|
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
@ -249,15 +249,15 @@ TEST_F(DndTestProfile, 07_KillQueryMsg) {
|
||||||
|
|
||||||
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
||||||
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
||||||
EXPECT_EQ(pRsp->epSet.port[0], 9080);
|
EXPECT_EQ(pRsp->epSet.port[0], 9022);
|
||||||
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 08_KillQueryMsg_InvalidConn) {
|
TEST_F(MndTestProfile, 08_KillQueryMsg_InvalidConn) {
|
||||||
int32_t contLen = sizeof(SKillQueryMsg);
|
int32_t contLen = sizeof(SKillQueryReq);
|
||||||
|
|
||||||
SKillQueryMsg* pReq = (SKillQueryMsg*)rpcMallocCont(contLen);
|
SKillQueryReq* pReq = (SKillQueryReq*)rpcMallocCont(contLen);
|
||||||
pReq->connId = htonl(2345);
|
pReq->connId = htonl(2345);
|
||||||
pReq->queryId = htonl(1234);
|
pReq->queryId = htonl(1234);
|
||||||
|
|
||||||
|
@ -266,7 +266,7 @@ TEST_F(DndTestProfile, 08_KillQueryMsg_InvalidConn) {
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestProfile, 09_KillQueryMsg) {
|
TEST_F(MndTestProfile, 09_KillQueryMsg) {
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_QUERIES, "");
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_QUERIES, "");
|
||||||
CHECK_META("show queries", 14);
|
CHECK_META("show queries", 14);
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
|
|
||||||
class MndTestShow : public ::testing::Test {
|
class MndTestShow : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9020); }
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9021); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
|
@ -50,9 +50,9 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(MndTestShow, 03_ShowMsg_Conn) {
|
TEST_F(MndTestShow, 03_ShowMsg_Conn) {
|
||||||
int32_t contLen = sizeof(SConnectMsg);
|
int32_t contLen = sizeof(SConnectReq);
|
||||||
|
|
||||||
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen);
|
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
|
||||||
pReq->pid = htonl(1234);
|
pReq->pid = htonl(1234);
|
||||||
strcpy(pReq->app, "mnode_test_show");
|
strcpy(pReq->app, "mnode_test_show");
|
||||||
strcpy(pReq->db, "");
|
strcpy(pReq->db, "");
|
||||||
|
|
|
@ -226,7 +226,7 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
||||||
SHeartBeatMsg *pHeartbeat = pMsg;
|
SHeartBeatReq *pHeartbeat = pMsg;
|
||||||
|
|
||||||
int allocedQueriesNum = pHeartbeat->numOfQueries;
|
int allocedQueriesNum = pHeartbeat->numOfQueries;
|
||||||
int allocedStreamsNum = pHeartbeat->numOfStreams;
|
int allocedStreamsNum = pHeartbeat->numOfStreams;
|
||||||
|
@ -327,7 +327,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
|
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
|
||||||
sizeof(SHeartBeatMsg);
|
sizeof(SHeartBeatReq);
|
||||||
pHeartbeat->connId = htonl(pObj->connId);
|
pHeartbeat->connId = htonl(pObj->connId);
|
||||||
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
|
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
|
||||||
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
|
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
|
||||||
|
|
|
@ -1469,7 +1469,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
|
|
||||||
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
pCmd->payloadLen = sizeof(SKillQueryMsg);
|
pCmd->payloadLen = sizeof(SKillQueryReq);
|
||||||
|
|
||||||
switch (pCmd->command) {
|
switch (pCmd->command) {
|
||||||
case TSDB_SQL_KILL_QUERY:
|
case TSDB_SQL_KILL_QUERY:
|
||||||
|
@ -1862,14 +1862,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
STscObj *pObj = pSql->pTscObj;
|
STscObj *pObj = pSql->pTscObj;
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
pCmd->msgType = TDMT_MND_CONNECT;
|
pCmd->msgType = TDMT_MND_CONNECT;
|
||||||
pCmd->payloadLen = sizeof(SConnectMsg);
|
pCmd->payloadLen = sizeof(SConnectReq);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
|
||||||
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
|
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload;
|
SConnectReq *pConnect = (SConnectReq*)pCmd->payload;
|
||||||
|
|
||||||
// TODO refactor full_name
|
// TODO refactor full_name
|
||||||
char *db; // ugly code to move the space
|
char *db; // ugly code to move the space
|
||||||
|
@ -1974,7 +1974,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
numOfStreams++;
|
numOfStreams++;
|
||||||
}
|
}
|
||||||
|
|
||||||
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatMsg) + 100;
|
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatReq) + 100;
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||||
pthread_mutex_unlock(&pObj->mutex);
|
pthread_mutex_unlock(&pObj->mutex);
|
||||||
tscError("0x%"PRIx64" failed to create heartbeat msg", pSql->self);
|
tscError("0x%"PRIx64" failed to create heartbeat msg", pSql->self);
|
||||||
|
@ -1982,7 +1982,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO the expired hb and client can not be identified by server till now.
|
// TODO the expired hb and client can not be identified by server till now.
|
||||||
SHeartBeatMsg *pHeartbeat = (SHeartBeatMsg *)pCmd->payload;
|
SHeartBeatReq *pHeartbeat = (SHeartBeatReq *)pCmd->payload;
|
||||||
tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));
|
tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));
|
||||||
|
|
||||||
pHeartbeat->numOfQueries = numOfQueries;
|
pHeartbeat->numOfQueries = numOfQueries;
|
||||||
|
|
|
@ -44,13 +44,20 @@ print $data10 $data11 $data12
|
||||||
|
|
||||||
print =============== create child table
|
print =============== create child table
|
||||||
sql create table c1 using st tags(1)
|
sql create table c1 using st tags(1)
|
||||||
sql create table c2 using st tags(2)
|
sql create table c2 using st tags(2)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql create table c3 using st tags(3) c4 using st tags(4) c5 using st tags(5) c6 using st tags(6) c7 using st tags(7)
|
||||||
|
|
||||||
|
sql show tables
|
||||||
|
if $rows != 7 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
print $data10 $data11 $data22
|
print $data10 $data11 $data22
|
||||||
print $data20 $data11 $data22
|
print $data20 $data11 $data22
|
||||||
|
|
Loading…
Reference in New Issue