diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 27a01e4818..bbe3975cfb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -76,6 +76,13 @@ typedef enum { HEARTBEAT_TYPE_MAX } EHbType; +enum { + HEARTBEAT_KEY_DBINFO = 1, + HEARTBEAT_KEY_STBINFO, + HEARTBEAT_KEY_MQ_TMP, +}; + + typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, @@ -147,7 +154,7 @@ typedef struct { } SBuildTableMetaInput; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SBuildUseDBInput; @@ -745,7 +752,7 @@ typedef struct { typedef struct { char db[TSDB_DB_FNAME_LEN]; - int64_t uid; + uint64_t uid; int32_t vgVersion; int32_t vgNum; int8_t hashMethod; @@ -1340,9 +1347,8 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat } typedef struct { - int32_t keyLen; + int32_t key; int32_t valueLen; - void* key; void* value; } SKv; @@ -1364,8 +1370,7 @@ typedef struct { typedef struct { SClientHbKey connKey; int32_t status; - int32_t bodyLen; - void* body; + SArray* info; // Array } SClientHbRsp; typedef struct { @@ -1384,9 +1389,26 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); + +static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { + void *pIter = taosHashIterate(info, NULL); + while (pIter != NULL) { + SKv* kv = (SKv*)pIter; + + tfree(kv->value); + + pIter = taosHashIterate(info, pIter); + } +} + + static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; - if (req->info) taosHashCleanup(req->info); + if (req->info) { + tFreeReqKvHash(req->info); + + taosHashCleanup(req->info); + } } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); @@ -1402,22 +1424,39 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { free(pReq); } +static FORCE_INLINE void tFreeClientKv(void *pKv) { + SKv *kv = (SKv *)pKv; + if (kv) { + tfree(kv->value); + } +} + +static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { + SClientHbRsp* rsp = (SClientHbRsp*)pRsp; + if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); +} + + +static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { + SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; + taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); +} + + int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->key); tlen += taosEncodeFixedI32(buf, pKv->valueLen); - tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); return tlen; } static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { - buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->key); buf = taosDecodeFixedI32(buf, &pKv->valueLen); - buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); return buf; } diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 70cff7ed1a..0b465b7b4e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -59,6 +59,7 @@ typedef struct SSTableMetaVersion { } SSTableMetaVersion; typedef struct SDbVgVersion { + char dbName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; } SDbVgVersion; @@ -98,6 +99,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo); + /** * Get a table's meta data. * @param pCatalog (input, got with catalogGetHandle) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 6cd33b1067..c7ca2d9e1d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -82,7 +82,7 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { SRWLatch lock; - int64_t dbId; + uint64_t dbId; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo @@ -94,7 +94,7 @@ typedef struct SUseDbOutput { } SUseDbOutput; enum { - META_TYPE_NON_TABLE = 1, + META_TYPE_NULL_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, META_TYPE_BOTH_TABLE @@ -163,7 +163,7 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSi extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); -#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE +#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 56fe0421a3..77249e3f0b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -35,9 +35,13 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; -typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef struct SHbConnInfo { + void *param; + SClientHbReq *req; +} SHbConnInfo; typedef struct SAppHbMgr { + char *key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -49,9 +53,15 @@ typedef struct SAppHbMgr { SAppInstInfo* pAppInstInfo; // info SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash + SHashObj* connInfo; // hash } SAppHbMgr; + +typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp); + +typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); + + typedef struct SClientHbMgr { int8_t inited; // ctl @@ -59,12 +69,10 @@ typedef struct SClientHbMgr { pthread_t thread; pthread_mutex_t lock; // used when app init and cleanup SArray* appHbMgrs; // SArray one for each cluster - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; + FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; + FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; } SClientHbMgr; -// TODO: embed param into function -// return type: SArray -typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef struct SQueryExecMetric { int64_t start; // start timestamp @@ -218,11 +226,11 @@ void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo); +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key); void appHbMgrCleanup(SAppHbMgr* pAppHbMgr); // conn level -int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func); +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 28faa76785..cb73701bfa 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -127,6 +127,8 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) { void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; + SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType}; + hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); pthread_mutex_destroy(&pTscObj->mutex); @@ -517,4 +519,4 @@ setConfRet taos_set_config(const char *config){ pthread_mutex_unlock(&setConfMutex); return ret; } -#endif \ No newline at end of file +#endif diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0f4ff6f725..f6c473333c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -15,33 +15,237 @@ #include "clientInt.h" #include "trpc.h" +#include "catalog.h" +#include "clientLog.h" static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) { +static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { return 0; } -static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { - if (code != 0) { - return -1; +static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t msgLen = 0; + int32_t code = 0; + + while (msgLen < valueLen) { + SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen); + + rsp->vgVersion = ntohl(rsp->vgVersion); + rsp->vgNum = ntohl(rsp->vgNum); + rsp->uid = be64toh(rsp->uid); + + tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); + + if (rsp->vgVersion < 0) { + SDbVgVersion dbInfo; + strcpy(dbInfo.dbName, rsp->db); + dbInfo.dbId = rsp->uid; + dbInfo.vgVersion = rsp->vgVersion; + + code = catalogRemoveDBVgroup(pCatalog, &dbInfo); + } else { + SDBVgroupInfo vgInfo = {0}; + vgInfo.dbId = rsp->uid; + vgInfo.vgVersion = rsp->vgVersion; + vgInfo.hashMethod = rsp->hashMethod; + vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo.vgInfo) { + tscError("hash init[%d] failed", rsp->vgNum); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < rsp->vgNum; ++i) { + rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); + rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); + rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); + + for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) { + rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port); + } + + if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { + tscError("hash push failed, errno:%d", errno); + taosHashCleanup(vgInfo.vgInfo); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); + if (code) { + taosHashCleanup(vgInfo.vgInfo); + } + } + + if (code) { + return code; + } + + msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo); } - SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData; - return hbMqHbRspHandle(pRsp); + + return TSDB_CODE_SUCCESS; } -void hbMgrInitMqHbRspHandle() { - clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; +static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { + SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); + if (NULL == info) { + tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); + return TSDB_CODE_SUCCESS; + } + + int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; + + tscDebug("hb got %d rsp kv", kvNum); + + for (int32_t i = 0; i < kvNum; ++i) { + SKv *kv = taosArrayGet(pRsp->info, i); + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + int64_t *clusterId = (int64_t *)info->param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + break; + } + + hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); + break; + } + case HEARTBEAT_KEY_STBINFO: + + break; + default: + tscError("invalid hb key type:%d", kv->key); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { + static int32_t emptyRspNum = 0; + if (code != 0) { + tfree(param); + return -1; + } + char *key = (char *)param; + SClientHbBatchRsp pRsp = {0}; + tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); + + int32_t rspNum = taosArrayGetSize(pRsp.rsps); + + SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + if (pInst == NULL || NULL == *pInst) { + tscError("cluster not exist, key:%s", key); + tfree(param); + tFreeClientHbBatchRsp(&pRsp); + return -1; + } + + tfree(param); + + if (rspNum) { + tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + } else { + atomic_add_fetch_32(&emptyRspNum, 1); + } + + for (int32_t i = 0; i < rspNum; ++i) { + SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); + code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); + if (code) { + break; + } + } + + tFreeClientHbBatchRsp(&pRsp); + + return code; +} + +int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { + SDbVgVersion *dbs = NULL; + uint32_t dbNum = 0; + int32_t code = 0; + + code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (dbNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < dbNum; ++i) { + SDbVgVersion *db = &dbs[i]; + db->dbId = htobe64(db->dbId); + db->vgVersion = htonl(db->vgVersion); + } + + SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + + tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); + + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + return TSDB_CODE_SUCCESS; +} + +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { + int64_t *clusterId = (int64_t *)param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + return code; + } + + code = hbGetExpiredDBInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + + return TSDB_CODE_SUCCESS; +} + +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { + +} + +void hbMgrInitMqHbHandle() { + clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; + clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; } static FORCE_INLINE void hbMgrInitHandle() { // init all handle - hbMgrInitMqHbRspHandle(); + hbMgrInitMqHbHandle(); } +void hbFreeReq(void *req) { + SClientHbReq *pReq = (SClientHbReq *)req; + tFreeReqKvHash(pReq->info); +} + + + SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { @@ -51,38 +255,58 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + int32_t code = 0; void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { SClientHbReq* pOneReq = pIter; + + SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); + if (info) { + code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); + if (code) { + taosHashCancelIterate(pAppHbMgr->activeInfo, pIter); + break; + } + } + taosArrayPush(pBatchReq->reqs, pOneReq); - taosHashClear(pOneReq->info); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } -#if 0 - pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); - while (pIter != NULL) { - FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; - SClientHbKey connKey; - taosHashCopyKey(pIter, &connKey); - SArray* pArray = getConnInfoFp(connKey, NULL); - - pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); + if (code) { + taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); + tfree(pBatchReq); } -#endif return pBatchReq; } + +void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq* pOneReq = pIter; + + tFreeReqKvHash(pOneReq->info); + taosHashClear(pOneReq->info); + + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + } +} + + + static void* hbThreadFunc(void* param) { setThreadName("hb"); while (1) { - int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop); - if(threadStop) { + int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); + if(1 == threadStop) { break; } + pthread_mutex_lock(&clientHbMgr.lock); + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for(int i = 0; i < sz; i++) { SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); @@ -98,7 +322,9 @@ static void* hbThreadFunc(void* param) { int tlen = tSerializeSClientHbBatchReq(NULL, pReq); void *buf = malloc(tlen); if (buf == NULL) { - //TODO: error handling + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); break; } void *abuf = buf; @@ -107,6 +333,7 @@ static void* hbThreadFunc(void* param) { if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); free(buf); break; } @@ -114,7 +341,7 @@ static void* hbThreadFunc(void* param) { pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; - pInfo->param = NULL; + pInfo->param = strdup(pAppHbMgr->key); pInfo->requestId = generateRequestId(); pInfo->requestObjRefId = 0; @@ -122,10 +349,14 @@ static void* hbThreadFunc(void* param) { int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq, false); + tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } + + pthread_mutex_unlock(&clientHbMgr.lock); + taosMsleep(HEARTBEAT_INTERVAL); } return NULL; @@ -145,10 +376,19 @@ static int32_t hbCreateThread() { } static void hbStopThread() { - atomic_store_8(&clientHbMgr.threadStop, 1); + if (atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 0, 1)) { + tscDebug("hb thread already stopped"); + return; + } + + while (2 != atomic_load_8(&clientHbMgr.threadStop)) { + usleep(10); + } + + tscDebug("hb thread stopped"); } -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { hbMgrInit(); SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -160,6 +400,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { pAppHbMgr->connKeyCnt = 0; pAppHbMgr->reportCnt = 0; pAppHbMgr->reportBytes = 0; + pAppHbMgr->key = strdup(key); // init app info pAppHbMgr->pAppInstInfo = pAppInstInfo; @@ -174,19 +415,26 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { } pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; // init getInfoFunc - pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - if (pAppHbMgr->getInfoFuncs == NULL) { + if (pAppHbMgr->connInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; free(pAppHbMgr); return NULL; } + pthread_mutex_lock(&clientHbMgr.lock); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); + pthread_mutex_unlock(&clientHbMgr.lock); + return pAppHbMgr; } void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { + if (NULL == pAppHbMgr) { + return; + } + pthread_mutex_lock(&clientHbMgr.lock); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); @@ -194,7 +442,9 @@ void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == pTarget) { taosHashCleanup(pTarget->activeInfo); - taosHashCleanup(pTarget->getInfoFuncs); + pTarget->activeInfo = NULL; + taosHashCleanup(pTarget->connInfo); + pTarget->connInfo = NULL; } } @@ -219,31 +469,20 @@ int hbMgrInit() { } void hbMgrCleanUp() { + hbStopThread(); + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; - taosArrayDestroy(clientHbMgr.appHbMgrs); + pthread_mutex_lock(&clientHbMgr.lock); + taosArrayDestroy(clientHbMgr.appHbMgrs); + pthread_mutex_unlock(&clientHbMgr.lock); + + clientHbMgr.appHbMgrs = NULL; } -int hbHandleRsp(SClientHbBatchRsp* hbRsp) { - int64_t reqId = hbRsp->reqId; - int64_t rspId = hbRsp->rspId; - - SArray* rsps = hbRsp->rsps; - int32_t sz = taosArrayGetSize(rsps); - for (int i = 0; i < sz; i++) { - SClientHbRsp* pRsp = taosArrayGet(rsps, i); - if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) { - clientHbMgr.handle[pRsp->connKey.hbType](pRsp); - } else { - // discard rsp - } - } - return 0; -} - -int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) { +int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { @@ -252,20 +491,53 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + // init hash - if (func != NULL) { - taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + if (info != NULL) { + SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + info->req = pReq; + taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); } atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { + SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; + SHbConnInfo info = {0}; + + switch (hbType) { + case HEARTBEAT_TYPE_QUERY: { + int64_t *pClusterId = malloc(sizeof(int64_t)); + *pClusterId = clusterId; + + info.param = pClusterId; + break; + } + case HEARTBEAT_TYPE_MQ: { + break; + } + default: + break; + } + + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); +} + void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { - taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); + int32_t code = 0; + code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + if (code) { + return; + } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); + if (atomic_load_32(&pAppHbMgr->connKeyCnt) <= 0) { + appHbMgrCleanup(pAppHbMgr); + } } int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 321d9a9a98..d7f0afa785 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - /*p->pAppHbMgr = appHbMgrInit(p);*/ + p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; @@ -472,13 +472,8 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { return NULL; } SKv kv = {0}; - kv.key = malloc(256); - if (kv.key == NULL) { - taosArrayDestroy(pArray); - return NULL; - } - strcpy(kv.key, "mq-tmp"); - kv.keyLen = strlen("mq-tmp") + 1; + kv.key = HEARTBEAT_KEY_MQ_TMP; + SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); if (pMqHb == NULL) { return pArray; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 61b704355b..bdf3115981 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -46,6 +46,8 @@ void taos_cleanup(void) { clientConnRefPool = -1; taosCloseRef(id); + hbMgrCleanUp(); + rpcCleanup(); catalogDestroy(); taosCloseLog(); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 70b1fd91ef..24d1c0f03c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -18,6 +18,7 @@ #include "tname.h" #include "clientInt.h" #include "clientLog.h" +#include "catalog.h" int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); @@ -73,8 +74,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - /*SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};*/ - /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ + pTscObj->connType = HEARTBEAT_TYPE_QUERY; + + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, @@ -290,7 +292,6 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { - // todo: Remove cache in catalog cache. SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); @@ -298,6 +299,18 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } + SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData; + + SDbVgVersion dbVer = {0}; + struct SCatalog *pCatalog = NULL; + + strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName)); + dbVer.dbId = be64toh(rsp->uid); + + catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + + catalogRemoveDBVgroup(pCatalog, &dbVer); + tsem_post(&pRequest->body.rspSem); return code; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 48e9dce3c1..34b55fd812 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -91,13 +91,11 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int32_t kvNum = taosHashGetSize(pReq->info); tlen += taosEncodeFixedI32(buf, kvNum); - SKv kv; + SKv *kv; void* pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { - taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); - kv.valueLen = taosHashGetDataLen(pIter); - kv.value = pIter; - tlen += taosEncodeSKv(buf, &kv); + kv = pIter; + tlen += taosEncodeSKv(buf, kv); pIter = taosHashIterate(pReq->info, pIter); } @@ -116,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); + taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); } return buf; @@ -124,17 +122,28 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) { int tlen = 0; + int32_t kvNum = taosArrayGetSize(pRsp->info); tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey); tlen += taosEncodeFixedI32(buf, pRsp->status); - tlen += taosEncodeFixedI32(buf, pRsp->bodyLen); - tlen += taosEncodeBinary(buf, pRsp->body, pRsp->bodyLen); + tlen += taosEncodeFixedI32(buf, kvNum); + for (int i = 0; i < kvNum; i++) { + SKv *kv = (SKv *)taosArrayGet(pRsp->info, i); + tlen += taosEncodeSKv(buf, kv); + } return tlen; } void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) { + int32_t kvNum = 0; buf = taosDecodeSClientHbKey(buf, &pRsp->connKey); buf = taosDecodeFixedI32(buf, &pRsp->status); - buf = taosDecodeFixedI32(buf, &pRsp->bodyLen); - buf = taosDecodeBinary(buf, &pRsp->body, pRsp->bodyLen); + buf = taosDecodeFixedI32(buf, &kvNum); + pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); + for (int i = 0; i < kvNum; i++) { + SKv kv = {0}; + buf = taosDecodeSKv(buf, &kv); + taosArrayPush(pRsp->info, &kv); + } + return buf; } @@ -155,6 +164,7 @@ void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) { if (pBatchReq->reqs == NULL) { pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq)); } + int32_t reqNum; buf = taosDecodeFixedI32(buf, &reqNum); for (int i = 0; i < reqNum; i++) { diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 91f502be7d..5a3e6ed26e 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -26,6 +26,7 @@ int32_t mndInitDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode); SDbObj *mndAcquireDb(SMnode *pMnode, char *db); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); +int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 4bebdb8a5b..5e6f457a67 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -813,6 +813,44 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) { + int32_t vindex = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (vindex < pDb->cfg.numOfVgroups) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + SVgroupInfo *pInfo = &vgList[vindex]; + pInfo->vgId = htonl(pVgroup->vgId); + pInfo->hashBegin = htonl(pVgroup->hashBegin); + pInfo->hashEnd = htonl(pVgroup->hashEnd); + pInfo->epset.numOfEps = pVgroup->replica; + for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; + SEp * pEp = &pInfo->epset.eps[gid]; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode != NULL) { + memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEp->port = htons(pDnode->port); + } + mndReleaseDnode(pMnode, pDnode); + if (pVgid->role == TAOS_SYNC_STATE_LEADER) { + pInfo->epset.inUse = gid; + } + } + vindex++; + } + + sdbRelease(pSdb, pVgroup); + } + + *vgNum = vindex; +} + static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -834,45 +872,16 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { return -1; } - int32_t vindex = 0; + int32_t vgNum = 0; if (pUse->vgVersion < pDb->vgVersion) { - void *pIter = NULL; - while (vindex < pDb->cfg.numOfVgroups) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (pVgroup->dbUid == pDb->uid) { - SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex]; - pInfo->vgId = htonl(pVgroup->vgId); - pInfo->hashBegin = htonl(pVgroup->hashBegin); - pInfo->hashEnd = htonl(pVgroup->hashEnd); - pInfo->epset.numOfEps = pVgroup->replica; - for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEp * pEp = &pInfo->epset.eps[gid]; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode != NULL) { - memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - pEp->port = htons(pDnode->port); - } - mndReleaseDnode(pMnode, pDnode); - if (pVgid->role == TAOS_SYNC_STATE_LEADER) { - pInfo->epset.inUse = gid; - } - } - vindex++; - } - - sdbRelease(pSdb, pVgroup); - } + mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); } memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); pRsp->uid = htobe64(pDb->uid); pRsp->vgVersion = htonl(pDb->vgVersion); - pRsp->vgNum = htonl(vindex); + pRsp->vgNum = htonl(vgNum); pRsp->hashMethod = pDb->hashMethod; pReq->pCont = pRsp; @@ -882,6 +891,77 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { return 0; } +int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) { + SSdb *pSdb = pMnode->pSdb; + int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo)); + void *buf = malloc(bufSize); + int32_t len = 0; + int32_t contLen = 0; + int32_t bufOffset = 0; + SUseDbRsp *pRsp = NULL; + + for (int32_t i = 0; i < num; ++i) { + SDbVgVersion *db = &dbs[i]; + db->dbId = be64toh(db->dbId); + db->vgVersion = ntohl(db->vgVersion); + + len = 0; + + SDbObj *pDb = mndAcquireDb(pMnode, db->dbName); + if (pDb == NULL) { + mInfo("db %s not exist", db->dbName); + + len = sizeof(SUseDbRsp); + } else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) { + len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); + } + + if (0 == len) { + mndReleaseDb(pMnode, pDb); + + continue; + } + + contLen += len; + + if (contLen > bufSize) { + buf = realloc(buf, contLen); + } + + pRsp = (SUseDbRsp *)((char *)buf + bufOffset); + memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN); + if (pDb) { + int32_t vgNum = 0; + mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); + + pRsp->uid = htobe64(pDb->uid); + pRsp->vgVersion = htonl(pDb->vgVersion); + pRsp->vgNum = htonl(vgNum); + pRsp->hashMethod = pDb->hashMethod; + } else { + pRsp->uid = htobe64(db->dbId); + pRsp->vgNum = htonl(0); + pRsp->hashMethod = 0; + pRsp->vgVersion = htonl(-1); + } + + bufOffset += len; + + mndReleaseDb(pMnode, pDb); + } + + if (contLen > 0) { + *rsp = buf; + *rspLen = contLen; + } else { + *rsp = NULL; + tfree(buf); + *rspLen = 0; + } + + return 0; +} + static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SSyncDbReq *pSync = pReq->rpcMsg.pCont; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 3e2b931413..d379d294ab 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -354,7 +354,41 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { for (int i = 0; i < sz; i++) { SClientHbReq* pHbReq = taosArrayGet(pArray, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { + int32_t kvNum = taosHashGetSize(pHbReq->info); + if (NULL == pHbReq->info || kvNum <= 0) { + continue; + } + SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))}; + + void *pIter = taosHashIterate(pHbReq->info, NULL); + while (pIter != NULL) { + SKv* kv = pIter; + + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv); + } + break; + } + case HEARTBEAT_KEY_STBINFO: + + break; + default: + mError("invalid kv key:%d", kv->key); + hbRsp.status = TSDB_CODE_MND_APP_ERROR; + break; + } + + pIter = taosHashIterate(pHbReq->info, pIter); + } + + taosArrayPush(batchRsp.rsps, &hbRsp); } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { @@ -369,6 +403,18 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { void* buf = rpcMallocCont(tlen); void* abuf = buf; tSerializeSClientHbBatchRsp(&abuf, &batchRsp); + + int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); + for (int32_t i = 0; i < rspNum; ++i) { + SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i); + int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0; + for (int32_t n = 0; n < kvNum; ++n) { + SKv *kv = taosArrayGet(rsp->info, n); + tfree(kv->value); + } + taosArrayDestroy(rsp->info); + } + taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index b245ab3c30..ccf13d5d66 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -102,11 +102,10 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); SKv kv; - kv.key = (void*)"abc"; - kv.keyLen = 4; + kv.key = 123; kv.value = (void*)"bcd"; kv.valueLen = 4; - taosHashPut(req.info, kv.key, kv.keyLen, kv.value, kv.valueLen); + taosHashPut(req.info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosArrayPush(batchReq.reqs, &req); int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9f1ea754c2..7128e143bf 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,7 +31,7 @@ extern "C" { #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 -#define CTG_RENT_SLOT_SECOND 2 +#define CTG_RENT_SLOT_SECOND 1.5 #define CTG_DEFAULT_INVALID_VERSION (-1) @@ -115,7 +115,7 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); -#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE) +#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) #define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE) #define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index dc05627374..1b4d711aad 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -241,7 +241,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - SET_META_TYPE_NONE(output->metaType); + SET_META_TYPE_NULL(output->metaType); ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } @@ -299,7 +299,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - SET_META_TYPE_NONE(output->metaType); + SET_META_TYPE_NULL(output->metaType); ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } @@ -427,9 +427,9 @@ int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { } int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { - if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) { return -1; - } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) { return 1; } else { return 0; @@ -494,14 +494,14 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (slot->needSort) { taosArraySort(slot->meta, compare); slot->needSort = false; - qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type); + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); } void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); @@ -526,6 +526,42 @@ _return: CTG_RET(code); } +int32_t ctgMetaRentRemove(SMetaRentMgmt *mgmt, int64_t id, __compar_fn_t compare) { + int16_t widx = abs(id % mgmt->slotNum); + + SRentSlotInfo *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (slot->needSort) { + taosArraySort(slot->meta, compare); + slot->needSort = false; + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); + } + + int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ); + if (idx < 0) { + qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + taosArrayRemove(slot->meta, idx); + + qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + + CTG_RET(code); +} + + int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); if (ridx >= mgmt->slotNum) { @@ -600,7 +636,7 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { int32_t code = 0; - if (NULL == output->tbMeta) { + if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -721,7 +757,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm } -int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (oldInfo) { CTG_LOCK(CTG_WRITE, &oldInfo->lock); @@ -747,6 +783,47 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD return TSDB_CODE_SUCCESS; } +int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target, bool *removed) { + *removed = false; + + SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); + if (NULL == info) { + ctgInfo("db not exist in dbCache, may be removed, db:%s", target->dbName); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_WRITE, &info->lock); + + if (info->dbId != target->dbId) { + ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + return TSDB_CODE_SUCCESS; + } + + if (info->vgInfo) { + ctgInfo("cleanup db vgInfo, db:%s", target->dbName); + taosHashCleanup(info->vgInfo); + info->vgInfo = NULL; + } + + if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { + ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + CTG_UNLOCK(CTG_WRITE, &info->lock); + + taosHashRelease(pCatalog->dbCache.cache, info); + + *removed = true; + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -767,7 +844,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); - if (CTG_IS_META_NONE(moutput.metaType)) { + if (CTG_IS_META_NULL(moutput.metaType)) { CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); } else { output = &moutput; @@ -783,6 +860,8 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); + voutput.metaType = moutput.metaType; + tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; @@ -792,20 +871,22 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con if (0 == exist) { CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); - if (CTG_IS_META_NONE(moutput.metaType)) { - SET_META_TYPE_NONE(voutput.metaType); + if (CTG_IS_META_NULL(moutput.metaType)) { + SET_META_TYPE_NULL(voutput.metaType); } tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; } else { + tfree(voutput.tbMeta); + SET_META_TYPE_CTABLE(voutput.metaType); } } } - if (CTG_IS_META_NONE(output->metaType)) { + if (CTG_IS_META_NULL(output->metaType)) { ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -1118,19 +1199,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (dbInfo->vgVersion < 0) { - ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion); - - if (pCatalog->dbCache.cache) { - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); - - CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); - } - - ctgWarn("db removed from cache, db:%s", dbName); - goto _return; - } - if (NULL == pCatalog->dbCache.cache) { SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == cache) { @@ -1142,10 +1210,12 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB taosHashCleanup(cache); } } else { - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); + CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo)); } bool newAdded = false; + + dbInfo->lock = 0; if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); @@ -1154,6 +1224,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB dbInfo->vgInfo = NULL; SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; + strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName)); + if (newAdded) { CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); } else { @@ -1173,6 +1245,34 @@ _return: CTG_RET(code); } + +int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { + int32_t code = 0; + bool removed = false; + + if (NULL == pCatalog || NULL == dbInfo) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (NULL == pCatalog->dbCache.cache) { + return TSDB_CODE_SUCCESS; + } + + CTG_ERR_RET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo, &removed)); + if (!removed) { + return TSDB_CODE_SUCCESS; + } + + ctgInfo("db removed from cache, db:%s", dbInfo->dbName); + + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbInfo->dbId, ctgDbVgVersionCompare)); + + ctgDebug("db removed from rent, db:%s", dbInfo->dbName); + + CTG_RET(code); +} + + int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); } @@ -1275,6 +1375,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup)); _return: + if (dbInfo) { CTG_UNLOCK(CTG_READ, &dbInfo->lock); taosHashRelease(pCatalog->dbCache.cache, dbInfo); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index a15744a5c7..c6e5e359bf 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -59,7 +59,7 @@ int32_t ctgTestTagNum = 1; int32_t ctgTestSVersion = 1; int32_t ctgTestTVersion = 1; int32_t ctgTestSuid = 2; -int64_t ctgTestDbId = 33; +uint64_t ctgTestDbId = 33; uint64_t ctgTestClusterId = 0x1; char *ctgTestDbname = "1.db1"; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 97a0557748..3623f5947d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -186,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pQueryEnd = pDispatcher->queryEnd; } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 56919ff99e..489fff5d64 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -410,7 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") // catalog -TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")