chore: code optimization
This commit is contained in:
parent
067a8334fd
commit
6b0fc4aa1e
|
@ -33,7 +33,6 @@ extern "C" {
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tmsgtype.h"
|
#include "tmsgtype.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsimplehash.h"
|
|
||||||
|
|
||||||
#include "tconfig.h"
|
#include "tconfig.h"
|
||||||
|
|
||||||
|
@ -86,7 +85,7 @@ typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
TdThreadMutex lock; // used when app init and cleanup
|
TdThreadMutex lock; // used when app init and cleanup
|
||||||
SHashObj* appSummary;
|
SHashObj* appSummary;
|
||||||
SSHashObj* appHbHash; // key: clusterId
|
SHashObj* appHbHash; // key: clusterId
|
||||||
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
|
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
|
||||||
FHbReqHandle reqHandle[CONN_TYPE__MAX];
|
FHbReqHandle reqHandle[CONN_TYPE__MAX];
|
||||||
FHbRspHandle rspHandle[CONN_TYPE__MAX];
|
FHbRspHandle rspHandle[CONN_TYPE__MAX];
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
#include "scheduler.h"
|
#include "scheduler.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsimplehash.h"
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
union {
|
union {
|
||||||
|
@ -35,11 +34,38 @@ static SClientHbMgr clientHbMgr = {0};
|
||||||
|
|
||||||
static int32_t hbCreateThread();
|
static int32_t hbCreateThread();
|
||||||
static void hbStopThread();
|
static void hbStopThread();
|
||||||
|
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
|
||||||
|
|
||||||
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
|
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
|
||||||
|
|
||||||
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
|
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
|
||||||
|
|
||||||
|
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
|
||||||
|
SAppHbMgr *pAppHbMgr) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SUserAuthBatchRsp batchRsp = {0};
|
||||||
|
if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
||||||
|
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||||
|
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||||
|
tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);
|
||||||
|
|
||||||
|
catalogUpdateUserAuthInfo(pCatalog, rsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp);
|
||||||
|
|
||||||
|
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
|
||||||
|
|
||||||
|
taosArrayDestroy(batchRsp.pArray);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
|
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
|
||||||
SClientHbReq *pReq = NULL;
|
SClientHbReq *pReq = NULL;
|
||||||
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
|
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
|
||||||
|
@ -81,32 +107,6 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
|
|
||||||
SAppHbMgr *pAppHbMgr) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SUserAuthBatchRsp batchRsp = {0};
|
|
||||||
if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
|
||||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
|
||||||
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
|
||||||
tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);
|
|
||||||
|
|
||||||
catalogUpdateUserAuthInfo(pCatalog, rsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp);
|
|
||||||
|
|
||||||
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
|
|
||||||
|
|
||||||
taosArrayDestroy(batchRsp.pArray);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
|
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
|
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
|
||||||
|
@ -612,22 +612,12 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
|
|
||||||
if (NULL == pTscObj) {
|
|
||||||
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
|
||||||
return TSDB_CODE_APP_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (userNum <= 0) {
|
if (userNum <= 0) {
|
||||||
// printf("%s:%d hb get expired: [User:%s] NNNNo user since user num: %d, second:%d\n", __func__, __LINE__, pTscObj->user, userNum,
|
|
||||||
// taosGetTimestampSec());
|
|
||||||
taosMemoryFree(users);
|
taosMemoryFree(users);
|
||||||
releaseTscObj(connKey->tscRid);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseTscObj(connKey->tscRid);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < userNum; ++i) {
|
for (int32_t i = 0; i < userNum; ++i) {
|
||||||
SUserAuthVersion *user = &users[i];
|
SUserAuthVersion *user = &users[i];
|
||||||
user->version = htonl(user->version);
|
user->version = htonl(user->version);
|
||||||
|
@ -766,7 +756,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
||||||
hbGetQueryBasicInfo(connKey, req);
|
hbGetQueryBasicInfo(connKey, req);
|
||||||
|
|
||||||
if (hbParam->reqCnt == 0) {
|
if (hbParam->reqCnt == 0) {
|
||||||
if(!tSimpleHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
|
if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
|
||||||
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
|
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -923,9 +913,9 @@ static void *hbThreadFunc(void *param) {
|
||||||
if (sz > 0) {
|
if (sz > 0) {
|
||||||
hbGatherAppInfo();
|
hbGatherAppInfo();
|
||||||
if (sz > 1 && !clientHbMgr.appHbHash) {
|
if (sz > 1 && !clientHbMgr.appHbHash) {
|
||||||
clientHbMgr.appHbHash = tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
tSimpleHashClear(clientHbMgr.appHbHash);
|
taosHashClear(clientHbMgr.appHbHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
|
@ -978,7 +968,7 @@ static void *hbThreadFunc(void *param) {
|
||||||
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq);
|
||||||
// hbClearReqInfo(pAppHbMgr);
|
// hbClearReqInfo(pAppHbMgr);
|
||||||
tSimpleHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(uint64_t), NULL, 0);
|
taosHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(uint64_t), NULL, 0);
|
||||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -986,7 +976,6 @@ static void *hbThreadFunc(void *param) {
|
||||||
|
|
||||||
taosMsleep(HEARTBEAT_INTERVAL);
|
taosMsleep(HEARTBEAT_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1099,10 +1088,10 @@ void appHbMgrCleanup(void) {
|
||||||
hbFreeAppHbMgr(pTarget);
|
hbFreeAppHbMgr(pTarget);
|
||||||
}
|
}
|
||||||
clientHbMgr.appHbMgrs = taosArrayDestroy(clientHbMgr.appHbMgrs);
|
clientHbMgr.appHbMgrs = taosArrayDestroy(clientHbMgr.appHbMgrs);
|
||||||
tSimpleHashCleanup(clientHbMgr.appHbHash);
|
|
||||||
clientHbMgr.appHbHash = NULL;
|
|
||||||
taosHashCleanup(clientHbMgr.appSummary);
|
taosHashCleanup(clientHbMgr.appSummary);
|
||||||
|
taosHashCleanup(clientHbMgr.appHbHash);
|
||||||
clientHbMgr.appSummary = NULL;
|
clientHbMgr.appSummary = NULL;
|
||||||
|
clientHbMgr.appHbHash = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int hbMgrInit() {
|
int hbMgrInit() {
|
||||||
|
|
Loading…
Reference in New Issue