chore: code optimization
This commit is contained in:
parent
387d611179
commit
067a8334fd
|
@ -64,7 +64,7 @@ typedef struct {
|
||||||
// statistics
|
// statistics
|
||||||
int32_t reportCnt;
|
int32_t reportCnt;
|
||||||
int32_t connKeyCnt;
|
int32_t connKeyCnt;
|
||||||
int8_t connHbFlag;
|
int8_t connHbFlag; // 0 init, 1 send req, 2 get resp
|
||||||
int64_t reportBytes; // not implemented
|
int64_t reportBytes; // not implemented
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
// ctl
|
// ctl
|
||||||
|
|
|
@ -21,11 +21,10 @@
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SAppHbMgr *pAppHbMgr;
|
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
|
SAppHbMgr *pAppHbMgr;
|
||||||
int64_t clusterId;
|
int64_t clusterId;
|
||||||
// int32_t authVer;
|
|
||||||
int32_t reqCnt;
|
int32_t reqCnt;
|
||||||
int8_t connHbFlag;
|
int8_t connHbFlag;
|
||||||
};
|
};
|
||||||
|
@ -41,6 +40,47 @@ static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq
|
||||||
|
|
||||||
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
|
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
|
||||||
|
|
||||||
|
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
|
||||||
|
SClientHbReq *pReq = NULL;
|
||||||
|
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
|
||||||
|
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
|
||||||
|
if (!pTscObj) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < TARRAY_SIZE(batchRsp->pArray); ++i) {
|
||||||
|
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp->pArray, i);
|
||||||
|
|
||||||
|
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
|
||||||
|
|
||||||
|
pTscObj->authVer = rsp->version;
|
||||||
|
|
||||||
|
if (pTscObj->sysInfo != rsp->sysInfo) {
|
||||||
|
tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64, rsp->user,
|
||||||
|
pTscObj->sysInfo, rsp->sysInfo, pTscObj->id);
|
||||||
|
pTscObj->sysInfo = rsp->sysInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTscObj->passInfo.fp) {
|
||||||
|
SPassInfo *passInfo = &pTscObj->passInfo;
|
||||||
|
int32_t oldVer = atomic_load_32(&passInfo->ver);
|
||||||
|
if (oldVer < rsp->passVer) {
|
||||||
|
atomic_store_32(&passInfo->ver, rsp->passVer);
|
||||||
|
if (passInfo->fp) {
|
||||||
|
(*passInfo->fp)(passInfo->param, &rsp->passVer, TAOS_NOTIFY_PASSVER);
|
||||||
|
}
|
||||||
|
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer,
|
||||||
|
atomic_load_32(&passInfo->ver), pTscObj->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
releaseTscObj(pReq->connKey.tscRid);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
|
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
|
||||||
SAppHbMgr *pAppHbMgr) {
|
SAppHbMgr *pAppHbMgr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -51,8 +91,6 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 2, 0);
|
|
||||||
|
|
||||||
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
||||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||||
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||||
|
@ -61,48 +99,9 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
||||||
catalogUpdateUserAuthInfo(pCatalog, rsp);
|
catalogUpdateUserAuthInfo(pCatalog, rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
SClientHbReq *pReq = NULL;
|
if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp);
|
||||||
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
|
|
||||||
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
|
|
||||||
if (!pTscObj) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
|
||||||
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
|
||||||
pTscObj->authVer = rsp->version;
|
|
||||||
|
|
||||||
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
|
|
||||||
if (pTscObj->sysInfo != rsp->sysInfo) {
|
|
||||||
printf("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64 "\n", rsp->user,
|
|
||||||
pTscObj->sysInfo, rsp->sysInfo, pTscObj->id);
|
|
||||||
pTscObj->sysInfo = rsp->sysInfo;
|
|
||||||
} else {
|
|
||||||
printf("not update sysInfo of user %s since not change: %" PRIi8 ", tscRid:%" PRIi64 "\n", rsp->user,
|
|
||||||
pTscObj->sysInfo, pTscObj->id);
|
|
||||||
}
|
|
||||||
|
|
||||||
SPassInfo *passInfo = &pTscObj->passInfo;
|
|
||||||
if (passInfo->fp) {
|
|
||||||
int32_t oldVer = atomic_load_32(&passInfo->ver);
|
|
||||||
if (oldVer < rsp->passVer) {
|
|
||||||
atomic_store_32(&passInfo->ver, rsp->passVer);
|
|
||||||
if (passInfo->fp) {
|
|
||||||
(*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER);
|
|
||||||
}
|
|
||||||
printf("update passVer of user %s from %d to %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer,
|
|
||||||
atomic_load_32(&passInfo->ver), pTscObj->id);
|
|
||||||
} else {
|
|
||||||
printf("not update passVer of user %s since not changed: %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer,
|
|
||||||
pTscObj->id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
releaseTscObj(pReq->connKey.tscRid);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(batchRsp.pArray);
|
taosArrayDestroy(batchRsp.pArray);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -548,16 +547,16 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
|
||||||
SKv *pKv = NULL;
|
SKv *pKv = NULL;
|
||||||
if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
|
if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
|
||||||
int32_t userNum = pKv->valueLen / sizeof(SUserAuthVersion);
|
int32_t userNum = pKv->valueLen / sizeof(SUserAuthVersion);
|
||||||
SUserAuthVersion *pUserAuths = (SUserAuthVersion *)pKv->value;
|
SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
|
||||||
for (int32_t i = 0; i < userNum; ++i) {
|
for (int32_t i = 0; i < userNum; ++i) {
|
||||||
SUserAuthVersion *pUserAuth = pUserAuths + i;
|
SUserAuthVersion *pUserAuth = userAuths + i;
|
||||||
// both key and user exist
|
// both key and user exist, update version
|
||||||
if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
|
if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
|
||||||
pUserAuth->version = htonl(-1); // force get userAuthInfo
|
pUserAuth->version = htonl(-1); // force get userAuthInfo
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// key exists, user not exist
|
// key exists, user not exist, append user
|
||||||
SUserAuthVersion *qUserAuth =
|
SUserAuthVersion *qUserAuth =
|
||||||
(SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
|
(SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
|
||||||
if (qUserAuth) {
|
if (qUserAuth) {
|
||||||
|
@ -571,6 +570,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// key/user not exist, add user
|
||||||
SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
|
SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
|
||||||
if (!user) {
|
if (!user) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -774,12 +774,12 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
||||||
}
|
}
|
||||||
|
|
||||||
// invoke after hbGetExpiredUserInfo
|
// invoke after hbGetExpiredUserInfo
|
||||||
if (atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
|
if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
|
||||||
code = hbGetUserAuthInfo(connKey, hbParam, req);
|
code = hbGetUserAuthInfo(connKey, hbParam, req);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 2);
|
atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
||||||
|
@ -923,11 +923,10 @@ static void *hbThreadFunc(void *param) {
|
||||||
if (sz > 0) {
|
if (sz > 0) {
|
||||||
hbGatherAppInfo();
|
hbGatherAppInfo();
|
||||||
if (sz > 1 && !clientHbMgr.appHbHash) {
|
if (sz > 1 && !clientHbMgr.appHbHash) {
|
||||||
clientHbMgr.appHbHash = tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
clientHbMgr.appHbHash = tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
tSimpleHashClear(clientHbMgr.appHbHash);
|
tSimpleHashClear(clientHbMgr.appHbHash);
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||||
|
@ -979,7 +978,7 @@ static void *hbThreadFunc(void *param) {
|
||||||
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq);
|
||||||
// hbClearReqInfo(pAppHbMgr);
|
// hbClearReqInfo(pAppHbMgr);
|
||||||
tSimpleHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(int64_t), NULL, 0);
|
tSimpleHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(uint64_t), NULL, 0);
|
||||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1036,7 +1035,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
|
||||||
// init stat
|
// init stat
|
||||||
pAppHbMgr->startTime = taosGetTimestampMs();
|
pAppHbMgr->startTime = taosGetTimestampMs();
|
||||||
pAppHbMgr->connKeyCnt = 0;
|
pAppHbMgr->connKeyCnt = 0;
|
||||||
pAppHbMgr->connHbFlag = 1;
|
pAppHbMgr->connHbFlag = 0;
|
||||||
pAppHbMgr->reportCnt = 0;
|
pAppHbMgr->reportCnt = 0;
|
||||||
pAppHbMgr->reportBytes = 0;
|
pAppHbMgr->reportBytes = 0;
|
||||||
pAppHbMgr->key = taosStrdup(key);
|
pAppHbMgr->key = taosStrdup(key);
|
||||||
|
|
|
@ -1382,7 +1382,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_
|
||||||
|
|
||||||
pUsers[i].version = ntohl(pUsers[i].version);
|
pUsers[i].version = ntohl(pUsers[i].version);
|
||||||
if (pUser->authVersion <= pUsers[i].version) {
|
if (pUser->authVersion <= pUsers[i].version) {
|
||||||
mTrace("pUser->authVersion:%d <= pUsers[i].version:%d", pUser->authVersion, pUsers[i].version);
|
printf("pUser->authVersion:%d <= pUsers[i].version:%d\n", pUser->authVersion, pUsers[i].version);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,11 +166,11 @@ void passVerTestMulti(const char *host, char *qstr) {
|
||||||
// calculate the nPassVerNotified for root and users
|
// calculate the nPassVerNotified for root and users
|
||||||
int nConn = nRoot + nUser;
|
int nConn = nRoot + nUser;
|
||||||
|
|
||||||
for (int i = 1; i < 15; ++i) {
|
for (int i = 0; i < 15; ++i) {
|
||||||
|
printf("%s:%d [%d] second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i,
|
||||||
|
nPassVerNotified, nConn);
|
||||||
if (nPassVerNotified >= nConn) break;
|
if (nPassVerNotified >= nConn) break;
|
||||||
sleep(1);
|
sleep(1);
|
||||||
printf("%s:%d %d second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i,
|
|
||||||
nPassVerNotified, nConn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// close the taos_conn
|
// close the taos_conn
|
||||||
|
|
Loading…
Reference in New Issue