feat: add batch get table vgId API
This commit is contained in:
parent
650ea06b69
commit
36c582c678
|
@ -220,6 +220,7 @@ DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
|
DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
|
||||||
DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
|
DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
|
||||||
|
DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *table[], int tableNum, int *vgId);
|
||||||
|
|
||||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
|
||||||
|
|
||||||
|
|
|
@ -210,6 +210,9 @@ int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STabl
|
||||||
|
|
||||||
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
|
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
|
||||||
|
|
||||||
|
int32_t catalogGetTablesHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctId, const char* pDb, const char* pTableName[],
|
||||||
|
int32_t tableNum, int32_t *vgId);
|
||||||
|
|
||||||
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
||||||
|
|
||||||
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta);
|
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta);
|
||||||
|
|
|
@ -1168,6 +1168,54 @@ _return:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int taos_get_tables_vgId(TAOS *taos, const char *db, const char *table[], int tableNum, int *vgId) {
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == db || NULL == table || NULL == vgId || tableNum <= 0) {
|
||||||
|
tscError("invalid input param, db:%p, table:%p, vgId:%p, tbNum:%d", db, table, vgId, tableNum);
|
||||||
|
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t connId = *(int64_t *)taos;
|
||||||
|
SRequestObj *pRequest = NULL;
|
||||||
|
char *sql = "taos_get_table_vgId";
|
||||||
|
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->syncQuery = true;
|
||||||
|
|
||||||
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
|
SCatalog *pCtg = NULL;
|
||||||
|
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestConnInfo conn = {
|
||||||
|
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
|
||||||
|
|
||||||
|
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
code = catalogGetTablesHashVgId(pCtg, &conn, pTscObj->acctId, db, table, tableNum, vgId);
|
||||||
|
if (code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
terrno = code;
|
||||||
|
|
||||||
|
destroyRequest(pRequest);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
if (NULL == taos) {
|
if (NULL == taos) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
|
|
@ -776,6 +776,7 @@ void ctgFreeHandleImpl(SCatalog* pCtg);
|
||||||
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
|
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
|
||||||
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
|
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
|
||||||
char* dbFName, SArray* pNames, bool update);
|
char* dbFName, SArray* pNames, bool update);
|
||||||
|
int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum, int32_t* vgId);
|
||||||
void ctgResetTbMetaTask(SCtgTask* pTask);
|
void ctgResetTbMetaTask(SCtgTask* pTask);
|
||||||
void ctgFreeDbCache(SCtgDBCache* dbCache);
|
void ctgFreeDbCache(SCtgDBCache* dbCache);
|
||||||
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
|
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
|
||||||
|
|
|
@ -551,6 +551,37 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetTbsHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctId, const char* pDb, const char* pTbs[], int32_t tbNum, int32_t* vgId) {
|
||||||
|
if (IS_SYS_DBNAME(pDb)) {
|
||||||
|
ctgError("no valid vgInfo for db, dbname:%s", pDb);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtgDBCache* dbCache = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%s", acctId, pDb);
|
||||||
|
|
||||||
|
SDBVgInfo* vgInfo = NULL;
|
||||||
|
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo, NULL));
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgGetVgIdsFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, dbFName, pTbs, tbNum, vgId));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (dbCache) {
|
||||||
|
ctgRUnlockVgInfo(dbCache);
|
||||||
|
ctgReleaseDBCache(pCtg, dbCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vgInfo) {
|
||||||
|
freeVgInfo(vgInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
|
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
@ -1141,6 +1172,13 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const
|
||||||
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, NULL));
|
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetTablesHashVgId(SCatalog* pCtg, SRequestConnInfo* pConn, int32_t acctId, const char* pDb, const char* pTableName[],
|
||||||
|
int32_t tableNum, int32_t *vgId) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
CTG_API_LEAVE(ctgGetTbsHashVgId(pCtg, pConn, acctId, pDb, pTableName, tableNum, vgId));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) {
|
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
|
|
@ -986,6 +986,43 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum, int32_t* vgId) {
|
||||||
|
int32_t code = 0;
|
||||||
|
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
|
||||||
|
|
||||||
|
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
|
||||||
|
|
||||||
|
if (vgNum <= 0) {
|
||||||
|
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
SVgroupInfo* vgInfo = NULL;
|
||||||
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
|
||||||
|
int32_t offset = strlen(tbFullName);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
|
snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", pTbs[i]);
|
||||||
|
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||||
|
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||||
|
|
||||||
|
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
|
||||||
|
if (NULL == vgInfo) {
|
||||||
|
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
|
||||||
|
(int32_t)taosArrayGetSize(dbInfo->vgArray));
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
vgId[i] = vgInfo->vgId;
|
||||||
|
|
||||||
|
ctgDebug("Got tb %s vgId:%d", tbFullName, vgInfo->vgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
|
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
|
||||||
if (*(uint64_t*)key1 < ((SSTableVersion*)key2)->suid) {
|
if (*(uint64_t*)key1 < ((SSTableVersion*)key2)->suid) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -26,7 +26,10 @@
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
|
||||||
int rtTables = 20;
|
#define RT_TABLE_NUM 100
|
||||||
|
|
||||||
|
int rtTables = RT_TABLE_NUM;
|
||||||
|
int rtTableUs[RT_TABLE_NUM] = {0};
|
||||||
char hostName[128];
|
char hostName[128];
|
||||||
|
|
||||||
static void rtExecSQL(TAOS *taos, char *command) {
|
static void rtExecSQL(TAOS *taos, char *command) {
|
||||||
|
@ -101,6 +104,22 @@ int rtPrepare(TAOS ** p, int prefix, int suffix) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t rtGetTimeOfDay(struct timeval *tv) {
|
||||||
|
return gettimeofday(tv, NULL);
|
||||||
|
}
|
||||||
|
static int64_t rtGetTimestampMs() {
|
||||||
|
struct timeval systemTime;
|
||||||
|
rtGetTimeOfDay(&systemTime);
|
||||||
|
return (int64_t)systemTime.tv_sec * 1000LL + (int64_t)systemTime.tv_usec/1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int64_t rtGetTimestampUs() {
|
||||||
|
struct timeval systemTime;
|
||||||
|
rtGetTimeOfDay(&systemTime);
|
||||||
|
return (int64_t)systemTime.tv_sec * 1000000LL + (int64_t)systemTime.tv_usec;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int rtGetDbRouteInfo(TAOS * taos) {
|
int rtGetDbRouteInfo(TAOS * taos) {
|
||||||
TAOS_DB_ROUTE_INFO dbInfo;
|
TAOS_DB_ROUTE_INFO dbInfo;
|
||||||
int code = taos_get_db_route_info(taos, "db1", &dbInfo);
|
int code = taos_get_db_route_info(taos, "db1", &dbInfo);
|
||||||
|
@ -126,7 +145,10 @@ int rtGetTableRouteInfo(TAOS * taos) {
|
||||||
char sql[1024] = {0};
|
char sql[1024] = {0};
|
||||||
for (int32_t i = 0; i < rtTables; ++i) {
|
for (int32_t i = 0; i < rtTables; ++i) {
|
||||||
sprintf(table, "tb%d", i);
|
sprintf(table, "tb%d", i);
|
||||||
|
int64_t startTs = rtGetTimestampUs();
|
||||||
int code = taos_get_table_vgId(taos, "db1", table, &vgId1);
|
int code = taos_get_table_vgId(taos, "db1", table, &vgId1);
|
||||||
|
int64_t endTs = rtGetTimestampUs();
|
||||||
|
rtTableUs[i] = (int)(endTs - startTs);
|
||||||
if (code) {
|
if (code) {
|
||||||
rtExit("taos_get_table_vgId", taos_errstr(NULL));
|
rtExit("taos_get_table_vgId", taos_errstr(NULL));
|
||||||
}
|
}
|
||||||
|
@ -142,9 +164,61 @@ int rtGetTableRouteInfo(TAOS * taos) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
printf("table vgId use us:");
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < rtTables; ++i) {
|
||||||
|
printf("%d ", rtTableUs[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("\n");
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int rtGetTablesRouteInfo(TAOS * taos) {
|
||||||
|
char *table = {0};
|
||||||
|
int *vgId1 = malloc(rtTables * sizeof(int));
|
||||||
|
int vgId2 = 0;
|
||||||
|
char sql[1024] = {0};
|
||||||
|
const char *tbs[RT_TABLE_NUM] = {0};
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < rtTables; ++i) {
|
||||||
|
table = malloc(10);
|
||||||
|
sprintf(table, "tb%d", i);
|
||||||
|
tbs[i] = table;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t startTs = rtGetTimestampUs();
|
||||||
|
int code = taos_get_tables_vgId(taos, "db1", tbs, rtTables, vgId1);
|
||||||
|
int64_t endTs = rtGetTimestampUs();
|
||||||
|
rtTableUs[0] = (int)(endTs - startTs);
|
||||||
|
if (code) {
|
||||||
|
rtExit("taos_get_tables_vgId", taos_errstr(NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < rtTables; ++i) {
|
||||||
|
sprintf(sql, "select vgroup_id from information_schema.ins_tables where table_name=\"tb%d\"", i);
|
||||||
|
|
||||||
|
rtFetchVgId(taos, sql, &vgId2);
|
||||||
|
if (vgId1[i] != vgId2) {
|
||||||
|
fprintf(stderr, "!!!! table tb%d vgId mis-match, vgId(api):%d, vgId(sys):%d\n", i, vgId1[i], vgId2);
|
||||||
|
exit(1);
|
||||||
|
} else {
|
||||||
|
printf("table tb%d vgId %d\n", i, vgId1[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("tables vgId use us:%d\n", rtTableUs[0]);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < rtTables; ++i) {
|
||||||
|
free((void*)tbs[i]);
|
||||||
|
}
|
||||||
|
free(vgId1);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void rtClose(TAOS * taos) {
|
void rtClose(TAOS * taos) {
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
}
|
}
|
||||||
|
@ -170,6 +244,16 @@ int rtRunCase2(void) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int rtRunCase3(void) {
|
||||||
|
TAOS *taos = NULL;
|
||||||
|
rtPrepare(&taos, 0, 0);
|
||||||
|
rtGetTablesRouteInfo(taos);
|
||||||
|
rtClose(taos);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
if (argc != 2) {
|
if (argc != 2) {
|
||||||
printf("usage: %s server-ip\n", argv[0]);
|
printf("usage: %s server-ip\n", argv[0]);
|
||||||
|
@ -182,6 +266,7 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
rtRunCase1();
|
rtRunCase1();
|
||||||
rtRunCase2();
|
rtRunCase2();
|
||||||
|
rtRunCase3();
|
||||||
|
|
||||||
int32_t l = 5;
|
int32_t l = 5;
|
||||||
while (l) {
|
while (l) {
|
||||||
|
|
Loading…
Reference in New Issue