feature/qnode
This commit is contained in:
parent
2e82b55ad4
commit
953ef456ef
|
@ -174,6 +174,7 @@ typedef enum _mgmt_table {
|
||||||
|
|
||||||
typedef struct SBuildTableMetaInput {
|
typedef struct SBuildTableMetaInput {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
char* dbName;
|
||||||
char* tableFullName;
|
char* tableFullName;
|
||||||
} SBuildTableMetaInput;
|
} SBuildTableMetaInput;
|
||||||
|
|
||||||
|
@ -776,6 +777,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
|
char dbFname[TSDB_DB_FNAME_LEN];
|
||||||
char tableFname[TSDB_TABLE_FNAME_LEN];
|
char tableFname[TSDB_TABLE_FNAME_LEN];
|
||||||
} STableInfoMsg;
|
} STableInfoMsg;
|
||||||
|
|
||||||
|
|
|
@ -161,7 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
tNameExtractFullName(pTableName, tbFullName);
|
tNameExtractFullName(pTableName, tbFullName);
|
||||||
|
|
||||||
SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
|
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
SEpSet *pVnodeEpSet = NULL;
|
SEpSet *pVnodeEpSet = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
@ -194,10 +194,10 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char dbFullName[TSDB_DB_FNAME_LEN];
|
||||||
tNameExtractFullName(pTableName, tbFullName);
|
tNameGetFullDbName(pTableName, dbFullName);
|
||||||
|
|
||||||
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
|
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname};
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
SEpSet *pVnodeEpSet = NULL;
|
SEpSet *pVnodeEpSet = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
@ -355,19 +355,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
||||||
|
|
||||||
if (output->metaNum != 1 && output->metaNum != 2) {
|
if (output->metaNum != 1 && output->metaNum != 2) {
|
||||||
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
|
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == output->tbMeta) {
|
if (NULL == output->tbMeta) {
|
||||||
ctgError("no valid table meta got from meta rsp");
|
ctgError("no valid table meta got from meta rsp");
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pCatalog->tableCache.cache) {
|
if (NULL == pCatalog->tableCache.cache) {
|
||||||
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pCatalog->tableCache.cache) {
|
if (NULL == pCatalog->tableCache.cache) {
|
||||||
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,19 +375,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
||||||
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pCatalog->tableCache.stableCache) {
|
if (NULL == pCatalog->tableCache.stableCache) {
|
||||||
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (output->metaNum == 2) {
|
if (output->metaNum == 2) {
|
||||||
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
|
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
|
||||||
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
|
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
|
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
|
||||||
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
|
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,26 +398,23 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
||||||
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
||||||
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
ctgError("push table[%s] to table cache failed", output->tbFname);
|
ctgError("push table[%s] to table cache failed", output->tbFname);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
|
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
|
||||||
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
|
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
|
||||||
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
|
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
|
||||||
} else {
|
} else {
|
||||||
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
||||||
ctgError("push table[%s] to table cache failed", output->tbFname);
|
ctgError("push table[%s] to table cache failed", output->tbFname);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
|
||||||
tfree(output->tbMeta);
|
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -557,6 +557,8 @@ void *ctgTestGetCtableMetaThread(void *param) {
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tfree(tbMeta);
|
||||||
|
|
||||||
if (ctgTestEnableSleep) {
|
if (ctgTestEnableSleep) {
|
||||||
usleep(rand()%5);
|
usleep(rand()%5);
|
||||||
}
|
}
|
||||||
|
@ -592,6 +594,8 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tfree(output.tbMeta);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -944,7 +948,6 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TEST(multiThread, getSetDbVgroupCase) {
|
TEST(multiThread, getSetDbVgroupCase) {
|
||||||
struct SCatalog* pCtg = NULL;
|
struct SCatalog* pCtg = NULL;
|
||||||
|
@ -996,6 +999,9 @@ TEST(multiThread, getSetDbVgroupCase) {
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
TEST(multiThread, ctableMeta) {
|
TEST(multiThread, ctableMeta) {
|
||||||
struct SCatalog* pCtg = NULL;
|
struct SCatalog* pCtg = NULL;
|
||||||
void *mockPointer = (void *)0x1;
|
void *mockPointer = (void *)0x1;
|
||||||
|
@ -1024,8 +1030,9 @@ TEST(multiThread, ctableMeta) {
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
|
|
||||||
pthread_t thread1, thread2;
|
pthread_t thread1, thread2;
|
||||||
pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg);
|
|
||||||
pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg);
|
pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg);
|
||||||
|
sleep(1);
|
||||||
|
pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (ctgTestDeadLoop) {
|
if (ctgTestDeadLoop) {
|
||||||
|
|
|
@ -42,6 +42,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
|
||||||
|
|
||||||
bMsg->header.vgId = htonl(bInput->vgId);
|
bMsg->header.vgId = htonl(bInput->vgId);
|
||||||
|
|
||||||
|
if (bInput->dbName) {
|
||||||
|
strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname));
|
||||||
|
bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
|
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
|
||||||
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
|
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue