From 8ff4a07d7bb8c5cdec5582a10f6b566c9bd94e39 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Dec 2021 10:32:39 +0800 Subject: [PATCH] [td-10564] remove unused attributes in user structure, add new log for performance metric. --- include/common/taosmsg.h | 15 +++++----- source/client/inc/clientInt.h | 4 +-- source/client/src/clientImpl.c | 30 ++++---------------- source/client/src/clientmain.c | 8 ------ source/client/src/tscEnv.c | 35 ++++++++++++------------ source/dnode/mgmt/impl/src/dndDnode.c | 1 + source/dnode/mnode/impl/inc/mndDef.h | 4 +-- source/dnode/mnode/impl/src/mndProfile.c | 30 ++++++++++---------- source/dnode/mnode/impl/src/mndUser.c | 34 ++++++++--------------- source/libs/transport/src/rpcMain.c | 2 -- 10 files changed, 62 insertions(+), 101 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index b4bbab376b..45f9514cf0 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -358,6 +358,7 @@ typedef struct { int32_t pid; char app[TSDB_APP_NAME_LEN]; char db[TSDB_DB_NAME_LEN]; + int64_t startTime; } SConnectMsg; typedef struct SEpSet { @@ -368,14 +369,12 @@ typedef struct SEpSet { } SEpSet; typedef struct { - int32_t acctId; - int32_t clusterId; - int32_t connId; - int8_t superAuth; - int8_t readAuth; - int8_t writeAuth; - int8_t reserved[5]; - SEpSet epSet; + int32_t acctId; + uint32_t clusterId; + int32_t connId; + int8_t superUser; + int8_t reserved[5]; + SEpSet epSet; } SConnectRsp; typedef struct { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 46620abcf2..749894444e 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -117,8 +117,6 @@ extern SAppInfo appInfo; extern int32_t tscReqRef; extern void *tscQhandle; extern int32_t tscConnRef; -extern void *tscRpcCache; -extern pthread_mutex_t rpcObjMutex; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsg); extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); @@ -126,7 +124,7 @@ extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char int taos_init(); void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo); -void destroyTscObj(void* pTscObj); +void destroyTscObj(void*pObj); void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void destroyRequest(void* p); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e81494ae6f..0f9b40e262 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -201,11 +201,10 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) tstrncpy(pConnect->db, db, sizeof(pConnect->db)); pthread_mutex_unlock(&pObj->mutex); -// tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion)); -// tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion)); + pConnect->pid = htonl(appInfo.pid); + pConnect->startTime = htobe64(appInfo.startTime); + tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); -// pConnect->pid = htonl(taosGetPId()); -// taosGetCurrentAPPName(pConnect->appName, NULL); pMsgBody->pData = pConnect; return 0; } @@ -232,23 +231,6 @@ int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody return TSDB_CODE_SUCCESS; } -// -//int tscBuildAndSendRequest(SRequestObj *pRequest) { -// assert(pRequest != NULL); -// char name[TSDB_TABLE_FNAME_LEN] = {0}; -// -// uint32_t type = 0; -// tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pRequest->requestId, taosMsg[pRequest->type], name, type); -// if (pRequest->type < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL -// -// } else if (pCmd->command >= TSDB_SQL_LOCAL) { -// return (*tscProcessMsgRsp[pCmd->command])(pSql); -// } -// -// return buildConnectMsg(pRequest); -//} - - void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { int64_t requestRefId = (int64_t)pMsg->ahandle; @@ -275,13 +257,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { * The actual inserted number of points is the first number. */ if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pRequest->type], - tstrerror(pMsg->code), pMsg->contLen); + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType], + tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); if (handleRequestRspFp[pRequest->type]) { pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, pMsg->pCont, pMsg->contLen); } } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pRequest->type], + tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen); } diff --git a/source/client/src/clientmain.c b/source/client/src/clientmain.c index ce3ef41f18..bda2311f97 100644 --- a/source/client/src/clientmain.c +++ b/source/client/src/clientmain.c @@ -61,14 +61,6 @@ void taos_cleanup(void) { tscConnRef = -1; taosCloseRef(id); - p = tscRpcCache; - tscRpcCache = NULL; - - if (p != NULL) { - taosCacheCleanup(p); - pthread_mutex_destroy(&rpcObjMutex); - } - rpcCleanup(); taosCloseLog(); } diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index c3d52cfe9e..7565dd0c53 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -34,7 +34,6 @@ SAppInfo appInfo; int32_t tscReqRef = -1; int32_t tscConnRef = -1; void *tscQhandle = NULL; -void *tscRpcCache= NULL; // TODO removed from here. int32_t tsNumOfThreads = 1; @@ -55,7 +54,7 @@ static void registerRequest(SRequestObj* pRequest) { int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1); int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1); - tscDebug("0x%" PRIx64 " new Request from 0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, + tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total); } } @@ -100,12 +99,14 @@ void tscFreeRpcObj(void *param) { #endif } -void tscReleaseRpc(void *param) { - if (param == NULL) { +void closeTransporter(STscObj* pTscObj) { + if (pTscObj == NULL || pTscObj->pTransporter == NULL) { return; } - taosCacheRelease(tscRpcCache, (void *)¶m, false); + tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pTransporter, pTscObj->id); + rpcClose(pTscObj->pTransporter); + pTscObj->pTransporter = NULL; } // TODO refactor @@ -133,11 +134,13 @@ void* openTransporter(const char *user, const char *auth) { return pDnodeConn; } -void destroyTscObj(void *pTscObj) { - STscObj *pObj = pTscObj; -// tscReleaseRpc(pObj->pRpcObj); - pthread_mutex_destroy(&pObj->mutex); - tfree(pObj); +void destroyTscObj(void *pObj) { + STscObj *pTscObj = pObj; + tscDebug("connect obj destroyed, 0x%"PRIx64, pTscObj->id); + + closeTransporter(pTscObj); + pthread_mutex_destroy(&pTscObj->mutex); + tfree(pTscObj); } void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo) { @@ -157,6 +160,9 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t pthread_mutex_init(&pObj->mutex, NULL); pObj->id = taosAddRef(tscConnRef, pObj); + + tscDebug("connect obj created, 0x%"PRIx64, pObj->id); + return pObj; } void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) { @@ -219,7 +225,7 @@ void taos_init_imp(void) { rpcInit(); - tscDebug("starting to initialize TAOS client ...\nLocal End Point is:%s", tsLocalEp); + tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp); taosSetCoreDump(true); @@ -234,12 +240,7 @@ void taos_init_imp(void) { return; } - tscDebug("client task queue is initialized, numOfWorkers: %d", numOfThreads); - - int refreshTime = 5; - tscRpcCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, true, tscFreeRpcObj, "rpcObj"); - pthread_mutex_init(&rpcObjMutex, NULL); - + tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads); tscConnRef = taosOpenRef(200, destroyTscObj); tscReqRef = taosOpenRef(40960, destroyRequest); diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 130cbbef07..9e2d4d4c9b 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -17,6 +17,7 @@ #include "dndDnode.h" #include "dndTransport.h" #include "dndVnodes.h" +#include "tep.h" int32_t dndGetDnodeId(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 426daf5298..5ddde3181e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -184,9 +184,7 @@ typedef struct SUserObj { char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; - int8_t superAuth; - int8_t readAuth; - int8_t writeAuth; + int8_t superUser; int32_t acctId; SHashObj *prohibitDbHash; } SUserObj; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index b429879789..59d84e5760 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -29,6 +29,7 @@ typedef struct { char user[TSDB_USER_LEN]; char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc int32_t pid; // pid of app that invokes taosc + int64_t appStartTime; // app start time int32_t id; int8_t killed; int8_t align; @@ -44,7 +45,7 @@ typedef struct { SQueryDesc *pQueries; } SConnObj; -static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app); +static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); @@ -102,13 +103,14 @@ void mndCleanupProfile(SMnode *pMnode) { } } -static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app) { +static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1); SConnObj connObj = {.pid = pid, + .appStartTime = startTime, .id = connId, .killed = 0, .port = port, @@ -195,6 +197,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SConnectMsg *pReq = pMsg->rpcMsg.pCont; pReq->pid = htonl(pReq->pid); + pReq->startTime = htobe64(pReq->startTime); SRpcConnInfo info = {0}; if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) { @@ -216,7 +219,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { mndReleaseDb(pMnode, pDb); } - SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app); + SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, pReq->startTime); if (pConn == NULL) { mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr()); return -1; @@ -233,9 +236,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser != NULL) { pRsp->acctId = htonl(pUser->acctId); - pRsp->superAuth = pUser->superAuth; - pRsp->readAuth = pUser->readAuth; - pRsp->writeAuth = pUser->writeAuth; + pRsp->superUser = pUser->superUser; mndReleaseUser(pMnode, pUser); } @@ -246,7 +247,8 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { pMsg->contLen = sizeof(SConnectRsp); pMsg->pCont = pRsp; - mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->id); + + mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pReq->app); return 0; } @@ -301,7 +303,7 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId); if (pConn == NULL) { - pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app); + pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, 0); if (pConn == NULL) { mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr()); return -1; @@ -368,7 +370,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) { SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser == NULL) return 0; - if (!pUser->superAuth) { + if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -399,7 +401,7 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) { SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser == NULL) return 0; - if (!pUser->superAuth) { + if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -430,7 +432,7 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) { SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser == NULL) return 0; - if (!pUser->superAuth) { + if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -459,7 +461,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser == NULL) return 0; - if (!pUser->superAuth) { + if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -587,7 +589,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser == NULL) return 0; - if (!pUser->superAuth) { + if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -803,7 +805,7 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); if (pUser == NULL) return 0; - if (!pUser->superAuth) { + if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index ef823a5f0b..e0c8c21c72 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -65,11 +65,9 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass); userObj.createdTime = taosGetTimestampMs(); userObj.updateTime = userObj.createdTime; - userObj.readAuth = 1; - userObj.writeAuth = 1; if (strcmp(user, TSDB_DEFAULT_USER) == 0) { - userObj.superAuth = 1; + userObj.superUser = 1; } SSdbRaw *pRaw = mndUserActionEncode(&userObj); @@ -102,9 +100,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN) SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) - SDB_SET_INT8(pRaw, dataPos, pUser->superAuth) - SDB_SET_INT8(pRaw, dataPos, pUser->readAuth) - SDB_SET_INT8(pRaw, dataPos, pUser->writeAuth) + SDB_SET_INT8(pRaw, dataPos, pUser->superUser) SDB_SET_DATALEN(pRaw, dataPos); return pRaw; @@ -130,9 +126,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime) - SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth) - SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->readAuth) - SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->writeAuth) + SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superUser) return pRow; } @@ -175,9 +169,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOldUser, SUserObj *pNe memcpy(pOldUser->acct, pNewUser->acct, TSDB_USER_LEN); pOldUser->createdTime = pNewUser->createdTime; pOldUser->updateTime = pNewUser->updateTime; - pOldUser->superAuth = pNewUser->superAuth; - pOldUser->readAuth = pNewUser->readAuth; - pOldUser->writeAuth = pNewUser->writeAuth; + pOldUser->superUser = pNewUser->superUser; return 0; } @@ -198,9 +190,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass); userObj.createdTime = taosGetTimestampMs(); userObj.updateTime = userObj.createdTime; - userObj.superAuth = 0; - userObj.readAuth = 1; - userObj.writeAuth = 1; + userObj.superUser = 0; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { @@ -513,15 +503,15 @@ static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pUser->superAuth) { + if (pUser->superUser) { const char *src = "super"; STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); - } else if (pUser->writeAuth) { - const char *src = "writable"; - STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); - } else { - const char *src = "readable"; - STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); +// } else if (pUser->writeAuth) { +// const char *src = "writable"; +// STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); +// } else { +// const char *src = "readable"; +// STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src)); } cols++; diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 67e468b1bd..c54415860a 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -229,8 +229,6 @@ static void rpcInitImp(void) { tsRpcOverhead = sizeof(SRpcReqContext); tsRpcRefId = taosOpenRef(200, rpcFree); - - return 0; } int32_t rpcInit(void) {