From 4d1ba65c46da4749a804ea86c052189fdc5a4b03 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 24 Jan 2022 14:31:57 +0800 Subject: [PATCH 1/9] catalog remove db vgroup info --- include/common/tmsg.h | 4 +- include/libs/catalog/catalog.h | 1 + include/libs/qcom/query.h | 2 +- source/libs/catalog/src/catalog.c | 104 ++++++++++++++++++---- source/libs/catalog/test/catalogTests.cpp | 2 +- 5 files changed, 94 insertions(+), 19 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7630c5f5e5..75e8195215 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -147,7 +147,7 @@ typedef struct { } SBuildTableMetaInput; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SBuildUseDBInput; @@ -746,7 +746,7 @@ typedef struct { typedef struct { char db[TSDB_DB_FNAME_LEN]; - int64_t uid; + uint64_t uid; int32_t vgVersion; int32_t vgNum; int8_t hashMethod; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 70cff7ed1a..9c9e370dca 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -59,6 +59,7 @@ typedef struct SSTableMetaVersion { } SSTableMetaVersion; typedef struct SDbVgVersion { + char dbName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; } SDbVgVersion; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1925f0e3bd..53ef6f4f9b 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -82,7 +82,7 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { SRWLatch lock; - int64_t dbId; + uint64_t dbId; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2197fdfd62..d052e04552 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -737,7 +737,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm } -int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (oldInfo) { CTG_LOCK(CTG_WRITE, &oldInfo->lock); @@ -763,6 +763,48 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD return TSDB_CODE_SUCCESS; } +int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) { + SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); + if (info) { + CTG_LOCK(CTG_WRITE, &info->lock); + if (info->dbId != target->dbId) { + ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + + return TSDB_CODE_SUCCESS; + } + + if (info->vgVersion > target->vgVersion) { + ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + + return TSDB_CODE_SUCCESS; + } + + if (info->vgInfo) { + ctgInfo("cleanup db vgInfo, db:%s", target->dbName); + taosHashCleanup(info->vgInfo); + info->vgInfo = NULL; + } + + if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { + ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + CTG_UNLOCK(CTG_WRITE, &info->lock); + + taosHashRelease(pCatalog->dbCache.cache, info); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -1134,19 +1176,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (dbInfo->vgVersion < 0) { - ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion); - - if (pCatalog->dbCache.cache) { - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); - - CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); - } - - ctgWarn("db removed from cache, db:%s", dbName); - goto _return; - } - if (NULL == pCatalog->dbCache.cache) { SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == cache) { @@ -1158,7 +1187,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB taosHashCleanup(cache); } } else { - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); + CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo)); } bool newAdded = false; @@ -1189,6 +1218,51 @@ _return: CTG_RET(code); } + +int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { + int32_t code = 0; + + if (NULL == pCatalog || NULL == dbInfo) { + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (pCatalog->dbCache.cache) { + CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo)); + + CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); + } + + ctgWarn("db removed from cache, db:%s", dbName); + + bool newAdded = false; + if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { + ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + dbInfo->vgInfo = NULL; + + SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; + if (newAdded) { + CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); + } else { + CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + } + + ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion); + + +_return: + + if (dbInfo && dbInfo->vgInfo) { + taosHashCleanup(dbInfo->vgInfo); + dbInfo->vgInfo = NULL; + } + + CTG_RET(code); +} + + int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index a01c3bcf5d..35272c0dce 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -59,7 +59,7 @@ int32_t ctgTestTagNum = 1; int32_t ctgTestSVersion = 1; int32_t ctgTestTVersion = 1; int32_t ctgTestSuid = 2; -int64_t ctgTestDbId = 33; +uint64_t ctgTestDbId = 33; uint64_t ctgTestClusterId = 0x1; char *ctgTestDbname = "1.db1"; From 03a366fa40325206bd31552dca901517e6252d55 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 25 Jan 2022 18:29:26 +0800 Subject: [PATCH 2/9] hb process --- include/common/tmsg.h | 18 +- include/libs/qcom/query.h | 4 +- source/client/inc/clientInt.h | 16 +- source/client/src/clientHb.c | 203 ++++++++++++++++++++--- source/client/src/clientImpl.c | 2 +- source/client/src/clientMsgHandler.c | 3 +- source/common/src/tmsg.c | 30 ++-- source/dnode/mnode/impl/src/mndDb.c | 141 ++++++++++++---- source/dnode/mnode/impl/src/mndProfile.c | 47 ++++++ source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/catalog.c | 167 +++++++++++-------- 11 files changed, 479 insertions(+), 154 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 75e8195215..97ba56bd76 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -76,6 +76,12 @@ typedef enum { HEARTBEAT_TYPE_MAX } EHbType; +typedef enum { + HEARTBEAT_KEY_DBINFO = 1, + HEARTBEAT_KEY_STBINFO, +}; + + typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, @@ -1335,9 +1341,8 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat } typedef struct { - int32_t keyLen; + int32_t key; int32_t valueLen; - void* key; void* value; } SKv; @@ -1359,8 +1364,7 @@ typedef struct { typedef struct { SClientHbKey connKey; int32_t status; - int32_t bodyLen; - void* body; + SArray* info; // Array } SClientHbRsp; typedef struct { @@ -1402,17 +1406,15 @@ void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->key); tlen += taosEncodeFixedI32(buf, pKv->valueLen); - tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); return tlen; } static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { - buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->key); buf = taosDecodeFixedI32(buf, &pKv->valueLen); - buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); return buf; } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 53ef6f4f9b..6498abb7fc 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -94,7 +94,7 @@ typedef struct SUseDbOutput { } SUseDbOutput; enum { - META_TYPE_NON_TABLE = 1, + META_TYPE_NULL_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, META_TYPE_BOTH_TABLE @@ -174,7 +174,7 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSi extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); -#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE +#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c61f3da6bd..a25d79bf30 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -37,7 +37,15 @@ typedef struct SAppInstInfo SAppInstInfo; typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); + +typedef struct SHbConnInfo { + void *param; + SClientHbReq *req; +} SHbConnInfo; + typedef struct SAppHbMgr { + char *key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -49,7 +57,7 @@ typedef struct SAppHbMgr { SAppInstInfo* pAppInstInfo; // info SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash + SHashObj* connInfo; // hash } SAppHbMgr; typedef struct SClientHbMgr { @@ -59,12 +67,10 @@ typedef struct SClientHbMgr { pthread_t thread; pthread_mutex_t lock; // used when app init and cleanup SArray* appHbMgrs; // SArray one for each cluster - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; + FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; + FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; } SClientHbMgr; -// TODO: embed param into function -// return type: SArray -typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef struct SQueryExecMetric { int64_t start; // start timestamp diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0f4ff6f725..8fc9fd086c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -21,27 +21,136 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) { +static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { return 0; } +static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { + SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); + if (NULL == info) { + tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); + return TSDB_CODE_SUCCESS; + } + + int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; + for (int32_t i = 0; i < kvNum; ++i) { + SKv *kv = taosArrayGet(pRsp->info, i); + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: + + break; + case HEARTBEAT_KEY_STBINFO: + + break; + default: + tscError("invalid hb key type:%d", kv->key); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { if (code != 0) { return -1; } - SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData; - return hbMqHbRspHandle(pRsp); + char *key = (char *)param; + SClientHbBatchRsp* pRsp = (SClientHbBatchRsp*) pMsg->pData; + int32_t reqNum = taosArrayGetSize(pRsp->rsps); + + SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + if (pInst == NULL || NULL == *pInst) { + tscError("cluster not exist, key:%s", key); + return -1; + } + + for (int32_t i = 0; i < reqNum; ++i) { + SClientHbRsp* rsp = taosArrayGet(pRsp->rsps, i); + code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); + if (code) { + break; + } + } + + return code; } -void hbMgrInitMqHbRspHandle() { - clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; +int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { + SDbVgVersion *dbs = NULL; + uint32_t dbNum = 0; + int32_t code = 0; + + code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + for (int32_t i = 0; i < dbNum; ++i) { + SDbVgVersion *db = &dbs[i]; + db->dbId = htobe64(db->dbId); + db->vgVersion = htonl(db->vgVersion); + } + + SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + return TSDB_CODE_SUCCESS; +} + +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { + int64_t *clusterId = (int64_t *)param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + return code; + } + + code = hbGetExpiredDBInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + + return TSDB_CODE_SUCCESS; +} + +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { + +} + +void hbMgrInitMqHbHandle() { + clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; + clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; } static FORCE_INLINE void hbMgrInitHandle() { // init all handle - hbMgrInitMqHbRspHandle(); + hbMgrInitMqHbHandle(); } + +void hbFreeReqKvHash(SHashObj* info) { + void *pIter = taosHashIterate(info, NULL); + while (pIter != NULL) { + SKv* kv = pIter; + + tfree(kv->value); + + pIter = taosHashIterate(info, pIter); + } +} + +void hbFreeReq(SClientHbReq *req) { + hbFreeReqKvHash(req->info); +} + + + SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { @@ -51,30 +160,48 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + int32_t code = 0; void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { SClientHbReq* pOneReq = pIter; + + SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); + if (info) { + code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); + if (code) { + taosHashCancelIterate(pAppHbMgr->activeInfo, pIter); + break; + } + } + taosArrayPush(pBatchReq->reqs, pOneReq); - taosHashClear(pOneReq->info); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } -#if 0 - pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); - while (pIter != NULL) { - FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; - SClientHbKey connKey; - taosHashCopyKey(pIter, &connKey); - SArray* pArray = getConnInfoFp(connKey, NULL); - - pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); + if (code) { + taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); + tfree(pBatchReq); } -#endif return pBatchReq; } + +void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq* pOneReq = pIter; + + hbFreeReqKvHash(pOneReq->info); + taosHashClear(pOneReq->info); + + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + } +} + + + static void* hbThreadFunc(void* param) { setThreadName("hb"); while (1) { @@ -98,7 +225,9 @@ static void* hbThreadFunc(void* param) { int tlen = tSerializeSClientHbBatchReq(NULL, pReq); void *buf = malloc(tlen); if (buf == NULL) { - //TODO: error handling + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); break; } void *abuf = buf; @@ -107,6 +236,7 @@ static void* hbThreadFunc(void* param) { if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); free(buf); break; } @@ -114,7 +244,7 @@ static void* hbThreadFunc(void* param) { pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; - pInfo->param = NULL; + pInfo->param = strdup(pAppHbMgr->key); pInfo->requestId = generateRequestId(); pInfo->requestObjRefId = 0; @@ -148,7 +278,7 @@ static void hbStopThread() { atomic_store_8(&clientHbMgr.threadStop, 1); } -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { hbMgrInit(); SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -160,6 +290,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { pAppHbMgr->connKeyCnt = 0; pAppHbMgr->reportCnt = 0; pAppHbMgr->reportBytes = 0; + pAppHbMgr->key = strdup(key); // init app info pAppHbMgr->pAppInstInfo = pAppInstInfo; @@ -243,7 +374,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp) { return 0; } -int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) { +int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { @@ -252,16 +383,42 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + // init hash - if (func != NULL) { - taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + if (info != NULL) { + SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + info->req = pReq; + taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); } atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { + SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; + SHbConnInfo info = {0}; + + switch (hbType) { + case HEARTBEAT_TYPE_QUERY: { + int64_t *pClusterId = malloc(sizeof(int64_t)); + *pClusterId = clusterId; + + info.param = pClusterId; + break; + } + case HEARTBEAT_TYPE_MQ: { + break; + } + default: + break; + } + + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); +} + void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6014042e11..a39a0d637b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb073..831006ac89 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -71,8 +71,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 48e9dce3c1..1e9131c800 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -91,13 +91,11 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int32_t kvNum = taosHashGetSize(pReq->info); tlen += taosEncodeFixedI32(buf, kvNum); - SKv kv; + SKv *kv; void* pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { - taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); - kv.valueLen = taosHashGetDataLen(pIter); - kv.value = pIter; - tlen += taosEncodeSKv(buf, &kv); + kv = pIter; + tlen += taosEncodeSKv(buf, kv); pIter = taosHashIterate(pReq->info, pIter); } @@ -116,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); + taosHashPut(pReq->info, kv.key, sizeof(kv.key), kv.value, kv.valueLen); } return buf; @@ -124,17 +122,28 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) { int tlen = 0; + int32_t kvNum = taosArrayGetSize(pRsp->info); tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey); tlen += taosEncodeFixedI32(buf, pRsp->status); - tlen += taosEncodeFixedI32(buf, pRsp->bodyLen); - tlen += taosEncodeBinary(buf, pRsp->body, pRsp->bodyLen); + tlen += taosEncodeFixedI32(buf, kvNum); + for (int i = 0; i < kvNum; i++) { + SKv *kv = (SKv *)taosArrayGet(pRsp->info, i); + tlen += taosEncodeSKv(buf, kv); + } return tlen; } void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) { + int32_t kvNum = 0; buf = taosDecodeSClientHbKey(buf, &pRsp->connKey); buf = taosDecodeFixedI32(buf, &pRsp->status); - buf = taosDecodeFixedI32(buf, &pRsp->bodyLen); - buf = taosDecodeBinary(buf, &pRsp->body, pRsp->bodyLen); + buf = taosDecodeFixedI32(buf, &kvNum); + pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); + for (int i = 0; i < kvNum; i++) { + SKv kv = {0}; + buf = taosDecodeSKv(buf, &kv); + taosArrayPush(pRsp->info, &kv); + } + return buf; } @@ -155,6 +164,7 @@ void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) { if (pBatchReq->reqs == NULL) { pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq)); } + int32_t reqNum; buf = taosDecodeFixedI32(buf, &reqNum); for (int i = 0; i < reqNum; i++) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 39be41a4e5..5917093c7b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -805,6 +805,44 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) { + int32_t vindex = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (vindex < pDb->cfg.numOfVgroups) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + SVgroupInfo *pInfo = &vgList[vindex]; + pInfo->vgId = htonl(pVgroup->vgId); + pInfo->hashBegin = htonl(pVgroup->hashBegin); + pInfo->hashEnd = htonl(pVgroup->hashEnd); + pInfo->numOfEps = pVgroup->replica; + for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; + SEpAddr *pEpArrr = &pInfo->epAddr[gid]; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode != NULL) { + memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEpArrr->port = htons(pDnode->port); + } + mndReleaseDnode(pMnode, pDnode); + if (pVgid->role == TAOS_SYNC_STATE_LEADER) { + pInfo->inUse = gid; + } + } + vindex++; + } + + sdbRelease(pSdb, pVgroup); + } + + *vgNum = vindex; +} + static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -826,45 +864,16 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { return -1; } - int32_t vindex = 0; + int32_t vgNum = 0; if (pUse->vgVersion < pDb->vgVersion) { - void *pIter = NULL; - while (vindex < pDb->cfg.numOfVgroups) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (pVgroup->dbUid == pDb->uid) { - SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex]; - pInfo->vgId = htonl(pVgroup->vgId); - pInfo->hashBegin = htonl(pVgroup->hashBegin); - pInfo->hashEnd = htonl(pVgroup->hashEnd); - pInfo->numOfEps = pVgroup->replica; - for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEpAddr *pEpArrr = &pInfo->epAddr[gid]; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode != NULL) { - memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - pEpArrr->port = htons(pDnode->port); - } - mndReleaseDnode(pMnode, pDnode); - if (pVgid->role == TAOS_SYNC_STATE_LEADER) { - pInfo->inUse = gid; - } - } - vindex++; - } - - sdbRelease(pSdb, pVgroup); - } + mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); } memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); pRsp->uid = htobe64(pDb->uid); pRsp->vgVersion = htonl(pDb->vgVersion); - pRsp->vgNum = htonl(vindex); + pRsp->vgNum = htonl(vgNum); pRsp->hashMethod = pDb->hashMethod; pReq->pCont = pRsp; @@ -874,6 +883,72 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { return 0; } +int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) { + SSdb *pSdb = pMnode->pSdb; + int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo)); + void *buf = malloc(bufSize); + int32_t len = 0; + int32_t contLen = 0; + int32_t bufOffset = 0; + SUseDbRsp *pRsp = NULL; + + for (int32_t i = 0; i < num; ++i) { + SDbVgVersion *db = &dbs[i]; + + len = 0; + + SDbObj *pDb = mndAcquireDb(pMnode, db->dbName); + if (pDb == NULL) { + mInfo("db %s not exist", db->dbName); + + len = sizeof(SUseDbRsp); + } else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) { + len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); + } + + if (0 == len) { + mndReleaseDb(pMnode, pDb); + + continue; + } + + contLen += len; + + if (contLen > bufSize) { + buf = realloc(buf, contLen); + } + + pRsp = (SUseDbRsp *)((char *)buf + bufOffset); + memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN); + if (pDb) { + int32_t vgNum = 0; + mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); + + pRsp->uid = htobe64(pDb->uid); + pRsp->vgVersion = htonl(pDb->vgVersion); + pRsp->vgNum = htonl(vgNum); + pRsp->hashMethod = pDb->hashMethod; + } else { + pRsp->vgVersion = htonl(-1); + } + + bufOffset += len; + + mndReleaseDb(pMnode, pDb); + } + + if (contLen > 0) { + *rsp = buf; + *rspLen = contLen; + } else { + *rsp = NULL; + tfree(buf); + *rspLen = 0; + } + + return 0; +} + static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SSyncDbReq *pSync = pReq->rpcMsg.pCont; @@ -1166,4 +1241,4 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 76fabc96ce..8296cd2c6c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -346,12 +346,46 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { int sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; + batchRsp.key = batchReq.key; + batchRsp.keyLen = batchReq.keyLen; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { SClientHbReq* pHbReq = taosArrayGet(pArray, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { + int32_t kvNum = taosHashGetSize(pHbReq->info); + if (NULL == pHbReq->info || kvNum <= 0) { + continue; + } + SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))}; + + void *pIter = taosHashIterate(pHbReq->info, NULL); + while (pIter != NULL) { + SKv* kv = pIter; + + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp->info, &kv); + + taosArrayPush(batchRsp.rsps, &hbRsp); + } + break; + case HEARTBEAT_KEY_STBINFO: + + break; + default: + mError("invalid kv key:%d", kv->key); + break; + } + + pIter = taosHashIterate(pHbReq->info, pIter); + } } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { @@ -366,6 +400,19 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { void* buf = rpcMallocCont(tlen); void* abuf = buf; tSerializeSClientHbBatchRsp(&abuf, &batchRsp); + + int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); + for (int32_t i = 0; i < rspNum; ++i) { + SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i); + int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0; + for (int32_t n = 0; n < kvNum; ++n) { + SKv *kv = taosArrayGet(rsp->info, n); + tfree(kv->value); + } + taosArrayDestroy(rsp->info); + } + + tfree(batchRsp.key); taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9f1ea754c2..a96b12f597 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -115,7 +115,7 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); -#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE) +#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) #define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE) #define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index d052e04552..3ce71ccb44 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -253,7 +253,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - SET_META_TYPE_NONE(output->metaType); + SET_META_TYPE_NULL(output->metaType); ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } @@ -315,7 +315,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - SET_META_TYPE_NONE(output->metaType); + SET_META_TYPE_NULL(output->metaType); ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } @@ -510,14 +510,14 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (slot->needSort) { taosArraySort(slot->meta, compare); slot->needSort = false; - qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type); + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); } void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); @@ -542,6 +542,42 @@ _return: CTG_RET(code); } +int32_t ctgMetaRentRemove(SMetaRentMgmt *mgmt, int64_t id, __compar_fn_t compare) { + int16_t widx = abs(id % mgmt->slotNum); + + SRentSlotInfo *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (slot->needSort) { + taosArraySort(slot->meta, compare); + slot->needSort = false; + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); + } + + int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ); + if (idx < 0) { + qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + taosArrayRemove(slot->meta, idx); + + qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + + CTG_RET(code); +} + + int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); if (ridx >= mgmt->slotNum) { @@ -763,44 +799,49 @@ int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) { - SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); - if (info) { - CTG_LOCK(CTG_WRITE, &info->lock); - if (info->dbId != target->dbId) { - ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - - return TSDB_CODE_SUCCESS; - } - - if (info->vgVersion > target->vgVersion) { - ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - - return TSDB_CODE_SUCCESS; - } - - if (info->vgInfo) { - ctgInfo("cleanup db vgInfo, db:%s", target->dbName); - taosHashCleanup(info->vgInfo); - info->vgInfo = NULL; - } +int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target, bool *removed) { + *removed = false; - if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { - ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - CTG_UNLOCK(CTG_WRITE, &info->lock); + SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); + if (NULL == info) { + ctgInfo("db not exist in dbCache, may be removed, db:%s", target->dbName); + return TSDB_CODE_SUCCESS; + } + CTG_LOCK(CTG_WRITE, &info->lock); + if (info->dbId != target->dbId) { + ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); + CTG_UNLOCK(CTG_WRITE, &info->lock); taosHashRelease(pCatalog->dbCache.cache, info); + return TSDB_CODE_SUCCESS; + } + + if (info->vgVersion > target->vgVersion) { + ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + return TSDB_CODE_SUCCESS; + } + + if (info->vgInfo) { + ctgInfo("cleanup db vgInfo, db:%s", target->dbName); + taosHashCleanup(info->vgInfo); + info->vgInfo = NULL; } + if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { + ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + CTG_UNLOCK(CTG_WRITE, &info->lock); + + taosHashRelease(pCatalog->dbCache.cache, info); + + *removed = true; + return TSDB_CODE_SUCCESS; } @@ -825,7 +866,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); - if (CTG_IS_META_NONE(moutput.metaType)) { + if (CTG_IS_META_NULL(moutput.metaType)) { CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); } else { output = &moutput; @@ -841,6 +882,8 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); + voutput.metaType = moutput.metaType; + tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; @@ -850,20 +893,22 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con if (0 == exist) { CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); - if (CTG_IS_META_NONE(moutput.metaType)) { - SET_META_TYPE_NONE(voutput.metaType); + if (CTG_IS_META_NULL(moutput.metaType)) { + SET_META_TYPE_NULL(voutput.metaType); } tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; } else { + tfree(voutput.tbMeta); + SET_META_TYPE_CTABLE(voutput.metaType); } } } - if (CTG_IS_META_NONE(output->metaType)) { + if (CTG_IS_META_NULL(output->metaType)) { ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -1221,43 +1266,26 @@ _return: int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { int32_t code = 0; + bool removed = false; if (NULL == pCatalog || NULL == dbInfo) { CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } - if (pCatalog->dbCache.cache) { - CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo)); - - CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); + if (NULL == pCatalog->dbCache.cache) { + return TSDB_CODE_SUCCESS; } - ctgWarn("db removed from cache, db:%s", dbName); - - bool newAdded = false; - if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { - ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - dbInfo->vgInfo = NULL; - - SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; - if (newAdded) { - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); - } else { - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo, &removed)); + if (!removed) { + return TSDB_CODE_SUCCESS; } - ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion); + ctgInfo("db removed from cache, db:%s", dbInfo->dbName); - -_return: - - if (dbInfo && dbInfo->vgInfo) { - taosHashCleanup(dbInfo->vgInfo); - dbInfo->vgInfo = NULL; - } + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbInfo->dbId, ctgDbVgVersionCompare)); + + ctgDebug("db removed from rent, db:%s", dbInfo->dbName); CTG_RET(code); } @@ -1365,6 +1393,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup)); _return: + if (dbInfo) { CTG_UNLOCK(CTG_READ, &dbInfo->lock); taosHashRelease(pCatalog->dbCache.cache, dbInfo); From 09b86cbf76c7975604f274330fae77568533f6ce Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 26 Jan 2022 08:50:32 +0800 Subject: [PATCH 3/9] fix bugs --- include/common/tmsg.h | 3 +- include/libs/catalog/catalog.h | 2 + source/client/inc/clientInt.h | 14 ++- source/client/src/clientHb.c | 118 ++++++++++++++---- source/client/src/clientImpl.c | 9 +- source/common/src/tmsg.c | 2 +- source/dnode/mnode/impl/inc/mndDb.h | 1 + source/dnode/mnode/impl/src/mndProfile.c | 8 +- .../dnode/mnode/impl/test/profile/profile.cpp | 5 +- source/libs/catalog/src/catalog.c | 4 +- 10 files changed, 115 insertions(+), 51 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 97ba56bd76..1316847f48 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -76,9 +76,10 @@ typedef enum { HEARTBEAT_TYPE_MAX } EHbType; -typedef enum { +enum { HEARTBEAT_KEY_DBINFO = 1, HEARTBEAT_KEY_STBINFO, + HEARTBEAT_KEY_MQ_TMP, }; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 9c9e370dca..0b465b7b4e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -99,6 +99,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo); + /** * Get a table's meta data. * @param pCatalog (input, got with catalogGetHandle) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a25d79bf30..b346e54add 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -35,10 +35,6 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; -typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); - -typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); - typedef struct SHbConnInfo { void *param; SClientHbReq *req; @@ -60,6 +56,12 @@ typedef struct SAppHbMgr { SHashObj* connInfo; // hash } SAppHbMgr; + +typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp); + +typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); + + typedef struct SClientHbMgr { int8_t inited; // ctl @@ -223,11 +225,11 @@ void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo); +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key); void appHbMgrCleanup(SAppHbMgr* pAppHbMgr); // conn level -int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func); +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 8fc9fd086c..0ab3089f13 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -15,6 +15,8 @@ #include "clientInt.h" #include "trpc.h" +#include "catalog.h" +#include "clientLog.h" static SClientHbMgr clientHbMgr = {0}; @@ -25,6 +27,67 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) return 0; } +static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t msgLen = 0; + int32_t code = 0; + + while (msgLen < valueLen) { + SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen); + + rsp->vgVersion = ntohl(rsp->vgVersion); + rsp->vgNum = ntohl(rsp->vgNum); + rsp->uid = be64toh(rsp->uid); + + if (rsp->vgVersion < 0) { + SDbVgVersion dbInfo; + strcpy(dbInfo.dbName, rsp->db); + dbInfo.dbId = rsp->uid; + dbInfo.vgVersion = rsp->vgVersion; + + code = catalogRemoveDBVgroup(pCatalog, &dbInfo); + } else { + SDBVgroupInfo vgInfo = {0}; + vgInfo.dbId = rsp->uid; + vgInfo.vgVersion = rsp->vgVersion; + vgInfo.hashMethod = rsp->hashMethod; + vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo.vgInfo) { + tscError("hash init[%d] failed", rsp->vgNum); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < rsp->vgNum; ++i) { + rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); + rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); + rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); + + for (int32_t n = 0; n < rsp->vgroupInfo[i].numOfEps; ++n) { + rsp->vgroupInfo[i].epAddr[n].port = ntohs(rsp->vgroupInfo[i].epAddr[n].port); + } + + if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { + tscError("hash push failed, errno:%d", errno); + taosHashCleanup(vgInfo.vgInfo); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); + if (code) { + taosHashCleanup(vgInfo.vgInfo); + } + } + + if (code) { + return code; + } + + msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { @@ -36,9 +99,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { - case HEARTBEAT_KEY_DBINFO: + case HEARTBEAT_KEY_DBINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + int64_t *clusterId = (int64_t *)info->param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + break; + } + + hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); break; + } case HEARTBEAT_KEY_STBINFO: break; @@ -56,8 +134,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code return -1; } char *key = (char *)param; - SClientHbBatchRsp* pRsp = (SClientHbBatchRsp*) pMsg->pData; - int32_t reqNum = taosArrayGetSize(pRsp->rsps); + SClientHbBatchRsp pRsp = {0}; + tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); + + int32_t reqNum = taosArrayGetSize(pRsp.rsps); SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { @@ -66,7 +146,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code } for (int32_t i = 0; i < reqNum; ++i) { - SClientHbRsp* rsp = taosArrayGet(pRsp->rsps, i); + SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; @@ -145,8 +225,9 @@ void hbFreeReqKvHash(SHashObj* info) { } } -void hbFreeReq(SClientHbReq *req) { - hbFreeReqKvHash(req->info); +void hbFreeReq(void *req) { + SClientHbReq *pReq = (SClientHbReq *)req; + hbFreeReqKvHash(pReq->info); } @@ -305,9 +386,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { } pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; // init getInfoFunc - pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - if (pAppHbMgr->getInfoFuncs == NULL) { + if (pAppHbMgr->connInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; free(pAppHbMgr); return NULL; @@ -325,7 +406,7 @@ void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == pTarget) { taosHashCleanup(pTarget->activeInfo); - taosHashCleanup(pTarget->getInfoFuncs); + taosHashCleanup(pTarget->connInfo); } } @@ -357,23 +438,6 @@ void hbMgrCleanUp() { taosArrayDestroy(clientHbMgr.appHbMgrs); } -int hbHandleRsp(SClientHbBatchRsp* hbRsp) { - int64_t reqId = hbRsp->reqId; - int64_t rspId = hbRsp->rspId; - - SArray* rsps = hbRsp->rsps; - int32_t sz = taosArrayGetSize(rsps); - for (int i = 0; i < sz; i++) { - SClientHbRsp* pRsp = taosArrayGet(rsps, i); - if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) { - clientHbMgr.handle[pRsp->connKey.hbType](pRsp); - } else { - // discard rsp - } - } - return 0; -} - int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); @@ -421,7 +485,7 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); + taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a39a0d637b..ce0e594955 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -434,13 +434,8 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { return NULL; } SKv kv = {0}; - kv.key = malloc(256); - if (kv.key == NULL) { - taosArrayDestroy(pArray); - return NULL; - } - strcpy(kv.key, "mq-tmp"); - kv.keyLen = strlen("mq-tmp") + 1; + kv.key = HEARTBEAT_KEY_MQ_TMP; + SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); if (pMqHb == NULL) { return pArray; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1e9131c800..0d1576d03c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -114,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, sizeof(kv.key), kv.value, kv.valueLen); + taosHashPut(pReq->info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); } return buf; diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 91f502be7d..5a3e6ed26e 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -26,6 +26,7 @@ int32_t mndInitDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode); SDbObj *mndAcquireDb(SMnode *pMnode, char *db); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); +int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 8296cd2c6c..49265fa35e 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -346,8 +346,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { int sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; - batchRsp.key = batchReq.key; - batchRsp.keyLen = batchReq.keyLen; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { @@ -365,17 +363,18 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { SKv* kv = pIter; switch (kv->key) { - case HEARTBEAT_KEY_DBINFO: + case HEARTBEAT_KEY_DBINFO: { void *rspMsg = NULL; int32_t rspLen = 0; mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp->info, &kv); + taosArrayPush(hbRsp.info, &kv); taosArrayPush(batchRsp.rsps, &hbRsp); } break; + } case HEARTBEAT_KEY_STBINFO: break; @@ -412,7 +411,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { taosArrayDestroy(rsp->info); } - tfree(batchRsp.key); taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 4ad979cdd3..991b4c1249 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -102,11 +102,10 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); SKv kv; - kv.key = (void*)"abc"; - kv.keyLen = 4; + kv.key = 123; kv.value = (void*)"bcd"; kv.valueLen = 4; - taosHashPut(req.info, kv.key, kv.keyLen, kv.value, kv.valueLen); + taosHashPut(req.info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); taosArrayPush(batchReq.reqs, &req); int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3ce71ccb44..07086e49ee 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1236,6 +1236,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } bool newAdded = false; + + dbInfo->lock = 0; if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); @@ -1269,7 +1271,7 @@ int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { bool removed = false; if (NULL == pCatalog || NULL == dbInfo) { - CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == pCatalog->dbCache.cache) { From 78e49d6b08d3b8c72c726c141d835e1a4d6501a0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 26 Jan 2022 17:42:31 +0800 Subject: [PATCH 4/9] feature/qnode --- include/common/tmsg.h | 38 ++++++++++++++++- source/client/src/clientHb.c | 51 +++++++++++++++-------- source/client/src/clientMsgHandler.c | 14 ++++++- source/common/src/tmsg.c | 2 +- source/dnode/mnode/impl/src/mndDb.c | 5 +++ source/dnode/mnode/impl/src/mndProfile.c | 5 ++- source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/catalog.c | 23 +++++----- source/libs/executor/src/dataDispatcher.c | 1 + source/util/src/terror.c | 2 +- 10 files changed, 107 insertions(+), 36 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 210804143b..8e62b4d0db 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1391,9 +1391,26 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); + +static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { + void *pIter = taosHashIterate(info, NULL); + while (pIter != NULL) { + SKv* kv = (SKv*)pIter; + + tfree(kv->value); + + pIter = taosHashIterate(info, pIter); + } +} + + static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; - if (req->info) taosHashCleanup(req->info); + if (req->info) { + tFreeReqKvHash(req->info); + + taosHashCleanup(req->info); + } } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); @@ -1409,6 +1426,25 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { free(pReq); } +static FORCE_INLINE void tFreeClientKv(void *pKv) { + SKv *kv = (SKv *)pKv; + if (kv) { + tfree(kv->value); + } +} + +static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { + SClientHbRsp* rsp = (SClientHbRsp*)pRsp; + if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); +} + + +static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { + SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; + taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); +} + + int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0ab3089f13..4cb2cd2aae 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -37,6 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog rsp->vgVersion = ntohl(rsp->vgVersion); rsp->vgNum = ntohl(rsp->vgNum); rsp->uid = be64toh(rsp->uid); + + tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); if (rsp->vgVersion < 0) { SDbVgVersion dbInfo; @@ -96,6 +98,9 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs } int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; + + tscDebug("hb got %d rsp kv", kvNum); + for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { @@ -130,28 +135,42 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs } static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { + static int32_t emptyRspNum = 0; if (code != 0) { + tfree(param); return -1; } char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); - int32_t reqNum = taosArrayGetSize(pRsp.rsps); + int32_t rspNum = taosArrayGetSize(pRsp.rsps); SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { - tscError("cluster not exist, key:%s", key); + tscError("cluster not exist, key:%s", key); + tfree(param); + tFreeClientHbBatchRsp(&pRsp); return -1; } - for (int32_t i = 0; i < reqNum; ++i) { + tfree(param); + + if (rspNum) { + tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + } else { + atomic_add_fetch_32(&emptyRspNum, 1); + } + + for (int32_t i = 0; i < rspNum; ++i) { SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; } } + + tFreeClientHbBatchRsp(&pRsp); return code; } @@ -166,6 +185,10 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl return code; } + if (dbNum <= 0) { + return TSDB_CODE_SUCCESS; + } + for (int32_t i = 0; i < dbNum; ++i) { SDbVgVersion *db = &dbs[i]; db->dbId = htobe64(db->dbId); @@ -173,6 +196,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl } SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + + tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); return TSDB_CODE_SUCCESS; @@ -213,21 +239,9 @@ static FORCE_INLINE void hbMgrInitHandle() { hbMgrInitMqHbHandle(); } - -void hbFreeReqKvHash(SHashObj* info) { - void *pIter = taosHashIterate(info, NULL); - while (pIter != NULL) { - SKv* kv = pIter; - - tfree(kv->value); - - pIter = taosHashIterate(info, pIter); - } -} - void hbFreeReq(void *req) { SClientHbReq *pReq = (SClientHbReq *)req; - hbFreeReqKvHash(pReq->info); + tFreeReqKvHash(pReq->info); } @@ -274,7 +288,7 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { while (pIter != NULL) { SClientHbReq* pOneReq = pIter; - hbFreeReqKvHash(pOneReq->info); + tFreeReqKvHash(pOneReq->info); taosHashClear(pOneReq->info); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); @@ -333,7 +347,8 @@ static void* hbThreadFunc(void* param) { int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq, false); + tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 831006ac89..7b4b3353fb 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -18,6 +18,7 @@ #include "tname.h" #include "clientInt.h" #include "clientLog.h" +#include "catalog.h" int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); @@ -287,7 +288,6 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { - // todo: Remove cache in catalog cache. SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); @@ -295,6 +295,18 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } + SDropDbReq *req = pRequest->body.requestMsg.pData; + + SDbVgVersion dbVer = {0}; + struct SCatalog *pCatalog = NULL; + + strncpy(dbVer.dbName, req->db, sizeof(dbVer.dbName)); + dbVer.dbId = 0; //TODO GET DBID FROM RSP + + catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + + catalogRemoveDBVgroup(pCatalog, &dbVer); + tsem_post(&pRequest->body.rspSem); return code; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0d1576d03c..34b55fd812 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -114,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); + taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); } return buf; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 5917093c7b..94dff2e559 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -894,6 +894,8 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * for (int32_t i = 0; i < num; ++i) { SDbVgVersion *db = &dbs[i]; + db->dbId = be64toh(db->dbId); + db->vgVersion = ntohl(db->vgVersion); len = 0; @@ -929,6 +931,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * pRsp->vgNum = htonl(vgNum); pRsp->hashMethod = pDb->hashMethod; } else { + pRsp->uid = htobe64(db->dbId); + pRsp->vgNum = htonl(0); + pRsp->hashMethod = 0; pRsp->vgVersion = htonl(-1); } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 49265fa35e..005f9a3d3c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -370,8 +370,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { if (rspMsg && rspLen > 0) { SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; taosArrayPush(hbRsp.info, &kv); - - taosArrayPush(batchRsp.rsps, &hbRsp); } break; } @@ -380,11 +378,14 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { break; default: mError("invalid kv key:%d", kv->key); + hbRsp.status = TSDB_CODE_MND_APP_ERROR; break; } pIter = taosHashIterate(pHbReq->info, pIter); } + + taosArrayPush(batchRsp.rsps, &hbRsp); } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a96b12f597..7128e143bf 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,7 +31,7 @@ extern "C" { #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 -#define CTG_RENT_SLOT_SECOND 2 +#define CTG_RENT_SLOT_SECOND 1.5 #define CTG_DEFAULT_INVALID_VERSION (-1) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 07086e49ee..6b68ee87e5 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -443,9 +443,9 @@ int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { } int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { - if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) { return -1; - } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) { return 1; } else { return 0; @@ -652,7 +652,7 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { int32_t code = 0; - if (NULL == output->tbMeta) { + if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -809,20 +809,19 @@ int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* targ } CTG_LOCK(CTG_WRITE, &info->lock); + + //TODO OPEN IT +#if 0 if (info->dbId != target->dbId) { ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); CTG_UNLOCK(CTG_WRITE, &info->lock); taosHashRelease(pCatalog->dbCache.cache, info); return TSDB_CODE_SUCCESS; } - - if (info->vgVersion > target->vgVersion) { - ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - return TSDB_CODE_SUCCESS; - } - +#else + target->dbId = info->dbId; +#endif + if (info->vgInfo) { ctgInfo("cleanup db vgInfo, db:%s", target->dbName); taosHashCleanup(info->vgInfo); @@ -1246,6 +1245,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB dbInfo->vgInfo = NULL; SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; + strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName)); + if (newAdded) { CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); } else { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 97a0557748..3623f5947d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -186,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pQueryEnd = pDispatcher->queryEnd; } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 56919ff99e..489fff5d64 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -410,7 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") // catalog -TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") From 5815ae163984f5e667ce7719f669322d64809d29 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 27 Jan 2022 10:57:12 +0800 Subject: [PATCH 5/9] feature/qnode --- source/client/src/clientHb.c | 7 +++++-- source/dnode/mnode/impl/src/mndDb.c | 3 +-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0379a76832..f8957f6fd8 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -63,8 +63,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); - for (int32_t n = 0; n < rsp->vgroupInfo[i].numOfEps; ++n) { - rsp->vgroupInfo[i].epAddr[n].port = ntohs(rsp->vgroupInfo[i].epAddr[n].port); + for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) { + rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port); } if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { @@ -377,12 +377,15 @@ static int32_t hbCreateThread() { static void hbStopThread() { if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) { + tscDebug("hb thread already stopped"); return; } while (2 != atomic_load_8(&clientHbMgr.threadStop)) { usleep(10); } + + tscDebug("hb thread stopped"); } SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c1e070b16e..73c67fc59a 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1245,5 +1245,4 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; - sdbCancelFetch(pSdb, pIter); -} + sdbCancelFetch(pSdb, \ No newline at end of file From da3e2329da0d14eb787219fc168300b11acea29a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 27 Jan 2022 11:20:38 +0800 Subject: [PATCH 6/9] feature/qnode --- source/dnode/mnode/impl/src/mndDb.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 87a8becf33..5e6f457a67 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -824,7 +824,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgLis if (pIter == NULL) break; if (pVgroup->dbUid == pDb->uid) { - SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex]; + SVgroupInfo *pInfo = &vgList[vindex]; pInfo->vgId = htonl(pVgroup->vgId); pInfo->hashBegin = htonl(pVgroup->hashBegin); pInfo->hashEnd = htonl(pVgroup->hashEnd); @@ -1253,4 +1253,5 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; - sdbCancelFetch(pSdb, \ No newline at end of file + sdbCancelFetch(pSdb, pIter); +} \ No newline at end of file From a3241613d7f3a2a7eec4dcb4f9ccadfd2802f706 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 27 Jan 2022 13:29:53 +0800 Subject: [PATCH 7/9] fix destroy connection issue --- source/client/src/clientHb.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index f8957f6fd8..f6c473333c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -528,8 +528,12 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { - taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + int32_t code = 0; + code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + if (code) { + return; + } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); if (atomic_load_32(&pAppHbMgr->connKeyCnt) <= 0) { appHbMgrCleanup(pAppHbMgr); From 84a7534256b74ec66ee9d48789c1c40a44cf7f87 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 27 Jan 2022 13:44:54 +0800 Subject: [PATCH 8/9] process drop database rsp --- source/client/src/clientMsgHandler.c | 6 +++--- source/libs/catalog/src/catalog.c | 5 ----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index c9b1d07965..24d1c0f03c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -299,13 +299,13 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - SDropDbReq *req = pRequest->body.requestMsg.pData; + SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData; SDbVgVersion dbVer = {0}; struct SCatalog *pCatalog = NULL; - strncpy(dbVer.dbName, req->db, sizeof(dbVer.dbName)); - dbVer.dbId = 0; //TODO GET DBID FROM RSP + strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName)); + dbVer.dbId = be64toh(rsp->uid); catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f5487f7da8..1b4d711aad 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -794,17 +794,12 @@ int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* targ CTG_LOCK(CTG_WRITE, &info->lock); - //TODO OPEN IT -#if 0 if (info->dbId != target->dbId) { ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); CTG_UNLOCK(CTG_WRITE, &info->lock); taosHashRelease(pCatalog->dbCache.cache, info); return TSDB_CODE_SUCCESS; } -#else - target->dbId = info->dbId; -#endif if (info->vgInfo) { ctgInfo("cleanup db vgInfo, db:%s", target->dbName); From a53d7036eceb9dceaeffaa9e7ceac97908bcc271 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 27 Jan 2022 13:52:10 +0800 Subject: [PATCH 9/9] fix ut case issue --- source/dnode/mnode/impl/test/profile/profile.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 36550f875e..ccf13d5d66 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -105,7 +105,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { kv.key = 123; kv.value = (void*)"bcd"; kv.valueLen = 4; - taosHashPut(req.info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); + taosHashPut(req.info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosArrayPush(batchReq.reqs, &req); int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq);