chore: passVer and authVer refact
This commit is contained in:
parent
1e9b85545b
commit
387d611179
|
@ -106,7 +106,6 @@ enum {
|
|||
HEARTBEAT_KEY_DBINFO,
|
||||
HEARTBEAT_KEY_STBINFO,
|
||||
HEARTBEAT_KEY_TMQ,
|
||||
HEARTBEAT_KEY_USER_PASSINFO,
|
||||
};
|
||||
|
||||
typedef enum _mgmt_table {
|
||||
|
@ -704,6 +703,7 @@ int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq*
|
|||
typedef struct {
|
||||
char user[TSDB_USER_LEN];
|
||||
int32_t version;
|
||||
int32_t passVer;
|
||||
int8_t superAuth;
|
||||
int8_t sysInfo;
|
||||
int8_t enable;
|
||||
|
@ -720,14 +720,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
|
|||
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
|
||||
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
|
||||
|
||||
typedef struct SUserPassVersion {
|
||||
char user[TSDB_USER_LEN];
|
||||
int32_t version;
|
||||
} SUserPassVersion;
|
||||
|
||||
typedef SGetUserAuthReq SGetUserPassReq;
|
||||
typedef SUserPassVersion SGetUserPassRsp;
|
||||
|
||||
/*
|
||||
* for client side struct, only column id, type, bytes are necessary
|
||||
* But for data in vnode side, we need all the following information.
|
||||
|
@ -1071,14 +1063,6 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp
|
|||
int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp);
|
||||
void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SArray* pArray; // Array of SGetUserPassRsp
|
||||
} SUserPassBatchRsp;
|
||||
|
||||
int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
|
||||
int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
|
||||
void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
STimeWindow timeRange;
|
||||
|
|
|
@ -33,6 +33,7 @@ extern "C" {
|
|||
#include "tmsg.h"
|
||||
#include "tmsgtype.h"
|
||||
#include "trpc.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
#include "tconfig.h"
|
||||
|
||||
|
@ -63,7 +64,7 @@ typedef struct {
|
|||
// statistics
|
||||
int32_t reportCnt;
|
||||
int32_t connKeyCnt;
|
||||
int32_t passKeyCnt; // with passVer call back
|
||||
int8_t connHbFlag;
|
||||
int64_t reportBytes; // not implemented
|
||||
int64_t startTime;
|
||||
// ctl
|
||||
|
@ -85,6 +86,7 @@ typedef struct {
|
|||
TdThread thread;
|
||||
TdThreadMutex lock; // used when app init and cleanup
|
||||
SHashObj* appSummary;
|
||||
SSHashObj* appHbHash; // key: clusterId
|
||||
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
|
||||
FHbReqHandle reqHandle[CONN_TYPE__MAX];
|
||||
FHbRspHandle rspHandle[CONN_TYPE__MAX];
|
||||
|
|
|
@ -18,15 +18,16 @@
|
|||
#include "clientLog.h"
|
||||
#include "scheduler.h"
|
||||
#include "trpc.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
typedef struct {
|
||||
SAppHbMgr *pAppHbMgr;
|
||||
union {
|
||||
struct {
|
||||
int64_t clusterId;
|
||||
int32_t passKeyCnt;
|
||||
int32_t passVer;
|
||||
int32_t authVer;
|
||||
// int32_t authVer;
|
||||
int32_t reqCnt;
|
||||
int8_t connHbFlag;
|
||||
};
|
||||
};
|
||||
} SHbParam;
|
||||
|
@ -50,6 +51,8 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
|||
return -1;
|
||||
}
|
||||
|
||||
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 2, 0);
|
||||
|
||||
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||
|
@ -57,7 +60,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
|||
|
||||
catalogUpdateUserAuthInfo(pCatalog, rsp);
|
||||
}
|
||||
#if 1
|
||||
|
||||
SClientHbReq *pReq = NULL;
|
||||
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
|
||||
|
@ -68,6 +71,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
|||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||
pTscObj->authVer = rsp->version;
|
||||
|
||||
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
|
||||
if (pTscObj->sysInfo != rsp->sysInfo) {
|
||||
printf("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64 "\n", rsp->user,
|
||||
|
@ -77,51 +81,23 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
|||
printf("not update sysInfo of user %s since not change: %" PRIi8 ", tscRid:%" PRIi64 "\n", rsp->user,
|
||||
pTscObj->sysInfo, pTscObj->id);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
releaseTscObj(pReq->connKey.tscRid);
|
||||
}
|
||||
#endif
|
||||
taosArrayDestroy(batchRsp.pArray);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfBatchs = 0;
|
||||
SUserPassBatchRsp batchRsp = {0};
|
||||
if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
||||
|
||||
SClientHbReq *pReq = NULL;
|
||||
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
|
||||
if (!pTscObj) {
|
||||
continue;
|
||||
}
|
||||
SPassInfo *passInfo = &pTscObj->passInfo;
|
||||
if (!passInfo->fp) {
|
||||
releaseTscObj(pReq->connKey.tscRid);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
|
||||
if (passInfo->fp) {
|
||||
int32_t oldVer = atomic_load_32(&passInfo->ver);
|
||||
if (oldVer < rsp->version) {
|
||||
atomic_store_32(&passInfo->ver, rsp->version);
|
||||
if (oldVer < rsp->passVer) {
|
||||
atomic_store_32(&passInfo->ver, rsp->passVer);
|
||||
if (passInfo->fp) {
|
||||
(*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER);
|
||||
}
|
||||
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer,
|
||||
printf("update passVer of user %s from %d to %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer,
|
||||
atomic_load_32(&passInfo->ver), pTscObj->id);
|
||||
} else {
|
||||
printf("not update passVer of user %s since not changed: %d, tscRid:%" PRIi64 "\n", rsp->user, oldVer,
|
||||
pTscObj->id);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -129,8 +105,7 @@ static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHb
|
|||
}
|
||||
|
||||
taosArrayDestroy(batchRsp.pArray);
|
||||
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
|
||||
|
@ -380,15 +355,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_USER_PASSINFO: {
|
||||
if (kv->valueLen <= 0 || NULL == kv->value) {
|
||||
tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
tscError("invalid hb key type:%d", kv->key);
|
||||
break;
|
||||
|
@ -569,69 +535,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
|
||||
if (!pTscObj) {
|
||||
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
|
||||
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
SKv kv = {.key = HEARTBEAT_KEY_USER_PASSINFO};
|
||||
SKv *pKv = NULL;
|
||||
if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
|
||||
SUserPassVersion *pPassVer = (SUserPassVersion *)pKv->value;
|
||||
tscDebug("hb got user basic info, already exists:%s, update passVer from %d to %d", pTscObj->user,
|
||||
htonl(pPassVer->version), pTscObj->passInfo.ver);
|
||||
pPassVer->version = htonl(pTscObj->passInfo.ver);
|
||||
if (param) param->passVer = pTscObj->passInfo.ver;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
|
||||
if (!user) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
strncpy(user->user, pTscObj->user, TSDB_USER_LEN);
|
||||
user->version = htonl(pTscObj->passInfo.ver);
|
||||
|
||||
kv.valueLen = sizeof(SUserPassVersion);
|
||||
kv.value = user;
|
||||
|
||||
tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
|
||||
pTscObj->passInfo.ver, connKey->tscRid);
|
||||
|
||||
if (!req->info) {
|
||||
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
}
|
||||
|
||||
if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) {
|
||||
taosMemoryFreeClear(user);
|
||||
code = terrno ? terrno : TSDB_CODE_APP_ERROR;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
// assign the passVer
|
||||
if (param) {
|
||||
param->passVer = pTscObj->passInfo.ver;
|
||||
}
|
||||
|
||||
_return:
|
||||
releaseTscObj(connKey->tscRid);
|
||||
if (code) {
|
||||
tscError("hb got user basic info failed since %s", terrstr(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
|
||||
if (!pTscObj) {
|
||||
|
@ -641,11 +544,6 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
|
|||
|
||||
int32_t code = 0;
|
||||
|
||||
if (param && (param->authVer != INT32_MIN) && (param->authVer <= pTscObj->authVer)) {
|
||||
tscDebug("hb got user auth info, no need since authVer %d <= %d", param->authVer, pTscObj->authVer);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
SKv kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
|
||||
SKv *pKv = NULL;
|
||||
if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
|
||||
|
@ -653,24 +551,20 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
|
|||
SUserAuthVersion *pUserAuths = (SUserAuthVersion *)pKv->value;
|
||||
for (int32_t i = 0; i < userNum; ++i) {
|
||||
SUserAuthVersion *pUserAuth = pUserAuths + i;
|
||||
// user exist
|
||||
// both key and user exist
|
||||
if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
|
||||
if (htonl(pUserAuth->version) > pTscObj->authVer) {
|
||||
pUserAuth->version = htonl(pTscObj->authVer);
|
||||
}
|
||||
if (param) param->authVer = htonl(pUserAuth->version);
|
||||
pUserAuth->version = htonl(-1); // force get userAuthInfo
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
// key exists, but user not exist
|
||||
// key exists, user not exist
|
||||
SUserAuthVersion *qUserAuth =
|
||||
(SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
|
||||
if (qUserAuth) {
|
||||
strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
|
||||
(qUserAuth + userNum)->version = htonl(pTscObj->authVer);
|
||||
(qUserAuth + userNum)->version = htonl(-1); // force get userAuthInfo
|
||||
pKv->value = qUserAuth;
|
||||
pKv->valueLen += sizeof(SUserAuthVersion);
|
||||
if (param) param->authVer = pTscObj->authVer;
|
||||
} else {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -683,7 +577,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
|
|||
goto _return;
|
||||
}
|
||||
strncpy(user->user, pTscObj->user, TSDB_USER_LEN);
|
||||
user->version = htonl(pTscObj->authVer);
|
||||
user->version = htonl(-1); // force get userAuthInfo
|
||||
kv.valueLen = sizeof(SUserAuthVersion);
|
||||
kv.value = user;
|
||||
|
||||
|
@ -695,15 +589,11 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
|
|||
}
|
||||
|
||||
if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) {
|
||||
taosMemoryFreeClear(user);
|
||||
taosMemoryFree(user);
|
||||
code = terrno ? terrno : TSDB_CODE_APP_ERROR;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (param) {
|
||||
param->authVer = pTscObj->authVer;
|
||||
}
|
||||
|
||||
_return:
|
||||
releaseTscObj(connKey->tscRid);
|
||||
if (code) {
|
||||
|
@ -713,8 +603,6 @@ _return:
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
||||
SUserAuthVersion *users = NULL;
|
||||
uint32_t userNum = 0;
|
||||
|
@ -724,12 +612,22 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
|
||||
if (NULL == pTscObj) {
|
||||
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
if (userNum <= 0) {
|
||||
// printf("%s:%d hb get expired: [User:%s] NNNNo user since user num: %d, second:%d\n", __func__, __LINE__, pTscObj->user, userNum,
|
||||
// taosGetTimestampSec());
|
||||
taosMemoryFree(users);
|
||||
releaseTscObj(connKey->tscRid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
releaseTscObj(connKey->tscRid);
|
||||
|
||||
for (int32_t i = 0; i < userNum; ++i) {
|
||||
SUserAuthVersion *user = &users[i];
|
||||
user->version = htonl(user->version);
|
||||
|
@ -867,15 +765,22 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
|||
|
||||
hbGetQueryBasicInfo(connKey, req);
|
||||
|
||||
if (hbParam->passKeyCnt > 0) {
|
||||
hbGetUserBasicInfo(connKey, hbParam, req);
|
||||
}
|
||||
|
||||
if (hbParam->reqCnt == 0) {
|
||||
if(!tSimpleHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
|
||||
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// invoke after hbGetExpiredUserInfo
|
||||
if (atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
|
||||
code = hbGetUserAuthInfo(connKey, hbParam, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 2);
|
||||
}
|
||||
|
||||
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
@ -889,9 +794,6 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
|||
}
|
||||
++hbParam->reqCnt; // success to get catalog info
|
||||
|
||||
// N.B. put after hbGetExpiredUserInfo
|
||||
hbGetUserAuthInfo(connKey, hbParam, req);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -918,7 +820,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
SHbParam param = {0};
|
||||
SHbParam param = {.pAppHbMgr = pAppHbMgr};
|
||||
while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
|
||||
SClientHbReq *pOneReq = pIter;
|
||||
SClientHbKey *connKey = &pOneReq->connKey;
|
||||
|
@ -935,10 +837,8 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
if (param.clusterId == 0) {
|
||||
// init
|
||||
param.clusterId = pOneReq->clusterId;
|
||||
param.passVer = INT32_MIN;
|
||||
param.authVer = INT32_MIN;
|
||||
param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
|
||||
}
|
||||
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -1022,7 +922,12 @@ static void *hbThreadFunc(void *param) {
|
|||
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
||||
if (sz > 0) {
|
||||
hbGatherAppInfo();
|
||||
if (sz > 1 && !clientHbMgr.appHbHash) {
|
||||
clientHbMgr.appHbHash = tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
}
|
||||
}
|
||||
|
||||
tSimpleHashClear(clientHbMgr.appHbHash);
|
||||
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||
|
@ -1074,7 +979,7 @@ static void *hbThreadFunc(void *param) {
|
|||
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
||||
tFreeClientHbBatchReq(pReq);
|
||||
// hbClearReqInfo(pAppHbMgr);
|
||||
|
||||
tSimpleHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(int64_t), NULL, 0);
|
||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||
}
|
||||
|
||||
|
@ -1082,6 +987,7 @@ static void *hbThreadFunc(void *param) {
|
|||
|
||||
taosMsleep(HEARTBEAT_INTERVAL);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1130,7 +1036,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
|
|||
// init stat
|
||||
pAppHbMgr->startTime = taosGetTimestampMs();
|
||||
pAppHbMgr->connKeyCnt = 0;
|
||||
pAppHbMgr->passKeyCnt = 0;
|
||||
pAppHbMgr->connHbFlag = 1;
|
||||
pAppHbMgr->reportCnt = 0;
|
||||
pAppHbMgr->reportBytes = 0;
|
||||
pAppHbMgr->key = taosStrdup(key);
|
||||
|
@ -1193,6 +1099,11 @@ void appHbMgrCleanup(void) {
|
|||
if (pTarget == NULL) continue;
|
||||
hbFreeAppHbMgr(pTarget);
|
||||
}
|
||||
clientHbMgr.appHbMgrs = taosArrayDestroy(clientHbMgr.appHbMgrs);
|
||||
tSimpleHashCleanup(clientHbMgr.appHbHash);
|
||||
clientHbMgr.appHbHash = NULL;
|
||||
taosHashCleanup(clientHbMgr.appSummary);
|
||||
clientHbMgr.appSummary = NULL;
|
||||
}
|
||||
|
||||
int hbMgrInit() {
|
||||
|
@ -1246,10 +1157,7 @@ void hbMgrCleanUp() {
|
|||
|
||||
taosThreadMutexLock(&clientHbMgr.lock);
|
||||
appHbMgrCleanup();
|
||||
taosArrayDestroy(clientHbMgr.appHbMgrs);
|
||||
taosThreadMutexUnlock(&clientHbMgr.lock);
|
||||
|
||||
clientHbMgr.appHbMgrs = NULL;
|
||||
}
|
||||
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
|
||||
|
@ -1301,12 +1209,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
|
|||
}
|
||||
|
||||
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
|
||||
taosThreadMutexLock(&pTscObj->mutex);
|
||||
if (pTscObj->passInfo.fp) {
|
||||
atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTscObj->mutex);
|
||||
}
|
||||
|
||||
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
|
||||
|
|
|
@ -135,11 +135,6 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
|
|||
switch (type) {
|
||||
case TAOS_NOTIFY_PASSVER: {
|
||||
taosThreadMutexLock(&pObj->mutex);
|
||||
if (fp && !pObj->passInfo.fp) {
|
||||
atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1);
|
||||
} else if (!fp && pObj->passInfo.fp) {
|
||||
atomic_sub_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1);
|
||||
}
|
||||
pObj->passInfo.fp = fp;
|
||||
pObj->passInfo.param = param;
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
|
|
|
@ -1518,6 +1518,9 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
|
|||
useDb = taosHashIterate(pRsp->useDbs, useDb);
|
||||
}
|
||||
|
||||
// since 3.0.7.0
|
||||
if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1639,6 +1642,12 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
|
|||
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
|
||||
taosMemoryFree(key);
|
||||
}
|
||||
// since 3.0.7.0
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1;
|
||||
} else {
|
||||
pRsp->passVer = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -3024,59 +3033,6 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) {
|
|||
taosArrayDestroy(pRsp->pArray);
|
||||
}
|
||||
|
||||
int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
|
||||
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
|
||||
for (int32_t i = 0; i < numOfBatch; ++i) {
|
||||
SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i);
|
||||
if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
|
||||
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
|
||||
|
||||
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp));
|
||||
if (pRsp->pArray == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfBatch; ++i) {
|
||||
SGetUserPassRsp rsp = {0};
|
||||
if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &rsp.version) < 0) return -1;
|
||||
taosArrayPush(pRsp->pArray, &rsp);
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) {
|
||||
if(pRsp) {
|
||||
taosArrayDestroy(pRsp->pArray);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
@ -4181,15 +4137,15 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
|||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pRsp->passVer) < 0) return -1;
|
||||
} else {
|
||||
pRsp->passVer = 0;
|
||||
}
|
||||
// since 3.0.7.0
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pRsp->authVer) < 0) return -1;
|
||||
} else {
|
||||
pRsp->authVer = 0;
|
||||
}
|
||||
} else {
|
||||
pRsp->passVer = 0;
|
||||
pRsp->authVer = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
|
|
@ -35,8 +35,6 @@ SHashObj *mndDupTableHash(SHashObj *pOld);
|
|||
SHashObj *mndDupTopicHash(SHashObj *pOld);
|
||||
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
|
||||
int32_t *pRspLen);
|
||||
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
|
||||
int32_t *pRspLen);
|
||||
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
|
||||
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
|
|||
pRsp->superAuth = 1;
|
||||
pRsp->enable = pUser->enable;
|
||||
pRsp->version = pUser->authVersion;
|
||||
pRsp->passVer = pUser->passVersion;
|
||||
pRsp->sysInfo = pUser->sysInfo;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -552,16 +552,6 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_USER_PASSINFO: {
|
||||
void *rspMsg = NULL;
|
||||
int32_t rspLen = 0;
|
||||
mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen);
|
||||
if (rspMsg && rspLen > 0) {
|
||||
SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg};
|
||||
taosArrayPush(hbRsp.info, &kv1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
mError("invalid kv key:%d", kv->key);
|
||||
hbRsp.status = TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -824,7 +824,6 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
|
||||
if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER;
|
||||
|
||||
newUser.passVersion = pUser->passVersion;
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
|
||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass);
|
||||
|
@ -1432,69 +1431,6 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
|
||||
int32_t *pRspLen) {
|
||||
int32_t code = 0;
|
||||
SUserPassBatchRsp batchRsp = {0};
|
||||
|
||||
for (int32_t i = 0; i < numOfUses; ++i) {
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user);
|
||||
if (pUser == NULL) {
|
||||
mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr());
|
||||
continue;
|
||||
}
|
||||
|
||||
pUsers[i].version = ntohl(pUsers[i].version);
|
||||
if (pUser->passVersion <= pUsers[i].version) {
|
||||
mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion,
|
||||
pUsers[i].version);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
continue;
|
||||
}
|
||||
|
||||
SGetUserPassRsp rsp = {0};
|
||||
memcpy(rsp.user, pUser->user, TSDB_USER_LEN);
|
||||
rsp.version = pUser->passVersion;
|
||||
|
||||
if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
taosArrayPush(batchRsp.pArray, &rsp);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(batchRsp.pArray) <= 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp);
|
||||
if (rspLen < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
void *pRsp = taosMemoryMalloc(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp);
|
||||
|
||||
*ppRsp = pRsp;
|
||||
*pRspLen = rspLen;
|
||||
|
||||
_OVER:
|
||||
if (code) {
|
||||
*ppRsp = NULL;
|
||||
*pRspLen = 0;
|
||||
}
|
||||
|
||||
tFreeSUserPassBatchRsp(&batchRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
|
|
@ -166,22 +166,24 @@ void passVerTestMulti(const char *host, char *qstr) {
|
|||
// calculate the nPassVerNotified for root and users
|
||||
int nConn = nRoot + nUser;
|
||||
|
||||
for (int i = 0; i < 15; ++i) {
|
||||
for (int i = 1; i < 15; ++i) {
|
||||
if (nPassVerNotified >= nConn) break;
|
||||
sleep(1);
|
||||
printf("%s:%d %d second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i,
|
||||
nPassVerNotified, nConn);
|
||||
}
|
||||
|
||||
// close the taos_conn
|
||||
for (int i = 0; i < nRoot; ++i) {
|
||||
taos_close(taos[i]);
|
||||
printf("%s:%d close taos[%d]\n", __func__, __LINE__, i);
|
||||
sleep(1);
|
||||
// sleep(1);
|
||||
}
|
||||
|
||||
for (int i = 0; i < nUser; ++i) {
|
||||
taos_close(taosu[i]);
|
||||
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
|
||||
sleep(1);
|
||||
// sleep(1);
|
||||
}
|
||||
|
||||
if (nPassVerNotified >= nConn) {
|
||||
|
|
Loading…
Reference in New Issue