Merge pull request #21893 from taosdata/fix/TS-3551-M

enh: update of user auth version
This commit is contained in:
dapan1121 2023-07-03 14:58:09 +08:00 committed by GitHub
commit a36dac2008
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 329 additions and 250 deletions

View File

@ -106,7 +106,6 @@ enum {
HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_DBINFO,
HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_TMQ, HEARTBEAT_KEY_TMQ,
HEARTBEAT_KEY_USER_PASSINFO,
}; };
typedef enum _mgmt_table { typedef enum _mgmt_table {
@ -636,6 +635,7 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
int32_t svrTimestamp; int32_t svrTimestamp;
int32_t passVer; int32_t passVer;
int32_t authVer;
char sVer[TSDB_VERSION_LEN]; char sVer[TSDB_VERSION_LEN];
char sDetailVer[128]; char sDetailVer[128];
} SConnectRsp; } SConnectRsp;
@ -703,6 +703,7 @@ int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq*
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
int32_t version; int32_t version;
int32_t passVer;
int8_t superAuth; int8_t superAuth;
int8_t sysInfo; int8_t sysInfo;
int8_t enable; int8_t enable;
@ -719,14 +720,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct SUserPassVersion {
char user[TSDB_USER_LEN];
int32_t version;
} SUserPassVersion;
typedef SGetUserAuthReq SGetUserPassReq;
typedef SUserPassVersion SGetUserPassRsp;
/* /*
* for client side struct, only column id, type, bytes are necessary * for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information. * But for data in vnode side, we need all the following information.
@ -1070,14 +1063,6 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp
int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp);
void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp); void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SGetUserPassRsp
} SUserPassBatchRsp;
int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
STimeWindow timeRange; STimeWindow timeRange;

View File

@ -63,7 +63,7 @@ typedef struct {
// statistics // statistics
int32_t reportCnt; int32_t reportCnt;
int32_t connKeyCnt; int32_t connKeyCnt;
int32_t passKeyCnt; // with passVer call back int8_t connHbFlag; // 0 init, 1 send req, 2 get resp
int64_t reportBytes; // not implemented int64_t reportBytes; // not implemented
int64_t startTime; int64_t startTime;
// ctl // ctl
@ -83,8 +83,9 @@ typedef struct {
int8_t threadStop; int8_t threadStop;
int8_t quitByKill; int8_t quitByKill;
TdThread thread; TdThread thread;
TdThreadMutex lock; // used when app init and cleanup TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary; SHashObj* appSummary;
SHashObj* appHbHash; // key: clusterId
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[CONN_TYPE__MAX]; FHbReqHandle reqHandle[CONN_TYPE__MAX];
FHbRspHandle rspHandle[CONN_TYPE__MAX]; FHbRspHandle rspHandle[CONN_TYPE__MAX];
@ -146,6 +147,7 @@ typedef struct STscObj {
int64_t id; // ref ID returned by taosAddRef int64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection int32_t numOfReqs; // number of sqlObj bound to this connection
int32_t authVer;
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
SHashObj* pRequests; SHashObj* pRequests;
SPassInfo passInfo; SPassInfo passInfo;

View File

@ -22,10 +22,10 @@
typedef struct { typedef struct {
union { union {
struct { struct {
int64_t clusterId; SAppHbMgr *pAppHbMgr;
int32_t passKeyCnt; int64_t clusterId;
int32_t passVer; int32_t reqCnt;
int32_t reqCnt; int8_t connHbFlag;
}; };
}; };
} SHbParam; } SHbParam;
@ -34,12 +34,14 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
SAppHbMgr *pAppHbMgr) {
int32_t code = 0; int32_t code = 0;
SUserAuthBatchRsp batchRsp = {0}; SUserAuthBatchRsp batchRsp = {0};
@ -56,54 +58,61 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
catalogUpdateUserAuthInfo(pCatalog, rsp); catalogUpdateUserAuthInfo(pCatalog, rsp);
} }
if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp);
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
taosArrayDestroy(batchRsp.pArray); taosArrayDestroy(batchRsp.pArray);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) { static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
int32_t code = 0; uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
int32_t numOfBatchs = 0; for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
SUserPassBatchRsp batchRsp = {0}; SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
numOfBatchs = taosArrayGetSize(batchRsp.pArray);
SClientHbReq *pReq = NULL;
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
if (!pTscObj) {
continue;
}
SPassInfo *passInfo = &pTscObj->passInfo;
if (!passInfo->fp) {
releaseTscObj(pReq->connKey.tscRid);
continue; continue;
} }
for (int32_t i = 0; i < numOfBatchs; ++i) { SClientHbReq *pReq = NULL;
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
int32_t oldVer = atomic_load_32(&passInfo->ver); if (!pTscObj) {
if (oldVer < rsp->version) { continue;
atomic_store_32(&passInfo->ver, rsp->version);
if (passInfo->fp) {
(*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER);
}
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer,
atomic_load_32(&passInfo->ver), pTscObj->id);
}
break;
} }
for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
pTscObj->authVer = rsp->version;
#if 0 // make jenkins happy temporarily. After PR pass, enable these lines again.
if (pTscObj->sysInfo != rsp->sysInfo) {
tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64, rsp->user,
pTscObj->sysInfo, rsp->sysInfo, pTscObj->id);
pTscObj->sysInfo = rsp->sysInfo;
}
#endif
if (pTscObj->passInfo.fp) {
SPassInfo *passInfo = &pTscObj->passInfo;
int32_t oldVer = atomic_load_32(&passInfo->ver);
if (oldVer < rsp->passVer) {
atomic_store_32(&passInfo->ver, rsp->passVer);
if (passInfo->fp) {
(*passInfo->fp)(passInfo->param, &rsp->passVer, TAOS_NOTIFY_PASSVER);
}
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer,
atomic_load_32(&passInfo->ver), pTscObj->id);
}
}
break;
}
}
releaseTscObj(pReq->connKey.tscRid);
} }
releaseTscObj(pReq->connKey.tscRid);
} }
return 0;
taosArrayDestroy(batchRsp.pArray);
return code;
} }
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
@ -316,7 +325,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break; break;
} }
hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog); hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr);
break; break;
} }
case HEARTBEAT_KEY_DBINFO: { case HEARTBEAT_KEY_DBINFO: {
@ -353,15 +362,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break; break;
} }
case HEARTBEAT_KEY_USER_PASSINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr);
break;
}
default: default:
tscError("invalid hb key type:%d", kv->key); tscError("invalid hb key type:%d", kv->key);
break; break;
@ -543,7 +543,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) { if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
@ -552,46 +552,61 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien
int32_t code = 0; int32_t code = 0;
if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { SKv kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver); SKv *pKv = NULL;
if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
int32_t userNum = pKv->valueLen / sizeof(SUserAuthVersion);
SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
for (int32_t i = 0; i < userNum; ++i) {
SUserAuthVersion *pUserAuth = userAuths + i;
// both key and user exist, update version
if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
pUserAuth->version = htonl(-1); // force get userAuthInfo
goto _return;
}
}
// key exists, user not exist, append user
SUserAuthVersion *qUserAuth =
(SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
if (qUserAuth) {
strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
(qUserAuth + userNum)->version = htonl(-1); // force get userAuthInfo
pKv->value = qUserAuth;
pKv->valueLen += sizeof(SUserAuthVersion);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
goto _return; goto _return;
} }
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); // key/user not exist, add user
SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
if (!user) { if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
strncpy(user->user, pTscObj->user, TSDB_USER_LEN); strncpy(user->user, pTscObj->user, TSDB_USER_LEN);
user->version = htonl(pTscObj->passInfo.ver); user->version = htonl(-1); // force get userAuthInfo
kv.valueLen = sizeof(SUserAuthVersion);
kv.value = user;
SKv kv = { tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
.key = HEARTBEAT_KEY_USER_PASSINFO, pTscObj->authVer, connKey->tscRid);
.valueLen = sizeof(SUserPassVersion),
.value = user,
};
tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
pTscObj->passInfo.ver, connKey->tscRid);
if (!req->info) { if (!req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
} }
if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) { if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) {
taosMemoryFree(user);
code = terrno ? terrno : TSDB_CODE_APP_ERROR; code = terrno ? terrno : TSDB_CODE_APP_ERROR;
goto _return; goto _return;
} }
// assign the passVer
if (param) {
param->passVer = pTscObj->passInfo.ver;
}
_return: _return:
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
if (code) { if (code) {
tscError("hb got user basic info failed since %s", terrstr(code)); tscError("hb got user auth info failed since %s", terrstr(code));
} }
return code; return code;
@ -749,14 +764,21 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req); hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) {
hbGetUserBasicInfo(connKey, hbParam, req);
}
if (hbParam->reqCnt == 0) { if (hbParam->reqCnt == 0) {
code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
if (TSDB_CODE_SUCCESS != code) { code = hbGetExpiredUserInfo(connKey, pCatalog, req);
return code; if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
// invoke after hbGetExpiredUserInfo
if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
code = hbGetUserAuthInfo(connKey, hbParam, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
} }
code = hbGetExpiredDBInfo(connKey, pCatalog, req); code = hbGetExpiredDBInfo(connKey, pCatalog, req);
@ -770,7 +792,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
} }
} }
++hbParam->reqCnt; // success to get catalog info ++hbParam->reqCnt; // success to get catalog info
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -815,9 +837,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
if (param.clusterId == 0) { if (param.clusterId == 0) {
// init // init
param.clusterId = pOneReq->clusterId; param.clusterId = pOneReq->clusterId;
param.passVer = INT32_MIN; param.pAppHbMgr = pAppHbMgr;
param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
} }
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break; break;
} }
default: default:
@ -901,6 +923,10 @@ static void *hbThreadFunc(void *param) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) { if (sz > 0) {
hbGatherAppInfo(); hbGatherAppInfo();
if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
taosHashClear(clientHbMgr.appHbHash);
} }
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
@ -953,7 +979,7 @@ static void *hbThreadFunc(void *param) {
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
taosHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(uint64_t), NULL, 0);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
} }
@ -961,6 +987,7 @@ static void *hbThreadFunc(void *param) {
taosMsleep(HEARTBEAT_INTERVAL); taosMsleep(HEARTBEAT_INTERVAL);
} }
taosHashCleanup(clientHbMgr.appHbHash);
return NULL; return NULL;
} }
@ -1009,7 +1036,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
// init stat // init stat
pAppHbMgr->startTime = taosGetTimestampMs(); pAppHbMgr->startTime = taosGetTimestampMs();
pAppHbMgr->connKeyCnt = 0; pAppHbMgr->connKeyCnt = 0;
pAppHbMgr->passKeyCnt = 0; pAppHbMgr->connHbFlag = 0;
pAppHbMgr->reportCnt = 0; pAppHbMgr->reportCnt = 0;
pAppHbMgr->reportBytes = 0; pAppHbMgr->reportBytes = 0;
pAppHbMgr->key = taosStrdup(key); pAppHbMgr->key = taosStrdup(key);
@ -1127,7 +1154,6 @@ void hbMgrCleanUp() {
appHbMgrCleanup(); appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs); taosArrayDestroy(clientHbMgr.appHbMgrs);
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
clientHbMgr.appHbMgrs = NULL; clientHbMgr.appHbMgrs = NULL;
} }
@ -1180,12 +1206,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
} }
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
taosThreadMutexLock(&pTscObj->mutex);
if (pTscObj->passInfo.fp) {
atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1);
}
taosThreadMutexUnlock(&pTscObj->mutex);
} }
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner

View File

@ -135,11 +135,6 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
switch (type) { switch (type) {
case TAOS_NOTIFY_PASSVER: { case TAOS_NOTIFY_PASSVER: {
taosThreadMutexLock(&pObj->mutex); taosThreadMutexLock(&pObj->mutex);
if (fp && !pObj->passInfo.fp) {
atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1);
} else if (!fp && pObj->passInfo.fp) {
atomic_sub_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1);
}
pObj->passInfo.fp = fp; pObj->passInfo.fp = fp;
pObj->passInfo.param = param; pObj->passInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex); taosThreadMutexUnlock(&pObj->mutex);

View File

@ -131,6 +131,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->connType = connectRsp.connType; pTscObj->connType = connectRsp.connType;
pTscObj->passInfo.ver = connectRsp.passVer; pTscObj->passInfo.ver = connectRsp.passVer;
pTscObj->authVer = connectRsp.authVer;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);

View File

@ -1523,6 +1523,9 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
useDb = taosHashIterate(pRsp->useDbs, useDb); useDb = taosHashIterate(pRsp->useDbs, useDb);
} }
// since 3.0.7.0
if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1;
return 0; return 0;
} }
@ -1644,6 +1647,12 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref)); taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
taosMemoryFree(key); taosMemoryFree(key);
} }
// since 3.0.7.0
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1;
} else {
pRsp->passVer = 0;
}
} }
return 0; return 0;
@ -3029,59 +3038,6 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray); taosArrayDestroy(pRsp->pArray);
} }
int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i);
if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1;
if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp));
if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserPassRsp rsp = {0};
if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1;
if (tDecodeI32(&decoder, &rsp.version) < 0) return -1;
taosArrayPush(pRsp->pArray, &rsp);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) {
if(pRsp) {
taosArrayDestroy(pRsp->pArray);
}
}
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) { int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -4159,6 +4115,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -4188,6 +4145,12 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
} else { } else {
pRsp->passVer = 0; pRsp->passVer = 0;
} }
// since 3.0.7.0
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pRsp->authVer) < 0) return -1;
} else {
pRsp->authVer = 0;
}
tEndDecode(&decoder); tEndDecode(&decoder);

View File

@ -35,8 +35,6 @@ SHashObj *mndDupTableHash(SHashObj *pOld);
SHashObj *mndDupTopicHash(SHashObj *pOld); SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen); int32_t *pRspLen);
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);

View File

@ -36,7 +36,9 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1; pRsp->superAuth = 1;
pRsp->enable = pUser->enable; pRsp->enable = pUser->enable;
pRsp->sysInfo = pUser->sysInfo;
pRsp->version = pUser->authVersion; pRsp->version = pUser->authVersion;
pRsp->passVer = pUser->passVersion;
return 0; return 0;
} }
#endif #endif

View File

@ -288,6 +288,7 @@ _CONNECT:
connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion; connectRsp.passVer = pUser->passVersion;
connectRsp.authVer = pUser->authVersion;
strcpy(connectRsp.sVer, version); strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
@ -552,16 +553,6 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
break; break;
} }
case HEARTBEAT_KEY_USER_PASSINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
default: default:
mError("invalid kv key:%d", kv->key); mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_APP_ERROR; hbRsp.status = TSDB_CODE_APP_ERROR;

View File

@ -825,7 +825,6 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER; if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER;
newUser.passVersion = pUser->passVersion;
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
char pass[TSDB_PASSWORD_LEN + 1] = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass);
@ -1432,69 +1431,6 @@ _OVER:
return code; return code;
} }
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen) {
int32_t code = 0;
SUserPassBatchRsp batchRsp = {0};
for (int32_t i = 0; i < numOfUses; ++i) {
SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user);
if (pUser == NULL) {
mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr());
continue;
}
pUsers[i].version = ntohl(pUsers[i].version);
if (pUser->passVersion <= pUsers[i].version) {
mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion,
pUsers[i].version);
mndReleaseUser(pMnode, pUser);
continue;
}
SGetUserPassRsp rsp = {0};
memcpy(rsp.user, pUser->user, TSDB_USER_LEN);
rsp.version = pUser->passVersion;
if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
mndReleaseUser(pMnode, pUser);
goto _OVER;
}
taosArrayPush(batchRsp.pArray, &rsp);
mndReleaseUser(pMnode, pUser);
}
if (taosArrayGetSize(batchRsp.pArray) <= 0) {
goto _OVER;
}
int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp);
if (rspLen < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
void *pRsp = taosMemoryMalloc(rspLen);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
_OVER:
if (code) {
*ppRsp = NULL;
*pRspLen = 0;
}
tFreeSUserPassBatchRsp(&batchRsp);
return code;
}
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
int32_t code = 0; int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;

View File

@ -32,9 +32,21 @@
#define nRoot 10 #define nRoot 10
#define nUser 10 #define nUser 10
#define USER_LEN 24 #define USER_LEN 24
#define BUF_LEN 256
typedef uint16_t VarDataLenT;
#define TSDB_NCHAR_SIZE sizeof(int32_t)
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define varDataLen(v) ((VarDataLenT *)(v))[0]
void createUsers(TAOS *taos, const char *host, char *qstr); void createUsers(TAOS *taos, const char *host, char *qstr);
void passVerTestMulti(const char *host, char *qstr); void passVerTestMulti(const char *host, char *qstr);
void sysInfoTest(const char *host, char *qstr);
int nPassVerNotified = 0; int nPassVerNotified = 0;
TAOS *taosu[nRoot] = {0}; TAOS *taosu[nRoot] = {0};
@ -83,6 +95,95 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
} }
int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) {
int len = 0;
char split = ' ';
for (int i = 0; i < numFields; ++i) {
if (i > 0) {
str[len++] = split;
}
if (row[i] == NULL) {
len += sprintf(str + len, "%s", "NULL");
continue;
}
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf", dv);
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default:
break;
}
}
return len;
}
static int printResult(TAOS_RES *res, char *output) {
int numFields = taos_num_fields(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
char header[BUF_LEN] = {0};
int len = 0;
for (int i = 0; i < numFields; ++i) {
len += sprintf(header + len, "%s ", fields[i].name);
}
puts(header);
if (output) {
strncpy(output, header, BUF_LEN);
}
TAOS_ROW row = NULL;
while ((row = taos_fetch_row(res))) {
char temp[BUF_LEN] = {0};
printRow(temp, row, fields, numFields);
puts(temp);
}
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
char qstr[1024]; char qstr[1024];
@ -99,6 +200,7 @@ int main(int argc, char *argv[]) {
} }
createUsers(taos, argv[1], qstr); createUsers(taos, argv[1], qstr);
passVerTestMulti(argv[1], qstr); passVerTestMulti(argv[1], qstr);
sysInfoTest(argv[1], qstr);
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
@ -167,6 +269,8 @@ void passVerTestMulti(const char *host, char *qstr) {
int nConn = nRoot + nUser; int nConn = nRoot + nUser;
for (int i = 0; i < 15; ++i) { for (int i = 0; i < 15; ++i) {
printf("%s:%d [%d] second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i,
nPassVerNotified, nConn);
if (nPassVerNotified >= nConn) break; if (nPassVerNotified >= nConn) break;
sleep(1); sleep(1);
} }
@ -175,19 +279,101 @@ void passVerTestMulti(const char *host, char *qstr) {
for (int i = 0; i < nRoot; ++i) { for (int i = 0; i < nRoot; ++i) {
taos_close(taos[i]); taos_close(taos[i]);
printf("%s:%d close taos[%d]\n", __func__, __LINE__, i); printf("%s:%d close taos[%d]\n", __func__, __LINE__, i);
sleep(1); // sleep(1);
} }
for (int i = 0; i < nUser; ++i) { for (int i = 0; i < nUser; ++i) {
taos_close(taosu[i]); taos_close(taosu[i]);
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i); printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
// sleep(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
if (nPassVerNotified >= nConn) {
fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified,
nConn);
} else {
fprintf(stderr, ">>> failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn);
}
fprintf(stderr, "######## %s #########\n", __func__);
// sleep(300);
}
void sysInfoTest(const char *host, char *qstr) {
// root
TAOS *taos[nRoot] = {0};
char userName[USER_LEN] = "root";
for (int i = 0; i < nRoot; ++i) {
taos[i] = taos_connect(host, "root", "taos", NULL, 0);
if (taos[i] == NULL) {
fprintf(stderr, "failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
}
queryDB(taos[0], "create database if not exists demo11 vgroups 1 minrows 10");
queryDB(taos[0], "create database if not exists demo12 vgroups 1 minrows 10");
queryDB(taos[0], "create database if not exists demo13 vgroups 1 minrows 10");
queryDB(taos[0], "create table demo11.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table demo12.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table demo13.stb (ts timestamp, c1 int) tags(t1 int)");
sprintf(qstr, "show grants");
char output[BUF_LEN];
TAOS_RES *res = NULL;
int32_t nRep = 0;
_REP:
fprintf(stderr, "######## %s loop:%d #########\n", __func__, nRep);
res = taos_query(taos[0], qstr);
if (taos_errno(res) != 0) {
fprintf(stderr, "%s:%d failed to execute: %s since %s\n", __func__, __LINE__, qstr, taos_errstr(res));
taos_free_result(res);
exit(EXIT_FAILURE);
}
printResult(res, output);
taos_free_result(res);
if (!strstr(output, "timeseries")) {
fprintf(stderr, "%s:%d expected output: 'timeseries' not occur\n", __func__, __LINE__);
exit(EXIT_FAILURE);
}
queryDB(taos[0], "alter user root sysinfo 0");
fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__);
for (int i = 1; i <= 2; ++i) {
sleep(1); sleep(1);
} }
if (nPassVerNotified >= nConn) { res = taos_query(taos[0], qstr);
fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn); if (taos_errno(res) != 0) {
} else { if (!strstr(taos_errstr(res), "Permission denied")) {
fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn); fprintf(stderr, "%s:%d expected error: 'Permission denied' not occur\n", __func__, __LINE__);
taos_free_result(res);
exit(EXIT_FAILURE);
}
} }
// sleep(300); taos_free_result(res);
queryDB(taos[0], "alter user root sysinfo 1");
fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__);
for (int i = 1; i <= 2; ++i) {
sleep(1);
}
if(++nRep < 5) {
goto _REP;
}
// close the taos_conn
for (int i = 0; i < nRoot; ++i) {
taos_close(taos[i]);
fprintf(stderr, "%s:%d close taos[%d]\n", __func__, __LINE__, i);
}
fprintf(stderr, "######## %s #########\n", __func__);
fprintf(stderr, ">>> succeed to run sysInfoTest\n");
fprintf(stderr, "######## %s #########\n", __func__);
} }