diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5b640dce92..f2f7ac5699 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -26,6 +26,7 @@ extern "C" { #include "tlog.h" #include "tmsg.h" #include "tmsgcb.h" +#include "systable.h" typedef enum { JOB_TASK_STATUS_NULL = 0, @@ -284,9 +285,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define REQUEST_TOTAL_EXEC_TIMES 2 -#define IS_SYS_DBNAME(_dbname) \ - (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || \ - ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB)))) +#define IS_INFORMATION_SCHEMA_DB(_name) ((*(_name) == 'i') && (0 == strcmp(_name, TSDB_INFORMATION_SCHEMA_DB))) +#define IS_PERFORMANCE_SCHEMA_DB(_name) ((*(_name) == 'p') && (0 == strcmp(_name, TSDB_PERFORMANCE_SCHEMA_DB))) + +#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname)) #define qFatal(...) \ do { \ diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index b01a871702..5c4bcf7946 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -49,6 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC return TSDB_CODE_SUCCESS; } +static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { + int32_t code = 0; + SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo)); + if (NULL == vgInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + vgInfo->vgVersion = rsp->vgVersion; + vgInfo->stateTs = rsp->stateTs; + vgInfo->hashMethod = rsp->hashMethod; + vgInfo->hashPrefix = rsp->hashPrefix; + vgInfo->hashSuffix = rsp->hashSuffix; + vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo->vgHash) { + taosMemoryFree(vgInfo); + tscError("hash init[%d] failed", rsp->vgNum); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; + } + + for (int32_t j = 0; j < rsp->vgNum; ++j) { + SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); + if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { + tscError("hash push failed, errno:%d", errno); + taosHashCleanup(vgInfo->vgHash); + taosMemoryFree(vgInfo); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; + } + } + +_return: + if (code) { + taosHashCleanup(vgInfo->vgHash); + taosMemoryFreeClear(vgInfo); + } + + *pInfo = vgInfo; + return code; +} + static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -67,37 +109,22 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { - SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo)); - if (NULL == vgInfo) { - code = TSDB_CODE_OUT_OF_MEMORY; + SDBVgInfo *vgInfo = NULL; + code = hbGenerateVgInfoFromRsp(&vgInfo, rsp); + if (TSDB_CODE_SUCCESS != code) { goto _return; } - vgInfo->vgVersion = rsp->vgVersion; - vgInfo->stateTs = rsp->stateTs; - vgInfo->hashMethod = rsp->hashMethod; - vgInfo->hashPrefix = rsp->hashPrefix; - vgInfo->hashSuffix = rsp->hashSuffix; - vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == vgInfo->vgHash) { - taosMemoryFree(vgInfo); - tscError("hash init[%d] failed", rsp->vgNum); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _return; - } - - for (int32_t j = 0; j < rsp->vgNum; ++j) { - SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); - if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { - tscError("hash push failed, errno:%d", errno); - taosHashCleanup(vgInfo->vgHash); - taosMemoryFree(vgInfo); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _return; - } - } - catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo); + + if (IS_SYS_DBNAME(rsp->db)) { + code = hbGenerateVgInfoFromRsp(&vgInfo, rsp); + if (TSDB_CODE_SUCCESS != code) { + goto _return; + } + + catalogUpdateDBVgInfo(pCatalog, (rsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->uid, vgInfo); + } } if (code) { @@ -492,6 +519,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl for (int32_t i = 0; i < dbNum; ++i) { SDbVgVersion *db = &dbs[i]; + tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64, + i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs); + db->dbId = htobe64(db->dbId); db->vgVersion = htonl(db->vgVersion); db->numOfTable = htonl(db->numOfTable); diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 0596241ce1..54bfab1ebc 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -195,6 +195,7 @@ class TDTestCase: tdSql.checkData(1, 1, 1) tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') + time.sleep(10) tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'") uid1 = tdSql.getData(0, 5) uid2 = tdSql.getData(1, 5)