From 953ef456ef6def74e884b6653110d8a6f1c07639 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 31 Dec 2021 14:26:05 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 2 ++ source/libs/catalog/src/catalog.c | 29 ++++++++++------------- source/libs/catalog/test/catalogTests.cpp | 11 +++++++-- source/libs/qcom/src/querymsg.c | 5 ++++ 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5cf027591f..483337f43b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -174,6 +174,7 @@ typedef enum _mgmt_table { typedef struct SBuildTableMetaInput { int32_t vgId; + char* dbName; char* tableFullName; } SBuildTableMetaInput; @@ -776,6 +777,7 @@ typedef struct { typedef struct { SMsgHead header; + char dbFname[TSDB_DB_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 68ff1b8557..236264873e 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -161,7 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); - SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName}; + SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -194,10 +194,10 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); + char dbFullName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFullName); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -355,19 +355,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp"); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -375,19 +375,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("push ctable[%s] to table cache failed", output->ctbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } } @@ -398,26 +398,23 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); } else { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } -_return: - tfree(output->tbMeta); - CTG_RET(code); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 5979d3a147..1d8a48dfcb 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -557,6 +557,8 @@ void *ctgTestGetCtableMetaThread(void *param) { assert(0); } + tfree(tbMeta); + if (ctgTestEnableSleep) { usleep(rand()%5); } @@ -592,6 +594,8 @@ void *ctgTestSetCtableMetaThread(void *param) { } } + tfree(output.tbMeta); + return NULL; } @@ -944,7 +948,6 @@ TEST(dbVgroup, getSetDbVgroupCase) { catalogDestroy(); } -#endif TEST(multiThread, getSetDbVgroupCase) { struct SCatalog* pCtg = NULL; @@ -996,6 +999,9 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } +#endif + + TEST(multiThread, ctableMeta) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; @@ -1024,8 +1030,9 @@ TEST(multiThread, ctableMeta) { pthread_attr_init(&thattr); pthread_t thread1, thread2; - pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg); + sleep(1); + pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); while (true) { if (ctgTestDeadLoop) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 117297b9ff..1e27749a1a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -42,6 +42,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 bMsg->header.vgId = htonl(bInput->vgId); + if (bInput->dbName) { + strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname)); + bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0; + } + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;