Merge pull request #9475 from taosdata/feature/3.0_liaohj
[td-11818] merge 3.0
This commit is contained in:
commit
a85169a1c8
|
@ -21,13 +21,14 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "thash.h"
|
|
||||||
#include "tarray.h"
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "transport.h"
|
|
||||||
#include "common.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "tname.h"
|
||||||
|
#include "common.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "transport.h"
|
||||||
|
|
||||||
struct SCatalog;
|
struct SCatalog;
|
||||||
|
|
||||||
|
@ -68,35 +69,32 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pTransporter (input, rpc object)
|
* @param pTransporter (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pDBName (input, full db name)
|
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force renew a table's local cached meta data.
|
* Force renew a table's local cached meta data.
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pRpc (input, rpc object)
|
* @param pRpc (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pDBName (input, full db name)
|
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
|
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force renew a table's local cached meta data and get the new one.
|
* Force renew a table's local cached meta data and get the new one.
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pRpc (input, rpc object)
|
* @param pRpc (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pDBName (input, full db name)
|
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,24 +102,22 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pRpc (input, rpc object)
|
* @param pRpc (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pDBName (input, full db name)
|
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
|
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList);
|
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a table's vgroup from its name's hash value.
|
* Get a table's vgroup from its name's hash value.
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pTransporter (input, rpc object)
|
* @param pTransporter (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pDBName (input, full db name)
|
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @param vgInfo (output, vgroup info)
|
* @param vgInfo (output, vgroup info)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo);
|
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
JOB_TASK_STATUS_NULL = 0,
|
JOB_TASK_STATUS_NULL = 0,
|
||||||
|
@ -73,7 +74,6 @@ typedef struct STableMeta {
|
||||||
SSchema schema[];
|
SSchema schema[];
|
||||||
} STableMeta;
|
} STableMeta;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SDBVgroupInfo {
|
typedef struct SDBVgroupInfo {
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
|
|
|
@ -162,7 +162,7 @@ TEST(testCase, create_db_Test) {
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create database abc1");
|
pRes = taos_query(pConn, "create database abc1 vgroups 4");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
}
|
||||||
|
@ -399,49 +399,49 @@ TEST(testCase, drop_stable_Test) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(testCase, create_topic_Test) {
|
//TEST(testCase, create_topic_Test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
|
||||||
assert(pConn != NULL);
|
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "use abc1");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
}
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("error in create stable, reason:%s\n", taos_errstr(pRes));
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
|
||||||
ASSERT_TRUE(pFields == NULL);
|
|
||||||
|
|
||||||
int32_t numOfFields = taos_num_fields(pRes);
|
|
||||||
ASSERT_EQ(numOfFields, 0);
|
|
||||||
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
char* sql = "select * from st1";
|
|
||||||
tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
|
||||||
taos_close(pConn);
|
|
||||||
}
|
|
||||||
|
|
||||||
//TEST(testCase, show_table_Test) {
|
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
// assert(pConn != NULL);
|
||||||
//
|
//
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// }
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
//
|
//
|
||||||
// pRes = taos_query(pConn, "show tables");
|
// pRes = taos_query(pConn, "use abc1");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// }
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
//
|
//
|
||||||
|
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
|
// ASSERT_TRUE(pFields == NULL);
|
||||||
|
//
|
||||||
|
// int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
// ASSERT_EQ(numOfFields, 0);
|
||||||
|
//
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
|
// char* sql = "select * from st1";
|
||||||
|
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||||
// taos_close(pConn);
|
// taos_close(pConn);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
TEST(testCase, show_table_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "show tables");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
|
@ -13,10 +13,10 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "catalogInt.h"
|
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
#include "catalogInt.h"
|
||||||
|
|
||||||
SCatalogMgmt ctgMgmt = {0};
|
SCatalogMgmt ctgMgmt = {0};
|
||||||
|
|
||||||
|
@ -71,15 +71,14 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
|
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
|
||||||
if (NULL == pCatalog->tableCache.cache) {
|
if (NULL == pCatalog->tableCache.cache) {
|
||||||
*exist = 0;
|
*exist = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(pTableName, tbFullName);
|
||||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);
|
|
||||||
|
|
||||||
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
|
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
|
||||||
|
|
||||||
|
@ -135,14 +134,13 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, STableMetaOutput* output) {
|
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
|
||||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == output) {
|
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(pTableName, tbFullName);
|
||||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
|
|
||||||
|
|
||||||
SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
|
SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
|
@ -248,10 +246,13 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
|
||||||
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
|
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
|
||||||
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(pTableName, db);
|
||||||
|
|
||||||
if (vgNum <= 0) {
|
if (vgNum <= 0) {
|
||||||
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
|
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", db, vgNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,8 +262,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
|
||||||
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
|
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(pTableName, tbFullName);
|
||||||
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
|
|
||||||
|
|
||||||
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));
|
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));
|
||||||
|
|
||||||
|
@ -287,24 +287,24 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
|
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
|
||||||
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t exist = 0;
|
int32_t exist = 0;
|
||||||
|
|
||||||
if (!forceUpdate) {
|
if (!forceUpdate) {
|
||||||
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));
|
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
|
||||||
|
|
||||||
if (exist) {
|
if (exist) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName));
|
CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName));
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));
|
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
|
||||||
|
|
||||||
if (0 == exist) {
|
if (0 == exist) {
|
||||||
ctgError("get table meta from cache failed, but fetch succeed");
|
ctgError("get table meta from cache failed, but fetch succeed");
|
||||||
|
@ -532,25 +532,25 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||||
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
|
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) {
|
||||||
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
|
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupInfo vgroupInfo = {0};
|
SVgroupInfo vgroupInfo = {0};
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
|
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo));
|
||||||
|
|
||||||
STableMetaOutput output = {0};
|
STableMetaOutput output = {0};
|
||||||
|
|
||||||
//CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
|
//CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
|
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||||
|
|
||||||
|
@ -561,12 +561,12 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||||
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
|
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, true, pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) {
|
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
|
||||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
|
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,9 +575,11 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
|
||||||
SVgroupInfo vgroupInfo = {0};
|
SVgroupInfo vgroupInfo = {0};
|
||||||
SDBVgroupInfo dbVgroup = {0};
|
SDBVgroupInfo dbVgroup = {0};
|
||||||
|
|
||||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
|
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta));
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(pTableName, db);
|
||||||
|
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
|
||||||
|
|
||||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
||||||
|
@ -601,32 +603,30 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(tbMeta);
|
tfree(tbMeta);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
tfree(tbMeta);
|
tfree(tbMeta);
|
||||||
|
|
||||||
taosArrayDestroy(*pVgroupList);
|
taosArrayDestroy(*pVgroupList);
|
||||||
*pVgroupList = NULL;
|
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
|
||||||
SDBVgroupInfo dbInfo = {0};
|
SDBVgroupInfo dbInfo = {0};
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = 0;
|
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(pTableName, db);
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));
|
||||||
|
|
||||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
||||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", db, dbInfo.vgVersion, dbInfo.vgInfo);
|
||||||
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
|
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pTableName, pVgroup));
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -640,7 +640,6 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pReq->pTableName) {
|
if (pReq->pTableName) {
|
||||||
char dbName[TSDB_DB_FNAME_LEN];
|
|
||||||
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
||||||
if (tbNum <= 0) {
|
if (tbNum <= 0) {
|
||||||
ctgError("empty table name list");
|
ctgError("empty table name list");
|
||||||
|
@ -657,9 +656,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
|
||||||
SName *name = taosArrayGet(pReq->pTableName, i);
|
SName *name = taosArrayGet(pReq->pTableName, i);
|
||||||
STableMeta *pTableMeta = NULL;
|
STableMeta *pTableMeta = NULL;
|
||||||
|
|
||||||
snprintf(dbName, sizeof(dbName), "%d.%s", name->acctId, name->dbname);
|
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, name, &pTableMeta));
|
||||||
|
|
||||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));
|
|
||||||
|
|
||||||
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
|
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
|
||||||
ctgError("taosArrayPush failed, idx:%d", i);
|
ctgError("taosArrayPush failed, idx:%d", i);
|
||||||
|
|
|
@ -388,7 +388,11 @@ TEST(tableMeta, normalTable) {
|
||||||
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgInfo);
|
SName n = {.type = T_NAME_TABLE, .acctId = 1};
|
||||||
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestTablename);
|
||||||
|
|
||||||
|
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(vgInfo.vgId, 8);
|
ASSERT_EQ(vgInfo.vgId, 8);
|
||||||
ASSERT_EQ(vgInfo.numOfEps, 3);
|
ASSERT_EQ(vgInfo.numOfEps, 3);
|
||||||
|
@ -396,7 +400,7 @@ TEST(tableMeta, normalTable) {
|
||||||
ctgTestSetPrepareTableMeta();
|
ctgTestSetPrepareTableMeta();
|
||||||
|
|
||||||
STableMeta *tableMeta = NULL;
|
STableMeta *tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &tableMeta);
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 8);
|
ASSERT_EQ(tableMeta->vgId, 8);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE);
|
||||||
|
@ -408,7 +412,7 @@ TEST(tableMeta, normalTable) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
tableMeta = NULL;
|
tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &tableMeta);
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 8);
|
ASSERT_EQ(tableMeta->vgId, 8);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE);
|
||||||
|
@ -433,14 +437,15 @@ TEST(tableMeta, childTableCase) {
|
||||||
|
|
||||||
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||||
|
|
||||||
int32_t code = catalogInit(NULL);
|
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
SName n = {.type = T_NAME_TABLE, .acctId = 1};
|
||||||
ASSERT_EQ(code, 0);
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestCTablename);
|
||||||
|
|
||||||
STableMeta *tableMeta = NULL;
|
STableMeta *tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta);
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 9);
|
ASSERT_EQ(tableMeta->vgId, 9);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
||||||
|
@ -452,7 +457,7 @@ TEST(tableMeta, childTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
tableMeta = NULL;
|
tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta);
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 9);
|
ASSERT_EQ(tableMeta->vgId, 9);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
||||||
|
@ -464,7 +469,9 @@ TEST(tableMeta, childTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
tableMeta = NULL;
|
tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &tableMeta);
|
|
||||||
|
strcpy(n.tname, ctgTestSTablename);
|
||||||
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 0);
|
ASSERT_EQ(tableMeta->vgId, 0);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
|
@ -488,15 +495,15 @@ TEST(tableMeta, superTableCase) {
|
||||||
initQueryModuleMsgHandle();
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||||
|
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
int32_t code = catalogInit(NULL);
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
SName n = {.type = T_NAME_TABLE, .acctId = 1};
|
||||||
ASSERT_EQ(code, 0);
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestSTablename);
|
||||||
|
|
||||||
STableMeta *tableMeta = NULL;
|
STableMeta *tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &tableMeta);
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 0);
|
ASSERT_EQ(tableMeta->vgId, 0);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
|
@ -510,7 +517,10 @@ TEST(tableMeta, superTableCase) {
|
||||||
ctgTestSetPrepareCTableMeta();
|
ctgTestSetPrepareCTableMeta();
|
||||||
|
|
||||||
tableMeta = NULL;
|
tableMeta = NULL;
|
||||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta);
|
|
||||||
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestCTablename);
|
||||||
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 9);
|
ASSERT_EQ(tableMeta->vgId, 9);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
||||||
|
@ -522,7 +532,7 @@ TEST(tableMeta, superTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
tableMeta = NULL;
|
tableMeta = NULL;
|
||||||
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta);
|
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(tableMeta->vgId, 9);
|
ASSERT_EQ(tableMeta->vgId, 9);
|
||||||
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
||||||
|
@ -550,14 +560,14 @@ TEST(tableDistVgroup, normalTable) {
|
||||||
|
|
||||||
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||||
|
|
||||||
int32_t code = catalogInit(NULL);
|
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
SName n = {.type = T_NAME_TABLE, .acctId = 1};
|
||||||
ASSERT_EQ(code, 0);
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestTablename);
|
||||||
|
|
||||||
|
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
|
||||||
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgList);
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
||||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||||
|
@ -585,7 +595,11 @@ TEST(tableDistVgroup, childTableCase) {
|
||||||
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &vgList);
|
SName n = {.type = T_NAME_TABLE, .acctId = 1};
|
||||||
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestCTablename);
|
||||||
|
|
||||||
|
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
||||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||||
|
@ -607,14 +621,14 @@ TEST(tableDistVgroup, superTableCase) {
|
||||||
initQueryModuleMsgHandle();
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||||
|
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
int32_t code = catalogInit(NULL);
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
SName n = {.type = T_NAME_TABLE, .acctId = 1};
|
||||||
ASSERT_EQ(code, 0);
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestSTablename);
|
||||||
|
|
||||||
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &vgList);
|
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10);
|
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10);
|
||||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||||
|
|
|
@ -156,7 +156,7 @@ typedef struct SCreateDbInfo {
|
||||||
SToken dbname;
|
SToken dbname;
|
||||||
int32_t replica;
|
int32_t replica;
|
||||||
int32_t cacheBlockSize;
|
int32_t cacheBlockSize;
|
||||||
int32_t maxTablesPerVnode;
|
int32_t numOfVgroups;
|
||||||
int32_t numOfBlocks;
|
int32_t numOfBlocks;
|
||||||
int32_t daysPerFile;
|
int32_t daysPerFile;
|
||||||
int32_t minRowsPerBlock;
|
int32_t minRowsPerBlock;
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
#include "parserInt.h"
|
#include "parserInt.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
|
||||||
int32_t createSName(SName* pName, SToken* pTableName, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
|
||||||
|
|
||||||
SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
||||||
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
||||||
|
|
|
@ -81,6 +81,8 @@ int32_t KvRowAppend(const void *value, int32_t len, void *param);
|
||||||
typedef int32_t (*_row_append_fn_t)(const void *value, int32_t len, void *param);
|
typedef int32_t (*_row_append_fn_t)(const void *value, int32_t len, void *param);
|
||||||
int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf);
|
int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf);
|
||||||
|
|
||||||
|
int32_t createSName(SName* pName, SToken* pTableName, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -280,6 +280,7 @@ comp(Y) ::= COMP INTEGER(X). { Y = X; }
|
||||||
prec(Y) ::= PRECISION STRING(X). { Y = X; }
|
prec(Y) ::= PRECISION STRING(X). { Y = X; }
|
||||||
update(Y) ::= UPDATE INTEGER(X). { Y = X; }
|
update(Y) ::= UPDATE INTEGER(X). { Y = X; }
|
||||||
cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; }
|
cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; }
|
||||||
|
vgroups(Y) ::= VGROUPS INTEGER(X). { Y = X; }
|
||||||
//partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; }
|
//partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; }
|
||||||
|
|
||||||
%type db_optr {SCreateDbInfo}
|
%type db_optr {SCreateDbInfo}
|
||||||
|
@ -300,6 +301,7 @@ db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
|
||||||
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
|
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
|
||||||
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
|
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
|
||||||
db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
|
db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
|
||||||
|
db_optr(Y) ::= db_optr(Z) vgroups(X). { Y = Z; Y.numOfVgroups = strtol(X.z, NULL, 10); }
|
||||||
|
|
||||||
//%type topic_optr {SCreateDbInfo}
|
//%type topic_optr {SCreateDbInfo}
|
||||||
//
|
//
|
||||||
|
|
|
@ -13,11 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "astGenerator.h"
|
|
||||||
#include <parserInt.h>
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "parserInt.h"
|
||||||
#include "tmsgtype.h"
|
#include "tmsgtype.h"
|
||||||
|
#include "astGenerator.h"
|
||||||
|
|
||||||
SArray *tListItemAppend(SArray *pList, SVariant *pVar, uint8_t sortOrder) {
|
SArray *tListItemAppend(SArray *pList, SVariant *pVar, uint8_t sortOrder) {
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
@ -947,23 +948,19 @@ void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo) {
|
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo) {
|
||||||
pDBInfo->compressionLevel = -1;
|
pDBInfo->compressionLevel= -1;
|
||||||
|
|
||||||
pDBInfo->walLevel = -1;
|
pDBInfo->walLevel = -1;
|
||||||
pDBInfo->fsyncPeriod = -1;
|
pDBInfo->fsyncPeriod = -1;
|
||||||
pDBInfo->commitTime = -1;
|
pDBInfo->commitTime = -1;
|
||||||
pDBInfo->maxTablesPerVnode = -1;
|
pDBInfo->numOfVgroups = 2;
|
||||||
|
|
||||||
pDBInfo->cacheBlockSize = -1;
|
pDBInfo->cacheBlockSize = -1;
|
||||||
pDBInfo->numOfBlocks = -1;
|
pDBInfo->numOfBlocks = -1;
|
||||||
pDBInfo->maxRowsPerBlock = -1;
|
pDBInfo->maxRowsPerBlock = -1;
|
||||||
pDBInfo->minRowsPerBlock = -1;
|
pDBInfo->minRowsPerBlock = -1;
|
||||||
pDBInfo->daysPerFile = -1;
|
pDBInfo->daysPerFile = -1;
|
||||||
|
|
||||||
pDBInfo->replica = -1;
|
pDBInfo->replica = -1;
|
||||||
pDBInfo->quorum = -1;
|
pDBInfo->quorum = -1;
|
||||||
pDBInfo->keep = NULL;
|
pDBInfo->keep = NULL;
|
||||||
|
|
||||||
pDBInfo->update = -1;
|
pDBInfo->update = -1;
|
||||||
pDBInfo->cachelast = -1;
|
pDBInfo->cachelast = -1;
|
||||||
|
|
||||||
|
|
|
@ -188,13 +188,14 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
|
||||||
pMsg->minRows = htonl(pCreateDb->minRowsPerBlock);
|
pMsg->minRows = htonl(pCreateDb->minRowsPerBlock);
|
||||||
pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock);
|
pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock);
|
||||||
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
|
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
|
||||||
pMsg->compression = pCreateDb->compressionLevel;
|
pMsg->compression = (int8_t) pCreateDb->compressionLevel;
|
||||||
pMsg->walLevel = (char)pCreateDb->walLevel;
|
pMsg->walLevel = (char)pCreateDb->walLevel;
|
||||||
pMsg->replications = pCreateDb->replica;
|
pMsg->replications = pCreateDb->replica;
|
||||||
pMsg->quorum = pCreateDb->quorum;
|
pMsg->quorum = pCreateDb->quorum;
|
||||||
pMsg->ignoreExist = pCreateDb->ignoreExists;
|
pMsg->ignoreExist = pCreateDb->ignoreExists;
|
||||||
pMsg->update = pCreateDb->update;
|
pMsg->update = pCreateDb->update;
|
||||||
pMsg->cacheLastRow = pCreateDb->cachelast;
|
pMsg->cacheLastRow = pCreateDb->cachelast;
|
||||||
|
pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
|
int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
|
||||||
|
@ -208,8 +209,6 @@ int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbS
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo configurable
|
|
||||||
pCreateDbMsg->numOfVgroups = htonl(2);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,45 +232,6 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCt
|
||||||
return pCreateMsg;
|
return pCreateMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createSName(SName* pName, SToken* pTableName, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
|
|
||||||
const char* msg1 = "name too long";
|
|
||||||
const char* msg2 = "acctId too long";
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, false);
|
|
||||||
|
|
||||||
if (p != NULL) { // db has been specified in sql string so we ignore current db path
|
|
||||||
code = tNameSetAcctId(pName, pParseCtx->acctId);
|
|
||||||
if (code != 0) {
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
|
||||||
}
|
|
||||||
|
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
|
||||||
strncpy(name, pTableName->z, pTableName->n);
|
|
||||||
|
|
||||||
code = tNameFromString(pName, name, T_NAME_DB|T_NAME_TABLE);
|
|
||||||
if (code != 0) {
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
|
||||||
}
|
|
||||||
} else { // get current DB name first, and then set it into path
|
|
||||||
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
|
||||||
}
|
|
||||||
|
|
||||||
tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db));
|
|
||||||
|
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
|
||||||
strncpy(name, pTableName->z, pTableName->n);
|
|
||||||
|
|
||||||
code = tNameFromString(pName, name, T_NAME_TABLE);
|
|
||||||
if (code != 0) {
|
|
||||||
code = buildInvalidOperationMsg(pMsgBuf, msg1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
|
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
|
|
||||||
|
|
|
@ -157,6 +157,12 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg);
|
return buildInvalidOperationMsg(pMsgBuf, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val = htonl(pCreate->numOfVgroups);
|
||||||
|
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
|
||||||
|
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
|
||||||
|
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,16 +322,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* pStableName = tNameGetTableName(&name);
|
|
||||||
SArray* pValList = pCreateTableInfo->pTagVals;
|
SArray* pValList = pCreateTableInfo->pTagVals;
|
||||||
|
|
||||||
size_t numOfInputTag = taosArrayGetSize(pValList);
|
size_t numOfInputTag = taosArrayGetSize(pValList);
|
||||||
STableMeta* pSuperTableMeta = NULL;
|
STableMeta* pSuperTableMeta = NULL;
|
||||||
|
|
||||||
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
|
||||||
tNameGetFullDbName(&name, dbName);
|
|
||||||
|
|
||||||
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pStableName, &pSuperTableMeta);
|
|
||||||
assert(pSuperTableMeta != NULL);
|
assert(pSuperTableMeta != NULL);
|
||||||
|
|
||||||
// too long tag values will return invalid sql, not be truncated automatically
|
// too long tag values will return invalid sql, not be truncated automatically
|
||||||
|
@ -488,7 +490,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
||||||
*len = serLen;
|
*len = serLen;
|
||||||
|
|
||||||
SVgroupInfo info = {0};
|
SVgroupInfo info = {0};
|
||||||
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, req.name, &info);
|
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
|
||||||
|
|
||||||
pEpSet->inUse = info.inUse;
|
pEpSet->inUse = info.inUse;
|
||||||
pEpSet->numOfEps = info.numOfEps;
|
pEpSet->numOfEps = info.numOfEps;
|
||||||
|
@ -496,6 +498,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
||||||
pEpSet->port[i] = info.epAddr[i].port;
|
pEpSet->port[i] = info.epAddr[i].port;
|
||||||
tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
|
tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
|
((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
|
||||||
((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
|
((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,12 +154,17 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||||
char fullDbName[TSDB_DB_FNAME_LEN] = {0};
|
SName name = {0};
|
||||||
char tableName[TSDB_TABLE_NAME_LEN] = {0};
|
createSName(&name, pTname, &pCxt->pComCxt->ctx, &pCxt->msg);
|
||||||
CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
|
|
||||||
CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->ctx.pCatalog, pCxt->pComCxt->ctx.pTransporter, &pCxt->pComCxt->ctx.mgmtEpSet, fullDbName, tableName, &pCxt->pTableMeta));
|
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
tNameExtractFullName(&name, tableName);
|
||||||
|
|
||||||
|
SParseBasicCtx* pBasicCtx = &pCxt->pComCxt->ctx;
|
||||||
|
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
||||||
|
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
CHECK_CODE(catalogGetTableHashVgroup(pCxt->pComCxt->ctx.pCatalog, pCxt->pComCxt->ctx.pTransporter, &pCxt->pComCxt->ctx.mgmtEpSet, fullDbName, tableName, &vg));
|
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1947,3 +1947,38 @@ int32_t KvRowAppend(const void *value, int32_t len, void *param) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t createSName(SName* pName, SToken* pTableName, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
|
||||||
|
const char* msg1 = "name too long";
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, false);
|
||||||
|
|
||||||
|
if (p != NULL) { // db has been specified in sql string so we ignore current db path
|
||||||
|
tNameSetAcctId(pName, pParseCtx->acctId);
|
||||||
|
|
||||||
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
strncpy(name, pTableName->z, pTableName->n);
|
||||||
|
|
||||||
|
code = tNameFromString(pName, name, T_NAME_DB|T_NAME_TABLE);
|
||||||
|
if (code != 0) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
|
}
|
||||||
|
} else { // get current DB name first, and then set it into path
|
||||||
|
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
|
}
|
||||||
|
|
||||||
|
tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db));
|
||||||
|
|
||||||
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
strncpy(name, pTableName->z, pTableName->n);
|
||||||
|
|
||||||
|
code = tNameFromString(pName, name, T_NAME_TABLE);
|
||||||
|
if (code != 0) {
|
||||||
|
code = buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -45,12 +45,12 @@ int32_t __catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandl
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||||
return mockCatalogService->catalogGetTableMeta(pDBName, pTableName, pTableMeta);
|
return mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) {
|
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo* vgInfo) {
|
||||||
return mockCatalogService->catalogGetTableHashVgroup(pDBName, pTableName, vgInfo);
|
return mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initMetaDataEnv() {
|
void initMetaDataEnv() {
|
||||||
|
|
|
@ -94,9 +94,14 @@ public:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableMeta(const char* pDbFullName, const char* pTableName, STableMeta** pTableMeta) const {
|
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
|
||||||
std::unique_ptr<STableMeta> table;
|
std::unique_ptr<STableMeta> table;
|
||||||
int32_t code = copyTableSchemaMeta(toDbname(pDbFullName), pTableName, &table);
|
|
||||||
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(pTableName, db);
|
||||||
|
|
||||||
|
const char* tname = tNameGetTableName(pTableName);
|
||||||
|
int32_t code = copyTableSchemaMeta(db, tname, &table);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -104,7 +109,7 @@ public:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetTableHashVgroup(const char* pDbFullName, const char* pTableName, SVgroupInfo* vgInfo) const {
|
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
||||||
// todo
|
// todo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -283,10 +288,10 @@ std::shared_ptr<MockTableMeta> MockCatalogService::getTableMeta(const std::strin
|
||||||
return impl_->getTableMeta(db, tbname);
|
return impl_->getTableMeta(db, tbname);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t MockCatalogService::catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
|
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
|
||||||
return impl_->catalogGetTableMeta(pDBName, pTableName, pTableMeta);
|
return impl_->catalogGetTableMeta(pTableName, pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t MockCatalogService::catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const {
|
int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
||||||
return impl_->catalogGetTableHashVgroup(pDBName, pTableName, vgInfo);
|
return impl_->catalogGetTableHashVgroup(pTableName, vgInfo);
|
||||||
}
|
}
|
|
@ -57,8 +57,8 @@ public:
|
||||||
void showTables() const;
|
void showTables() const;
|
||||||
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const;
|
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const;
|
||||||
|
|
||||||
int32_t catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const;
|
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
|
||||||
int32_t catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const;
|
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<MockCatalogServiceImpl> impl_;
|
std::unique_ptr<MockCatalogServiceImpl> impl_;
|
||||||
|
|
Loading…
Reference in New Issue