enh: asynchronous catalog interface optimization
This commit is contained in:
parent
bdee43e3cc
commit
8fc86a31d0
|
@ -58,12 +58,17 @@ typedef struct SDbInfo {
|
||||||
int64_t dbId;
|
int64_t dbId;
|
||||||
} SDbInfo;
|
} SDbInfo;
|
||||||
|
|
||||||
|
typedef struct STablesReq {
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
SArray* pTables;
|
||||||
|
} STablesReq;
|
||||||
|
|
||||||
typedef struct SCatalogReq {
|
typedef struct SCatalogReq {
|
||||||
SArray* pDbVgroup; // element is db full name
|
SArray* pDbVgroup; // element is db full name
|
||||||
SArray* pDbCfg; // element is db full name
|
SArray* pDbCfg; // element is db full name
|
||||||
SArray* pDbInfo; // element is db full name
|
SArray* pDbInfo; // element is db full name
|
||||||
SArray* pTableMeta; // element is SNAME
|
SArray* pTableMeta; // element is STablesReq
|
||||||
SArray* pTableHash; // element is SNAME
|
SArray* pTableHash; // element is STablesReq
|
||||||
SArray* pUdf; // element is udf name
|
SArray* pUdf; // element is udf name
|
||||||
SArray* pIndex; // element is index name
|
SArray* pIndex; // element is index name
|
||||||
SArray* pUser; // element is SUserAuthInfo
|
SArray* pUser; // element is SUserAuthInfo
|
||||||
|
|
|
@ -38,6 +38,11 @@ typedef struct SMsgBuf {
|
||||||
char* buf;
|
char* buf;
|
||||||
} SMsgBuf;
|
} SMsgBuf;
|
||||||
|
|
||||||
|
typedef struct SParseTablesMetaReq {
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
SHashObj* pTables;
|
||||||
|
} SParseTablesMetaReq;
|
||||||
|
|
||||||
typedef struct SParseMetaCache {
|
typedef struct SParseMetaCache {
|
||||||
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
|
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
|
||||||
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
|
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
|
||||||
|
@ -94,7 +99,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
|
||||||
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
|
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
|
||||||
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
|
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
|
||||||
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
|
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
|
||||||
void destoryParseMetaCache(SParseMetaCache* pMetaCache);
|
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -474,6 +474,24 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildTableReqFromDb(SHashObj* pDbsHash, SArray** pDbs) {
|
||||||
|
if (NULL != pDbsHash) {
|
||||||
|
*pDbs = taosArrayInit(taosHashGetSize(pDbsHash), sizeof(STablesReq));
|
||||||
|
if (NULL == *pDbs) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
SParseTablesMetaReq* p = taosHashIterate(pDbsHash, NULL);
|
||||||
|
while (NULL != p) {
|
||||||
|
STablesReq req = {0};
|
||||||
|
strcpy(req.dbFName, p->dbFName);
|
||||||
|
buildTableReq(p->pTables, &req.pTables);
|
||||||
|
taosArrayPush(*pDbs, &req);
|
||||||
|
p = taosHashIterate(pDbsHash, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
|
static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
|
||||||
if (NULL != pUserAuthHash) {
|
if (NULL != pUserAuthHash) {
|
||||||
*pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo));
|
*pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo));
|
||||||
|
@ -513,12 +531,12 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||||
int32_t code = buildTableReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
int32_t code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
|
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildTableReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
|
code = buildTableReqFromDb(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
|
code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
|
||||||
|
@ -587,6 +605,24 @@ static int32_t putDbDataToCache(const SArray* pDbReq, const SArray* pDbData, SHa
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t putDbTableDataToCache(const SArray* pDbReq, const SArray* pTableData, SHashObj** pTable) {
|
||||||
|
int32_t ndbs = taosArrayGetSize(pDbReq);
|
||||||
|
int32_t tableNo = 0;
|
||||||
|
for (int32_t i = 0; i < ndbs; ++i) {
|
||||||
|
STablesReq* pReq = taosArrayGet(pDbReq, i);
|
||||||
|
int32_t ntables = taosArrayGetSize(pReq->pTables);
|
||||||
|
for (int32_t j = 0; j < ntables; ++j) {
|
||||||
|
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(taosArrayGet(pReq->pTables, j), fullName);
|
||||||
|
if (TSDB_CODE_SUCCESS != putMetaDataToHash(fullName, strlen(fullName), pTableData, tableNo, pTable)) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
++tableNo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj** pUserAuth) {
|
static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj** pUserAuth) {
|
||||||
int32_t nvgs = taosArrayGetSize(pUserAuthReq);
|
int32_t nvgs = taosArrayGetSize(pUserAuthReq);
|
||||||
for (int32_t i = 0; i < nvgs; ++i) {
|
for (int32_t i = 0; i < nvgs; ++i) {
|
||||||
|
@ -612,12 +648,12 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||||
int32_t code = putTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
|
int32_t code = putDbTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, &pMetaCache->pTableMeta);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
|
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, &pMetaCache->pDbVgroup);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = putTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup);
|
code = putDbTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, &pMetaCache->pTableVgroup);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, &pMetaCache->pDbCfg);
|
code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, &pMetaCache->pDbCfg);
|
||||||
|
@ -657,14 +693,38 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha
|
||||||
return reserveTableReqInCacheImpl(fullName, len, pTables);
|
return reserveTableReqInCacheImpl(fullName, len, pTables);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t reserveTableReqInDbCacheImpl(int32_t acctId, const char* pDb, const char* pTable, SHashObj* pDbs) {
|
||||||
|
SParseTablesMetaReq req = {0};
|
||||||
|
int32_t len = snprintf(req.dbFName, sizeof(req.dbFName), "%d.%s", acctId, pDb);
|
||||||
|
int32_t code = reserveTableReqInCache(acctId, pDb, pTable, &req.pTables);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = taosHashPut(pDbs, req.dbFName, len, &req, sizeof(SParseTablesMetaReq));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t reserveTableReqInDbCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pDbs) {
|
||||||
|
if (NULL == *pDbs) {
|
||||||
|
*pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
if (NULL == *pDbs) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
char fullName[TSDB_DB_FNAME_LEN];
|
||||||
|
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
|
||||||
|
SParseTablesMetaReq* pReq = taosHashGet(*pDbs, fullName, len);
|
||||||
|
if (NULL == pReq) {
|
||||||
|
return reserveTableReqInDbCacheImpl(acctId, pDb, pTable, *pDbs);
|
||||||
|
}
|
||||||
|
return reserveTableReqInCache(acctId, pDb, pTable, &pReq->pTables);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
||||||
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
|
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
|
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
|
||||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta);
|
||||||
tNameExtractFullName(pName, fullName);
|
|
||||||
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableMeta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
|
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
|
||||||
|
@ -711,13 +771,11 @@ int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
||||||
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
|
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
|
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
|
||||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableVgroup);
|
||||||
tNameExtractFullName(pName, fullName);
|
|
||||||
return reserveTableReqInCacheImpl(fullName, strlen(fullName), &pMetaCache->pTableVgroup);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
|
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
|
||||||
|
@ -919,10 +977,24 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void destoryParseMetaCache(SParseMetaCache* pMetaCache) {
|
void destoryParseTablesMetaReqHash(SHashObj* pHash) {
|
||||||
taosHashCleanup(pMetaCache->pTableMeta);
|
SParseTablesMetaReq* p = taosHashIterate(pHash, NULL);
|
||||||
|
while (NULL != p) {
|
||||||
|
taosHashCleanup(p->pTables);
|
||||||
|
p = taosHashIterate(pHash, p);
|
||||||
|
}
|
||||||
|
taosHashCleanup(pHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
|
||||||
|
if (request) {
|
||||||
|
destoryParseTablesMetaReqHash(pMetaCache->pTableMeta);
|
||||||
|
destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup);
|
||||||
|
} else {
|
||||||
|
taosHashCleanup(pMetaCache->pTableMeta);
|
||||||
|
taosHashCleanup(pMetaCache->pTableVgroup);
|
||||||
|
}
|
||||||
taosHashCleanup(pMetaCache->pDbVgroup);
|
taosHashCleanup(pMetaCache->pDbVgroup);
|
||||||
taosHashCleanup(pMetaCache->pTableVgroup);
|
|
||||||
taosHashCleanup(pMetaCache->pDbCfg);
|
taosHashCleanup(pMetaCache->pDbCfg);
|
||||||
taosHashCleanup(pMetaCache->pDbInfo);
|
taosHashCleanup(pMetaCache->pDbInfo);
|
||||||
taosHashCleanup(pMetaCache->pUserAuth);
|
taosHashCleanup(pMetaCache->pUserAuth);
|
||||||
|
|
|
@ -85,13 +85,13 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
|
||||||
if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
|
if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
|
||||||
taosMemoryFreeClear(pVal->datum.p);
|
taosMemoryFreeClear(pVal->datum.p);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pParam->is_null && 1 == *(pParam->is_null)) {
|
if (pParam->is_null && 1 == *(pParam->is_null)) {
|
||||||
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
|
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||||
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
|
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
|
||||||
pVal->node.resType.type = pParam->buffer_type;
|
pVal->node.resType.type = pParam->buffer_type;
|
||||||
pVal->node.resType.bytes = inputSize;
|
pVal->node.resType.bytes = inputSize;
|
||||||
|
@ -187,7 +187,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildCatalogReq(&metaCache, pCatalogReq);
|
code = buildCatalogReq(&metaCache, pCatalogReq);
|
||||||
}
|
}
|
||||||
destoryParseMetaCache(&metaCache);
|
destoryParseMetaCache(&metaCache, true);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -203,7 +203,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
|
||||||
code = analyseSemantic(pCxt, pQuery, &metaCache);
|
code = analyseSemantic(pCxt, pQuery, &metaCache);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destoryParseMetaCache(&metaCache);
|
destoryParseMetaCache(&metaCache, false);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -472,12 +472,16 @@ class MockCatalogServiceImpl {
|
||||||
|
|
||||||
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
|
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
|
||||||
if (NULL != pTableMetaReq) {
|
if (NULL != pTableMetaReq) {
|
||||||
int32_t ntables = taosArrayGetSize(pTableMetaReq);
|
int32_t ndbs = taosArrayGetSize(pTableMetaReq);
|
||||||
*pTableMetaData = taosArrayInit(ntables, sizeof(SMetaRes));
|
*pTableMetaData = taosArrayInit(ndbs, sizeof(SMetaRes));
|
||||||
for (int32_t i = 0; i < ntables; ++i) {
|
for (int32_t i = 0; i < ndbs; ++i) {
|
||||||
SMetaRes res = {0};
|
STablesReq* pReq = (STablesReq*)taosArrayGet(pTableMetaReq, i);
|
||||||
res.code = catalogGetTableMeta((const SName*)taosArrayGet(pTableMetaReq, i), (STableMeta**)&res.pRes);
|
int32_t ntables = taosArrayGetSize(pReq->pTables);
|
||||||
taosArrayPush(*pTableMetaData, &res);
|
for (int32_t j = 0; j < ntables; ++j) {
|
||||||
|
SMetaRes res = {0};
|
||||||
|
res.code = catalogGetTableMeta((const SName*)taosArrayGet(pReq->pTables, j), (STableMeta**)&res.pRes);
|
||||||
|
taosArrayPush(*pTableMetaData, &res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -485,13 +489,17 @@ class MockCatalogServiceImpl {
|
||||||
|
|
||||||
int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const {
|
int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const {
|
||||||
if (NULL != pTableVgroupReq) {
|
if (NULL != pTableVgroupReq) {
|
||||||
int32_t ntables = taosArrayGetSize(pTableVgroupReq);
|
int32_t ndbs = taosArrayGetSize(pTableVgroupReq);
|
||||||
*pTableVgroupData = taosArrayInit(ntables, sizeof(SMetaRes));
|
*pTableVgroupData = taosArrayInit(ndbs, sizeof(SMetaRes));
|
||||||
for (int32_t i = 0; i < ntables; ++i) {
|
for (int32_t i = 0; i < ndbs; ++i) {
|
||||||
SMetaRes res = {0};
|
STablesReq* pReq = (STablesReq*)taosArrayGet(pTableVgroupReq, i);
|
||||||
res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
int32_t ntables = taosArrayGetSize(pReq->pTables);
|
||||||
res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), (SVgroupInfo*)res.pRes);
|
for (int32_t j = 0; j < ntables; ++j) {
|
||||||
taosArrayPush(*pTableVgroupData, &res);
|
SMetaRes res = {0};
|
||||||
|
res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||||
|
res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pReq->pTables, j), (SVgroupInfo*)res.pRes);
|
||||||
|
taosArrayPush(*pTableVgroupData, &res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -677,12 +685,17 @@ int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SM
|
||||||
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
|
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MockCatalogService::destoryTablesReq(void* p) {
|
||||||
|
STablesReq* pRes = (STablesReq*)p;
|
||||||
|
taosArrayDestroy(pRes->pTables);
|
||||||
|
}
|
||||||
|
|
||||||
void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) {
|
void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) {
|
||||||
taosArrayDestroy(pReq->pDbVgroup);
|
taosArrayDestroy(pReq->pDbVgroup);
|
||||||
taosArrayDestroy(pReq->pDbCfg);
|
taosArrayDestroy(pReq->pDbCfg);
|
||||||
taosArrayDestroy(pReq->pDbInfo);
|
taosArrayDestroy(pReq->pDbInfo);
|
||||||
taosArrayDestroy(pReq->pTableMeta);
|
taosArrayDestroyEx(pReq->pTableMeta, destoryTablesReq);
|
||||||
taosArrayDestroy(pReq->pTableHash);
|
taosArrayDestroyEx(pReq->pTableHash, destoryTablesReq);
|
||||||
taosArrayDestroy(pReq->pUdf);
|
taosArrayDestroy(pReq->pUdf);
|
||||||
taosArrayDestroy(pReq->pIndex);
|
taosArrayDestroy(pReq->pIndex);
|
||||||
taosArrayDestroy(pReq->pUser);
|
taosArrayDestroy(pReq->pUser);
|
||||||
|
|
|
@ -50,6 +50,7 @@ struct MockTableMeta {
|
||||||
class MockCatalogServiceImpl;
|
class MockCatalogServiceImpl;
|
||||||
class MockCatalogService {
|
class MockCatalogService {
|
||||||
public:
|
public:
|
||||||
|
static void destoryTablesReq(void* p);
|
||||||
static void destoryCatalogReq(SCatalogReq* pReq);
|
static void destoryCatalogReq(SCatalogReq* pReq);
|
||||||
static void destoryMetaRes(void* p);
|
static void destoryMetaRes(void* p);
|
||||||
static void destoryMetaArrayRes(void* p);
|
static void destoryMetaArrayRes(void* p);
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include "mockCatalogService.h"
|
#include "mockCatalogService.h"
|
||||||
|
@ -20,6 +22,7 @@
|
||||||
#include "parInt.h"
|
#include "parInt.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
using namespace std::placeholders;
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
@ -63,7 +66,9 @@ class InsertTest : public Test {
|
||||||
|
|
||||||
int32_t runAsync() {
|
int32_t runAsync() {
|
||||||
cxt_.async = true;
|
cxt_.async = true;
|
||||||
unique_ptr<SParseMetaCache, void (*)(SParseMetaCache*)> metaCache(new SParseMetaCache(), _destoryParseMetaCache);
|
bool request = true;
|
||||||
|
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
|
||||||
|
new SParseMetaCache(), std::bind(_destoryParseMetaCache, _1, cref(request)));
|
||||||
code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
|
code_ = parseInsertSyntax(&cxt_, &res_, metaCache.get());
|
||||||
if (code_ != TSDB_CODE_SUCCESS) {
|
if (code_ != TSDB_CODE_SUCCESS) {
|
||||||
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||||
|
@ -81,6 +86,8 @@ class InsertTest : public Test {
|
||||||
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
||||||
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
|
g_mockCatalogService->catalogGetAllMeta(catalogReq.get(), metaData.get());
|
||||||
|
|
||||||
|
metaCache.reset(new SParseMetaCache());
|
||||||
|
request = false;
|
||||||
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
code_ = putMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
||||||
if (code_ != TSDB_CODE_SUCCESS) {
|
if (code_ != TSDB_CODE_SUCCESS) {
|
||||||
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
cout << "putMetaDataToCache code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;
|
||||||
|
@ -144,8 +151,8 @@ class InsertTest : public Test {
|
||||||
static const int max_err_len = 1024;
|
static const int max_err_len = 1024;
|
||||||
static const int max_sql_len = 1024 * 1024;
|
static const int max_sql_len = 1024 * 1024;
|
||||||
|
|
||||||
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache) {
|
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
|
||||||
destoryParseMetaCache(pMetaCache);
|
destoryParseMetaCache(pMetaCache, request);
|
||||||
delete pMetaCache;
|
delete pMetaCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "parInt.h"
|
#include "parInt.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
using namespace std::placeholders;
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
|
|
||||||
namespace ParserTest {
|
namespace ParserTest {
|
||||||
|
@ -118,8 +119,8 @@ class ParserTestBaseImpl {
|
||||||
TEST_INTERFACE_ASYNC_API
|
TEST_INTERFACE_ASYNC_API
|
||||||
};
|
};
|
||||||
|
|
||||||
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache) {
|
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
|
||||||
destoryParseMetaCache(pMetaCache);
|
destoryParseMetaCache(pMetaCache, request);
|
||||||
delete pMetaCache;
|
delete pMetaCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,7 +341,9 @@ class ParserTestBaseImpl {
|
||||||
doParse(&cxt, query.get());
|
doParse(&cxt, query.get());
|
||||||
SQuery* pQuery = *(query.get());
|
SQuery* pQuery = *(query.get());
|
||||||
|
|
||||||
unique_ptr<SParseMetaCache, void (*)(SParseMetaCache*)> metaCache(new SParseMetaCache(), _destoryParseMetaCache);
|
bool request = true;
|
||||||
|
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
|
||||||
|
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request)));
|
||||||
doCollectMetaKey(&cxt, pQuery, metaCache.get());
|
doCollectMetaKey(&cxt, pQuery, metaCache.get());
|
||||||
|
|
||||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||||
|
@ -353,6 +356,8 @@ class ParserTestBaseImpl {
|
||||||
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
||||||
doGetAllMeta(catalogReq.get(), metaData.get());
|
doGetAllMeta(catalogReq.get(), metaData.get());
|
||||||
|
|
||||||
|
metaCache.reset(new SParseMetaCache());
|
||||||
|
request = false;
|
||||||
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get());
|
||||||
|
|
||||||
doAuthenticate(&cxt, pQuery, metaCache.get());
|
doAuthenticate(&cxt, pQuery, metaCache.get());
|
||||||
|
|
Loading…
Reference in New Issue