catalog refactor
This commit is contained in:
parent
a53d7036ec
commit
6be54c112e
|
@ -149,8 +149,8 @@ typedef enum _mgmt_table {
|
|||
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
char* dbName;
|
||||
char* tableFullName;
|
||||
char* dbFName;
|
||||
char* tbName;
|
||||
} SBuildTableMetaInput;
|
||||
|
||||
typedef struct {
|
||||
|
@ -691,8 +691,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
char dbFname[TSDB_DB_FNAME_LEN];
|
||||
char tableFname[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} STableInfoReq;
|
||||
|
||||
typedef struct {
|
||||
|
@ -717,9 +717,9 @@ typedef struct {
|
|||
} SVgroupsInfo;
|
||||
|
||||
typedef struct {
|
||||
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
|
||||
char stbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFname[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
int8_t precision;
|
||||
|
|
|
@ -49,7 +49,7 @@ typedef struct SCatalogCfg {
|
|||
uint32_t maxTblCacheNum;
|
||||
uint32_t maxDBCacheNum;
|
||||
uint32_t dbRentSec;
|
||||
uint32_t stableRentSec;
|
||||
uint32_t stbRentSec;
|
||||
} SCatalogCfg;
|
||||
|
||||
typedef struct SSTableMetaVersion {
|
||||
|
@ -99,7 +99,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const
|
|||
|
||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||
|
||||
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo);
|
||||
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
|
||||
|
||||
/**
|
||||
* Get a table's meta data.
|
||||
|
|
|
@ -81,16 +81,15 @@ typedef struct STableMeta {
|
|||
} STableMeta;
|
||||
|
||||
typedef struct SDBVgroupInfo {
|
||||
SRWLatch lock;
|
||||
uint64_t dbId;
|
||||
int32_t vgVersion;
|
||||
int8_t hashMethod;
|
||||
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
|
||||
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
|
||||
} SDBVgroupInfo;
|
||||
|
||||
typedef struct SUseDbOutput {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
SDBVgroupInfo dbVgroup;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
SDBVgroupInfo *dbVgroup;
|
||||
} SUseDbOutput;
|
||||
|
||||
enum {
|
||||
|
@ -103,8 +102,9 @@ enum {
|
|||
|
||||
typedef struct STableMetaOutput {
|
||||
int32_t metaType;
|
||||
char ctbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char tbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
SCTableMeta ctbMeta;
|
||||
STableMeta *tbMeta;
|
||||
} STableMetaOutput;
|
||||
|
|
|
@ -41,19 +41,14 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
|
||||
|
||||
if (rsp->vgVersion < 0) {
|
||||
SDbVgVersion dbInfo;
|
||||
strcpy(dbInfo.dbName, rsp->db);
|
||||
dbInfo.dbId = rsp->uid;
|
||||
dbInfo.vgVersion = rsp->vgVersion;
|
||||
|
||||
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
|
||||
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
|
||||
} else {
|
||||
SDBVgroupInfo vgInfo = {0};
|
||||
vgInfo.dbId = rsp->uid;
|
||||
vgInfo.vgVersion = rsp->vgVersion;
|
||||
vgInfo.hashMethod = rsp->hashMethod;
|
||||
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == vgInfo.vgInfo) {
|
||||
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == vgInfo.vgHash) {
|
||||
tscError("hash init[%d] failed", rsp->vgNum);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -67,16 +62,16 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
||||
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
||||
tscError("hash push failed, errno:%d", errno);
|
||||
taosHashCleanup(vgInfo.vgInfo);
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
|
||||
if (code) {
|
||||
taosHashCleanup(vgInfo.vgInfo);
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -301,15 +301,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
|
||||
|
||||
SDbVgVersion dbVer = {0};
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName));
|
||||
dbVer.dbId = be64toh(rsp->uid);
|
||||
rsp->uid = be64toh(rsp->uid);
|
||||
|
||||
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
|
||||
catalogRemoveDBVgroup(pCatalog, &dbVer);
|
||||
catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
|
||||
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return code;
|
||||
|
|
|
@ -125,7 +125,7 @@ const char* Testbase::GetMetaName(int32_t index) {
|
|||
|
||||
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
|
||||
|
||||
const char* Testbase::GetMetaTbName() { return pMeta->tbFname; }
|
||||
const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
|
||||
|
||||
void Testbase::SendShowRetrieveReq() {
|
||||
int32_t contLen = sizeof(SRetrieveTableReq);
|
||||
|
@ -144,7 +144,7 @@ void Testbase::SendShowRetrieveReq() {
|
|||
pos = 0;
|
||||
}
|
||||
|
||||
const char* Testbase::GetShowName() { return pMeta->tbFname; }
|
||||
const char* Testbase::GetShowName() { return pMeta->tbName; }
|
||||
|
||||
int8_t Testbase::GetShowInt8() {
|
||||
int8_t data = *((int8_t*)(pData + pos));
|
||||
|
|
|
@ -426,7 +426,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
|
|||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
|
@ -196,7 +196,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
|
|||
|
||||
pShow->numOfRows = 1;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -1113,7 +1113,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -608,7 +608,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
|
|||
|
||||
pShow->numOfRows = TSDB_CONFIG_NUMBER;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -715,7 +715,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -482,7 +482,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -624,7 +624,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
mndUpdateMnodeRole(pMnode);
|
||||
return 0;
|
||||
|
|
|
@ -623,7 +623,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -792,7 +792,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = 1000000;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -426,7 +426,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -428,7 +428,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -684,20 +684,23 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
SMnode *pMnode = pReq->pMnode;
|
||||
STableInfoReq *pInfo = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", pInfo->dbFName, pInfo->tbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname);
|
||||
mDebug("stb:%s, start to retrieve meta", tbFName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, tbFName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
|
||||
mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname);
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (pStb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
terrno = TSDB_CODE_MND_INVALID_STB;
|
||||
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -711,11 +714,12 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pMeta->tbFname, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
strcpy(pMeta->dbFName, pStb->db);
|
||||
strcpy(pMeta->tbName, pInfo->tbName);
|
||||
pMeta->numOfTags = htonl(pStb->numOfTags);
|
||||
pMeta->numOfColumns = htonl(pStb->numOfColumns);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
|
@ -740,7 +744,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
pReq->pCont = pMeta;
|
||||
pReq->contLen = contLen;
|
||||
|
||||
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
|
||||
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", tbFName, pStb->numOfColumns, pStb->numOfTags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -816,7 +820,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pM
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -646,7 +646,7 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname);
|
||||
mDebug("subscribe:%s, start to retrieve meta", pInfo->tbName);
|
||||
|
||||
#if 0
|
||||
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
|
||||
|
@ -777,7 +777,7 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRs
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -339,7 +339,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname);
|
||||
mDebug("topic:%s, start to retrieve meta", pInfo->tbName);
|
||||
|
||||
#if 0
|
||||
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
|
||||
|
@ -470,7 +470,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -474,7 +474,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -525,7 +525,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
|
|||
}
|
||||
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -638,7 +638,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
pShow->replica = dnodeId;
|
||||
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
|
|||
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
|
||||
|
||||
STableMetaRsp* pMeta = test.GetShowMeta();
|
||||
EXPECT_STREQ(pMeta->tbFname, "show connections");
|
||||
EXPECT_STREQ(pMeta->tbName, "show connections");
|
||||
EXPECT_EQ(pMeta->numOfTags, 0);
|
||||
EXPECT_EQ(pMeta->numOfColumns, 7);
|
||||
EXPECT_EQ(pMeta->precision, 0);
|
||||
|
|
|
@ -126,7 +126,8 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
int32_t contLen = sizeof(STableInfoReq);
|
||||
|
||||
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->tableFname, "1.d1.stb");
|
||||
strcpy(pReq->dbFName, "1.d1");
|
||||
strcpy(pReq->tbName, "stb");
|
||||
|
||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
|
@ -146,8 +147,8 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
}
|
||||
|
||||
EXPECT_STREQ(pRsp->tbFname, "1.d1.stb");
|
||||
EXPECT_STREQ(pRsp->stbFname, "");
|
||||
EXPECT_STREQ(pRsp->tbName, "1.d1");
|
||||
EXPECT_STREQ(pRsp->stbName, "stb");
|
||||
EXPECT_EQ(pRsp->numOfColumns, 2);
|
||||
EXPECT_EQ(pRsp->numOfTags, 3);
|
||||
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
|
|
@ -82,7 +82,11 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int msgLen = 0;
|
||||
int32_t code = TSDB_CODE_VND_APP_ERROR;
|
||||
|
||||
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", pReq->dbFName, pReq->tbName);
|
||||
|
||||
|
||||
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, tbFName, &uid);
|
||||
if (pTbCfg == NULL) {
|
||||
code = TSDB_CODE_VND_TB_NOT_EXIST;
|
||||
goto _exit;
|
||||
|
@ -117,13 +121,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname));
|
||||
strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
|
||||
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
|
||||
strcpy(pTbMetaMsg->tbName, pReq->tbName);
|
||||
if (pTbCfg->type == META_CHILD_TABLE) {
|
||||
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
|
||||
strcpy(pTbMetaMsg->stbName, pStbCfg->name + strlen(pReq->dbFName) + 1);
|
||||
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
|
||||
} else if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
|
||||
strcpy(pTbMetaMsg->stbName, pTbCfg->name + strlen(pReq->dbFName) + 1);
|
||||
pTbMetaMsg->suid = htobe64(uid);
|
||||
}
|
||||
pTbMetaMsg->numOfTags = htonl(nTagCols);
|
||||
|
|
|
@ -47,55 +47,52 @@ enum {
|
|||
CTG_RENT_STABLE,
|
||||
};
|
||||
|
||||
typedef struct SCTGDebug {
|
||||
typedef struct SCtgDebug {
|
||||
int32_t lockDebug;
|
||||
} SCTGDebug;
|
||||
} SCtgDebug;
|
||||
|
||||
|
||||
typedef struct SVgroupListCache {
|
||||
int32_t vgroupVersion;
|
||||
SHashObj *cache; // key:vgId, value:SVgroupInfo
|
||||
} SVgroupListCache;
|
||||
typedef struct SCtgTbMetaCache {
|
||||
SRWLatch stbLock;
|
||||
SHashObj *cache; //key:tbname, value:STableMeta
|
||||
SHashObj *stbCache; //key:suid, value:STableMeta*
|
||||
} SCtgTbMetaCache;
|
||||
|
||||
typedef struct SDBVgroupCache {
|
||||
SHashObj *cache; //key:dbname, value:SDBVgroupInfo
|
||||
} SDBVgroupCache;
|
||||
typedef struct SCtgDBCache {
|
||||
SRWLatch vgLock;
|
||||
int8_t deleted;
|
||||
SDBVgroupInfo *vgInfo;
|
||||
SCtgTbMetaCache tbCache;
|
||||
} SCtgDBCache;
|
||||
|
||||
typedef struct STableMetaCache {
|
||||
SRWLatch stableLock;
|
||||
SHashObj *cache; //key:fulltablename, value:STableMeta
|
||||
SHashObj *stableCache; //key:suid, value:STableMeta*
|
||||
} STableMetaCache;
|
||||
|
||||
typedef struct SRentSlotInfo {
|
||||
typedef struct SCtgRentSlot {
|
||||
SRWLatch lock;
|
||||
bool needSort;
|
||||
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
|
||||
} SRentSlotInfo;
|
||||
} SCtgRentSlot;
|
||||
|
||||
typedef struct SMetaRentMgmt {
|
||||
typedef struct SCtgRentMgmt {
|
||||
int8_t type;
|
||||
uint16_t slotNum;
|
||||
uint16_t slotRIdx;
|
||||
int64_t lastReadMsec;
|
||||
SRentSlotInfo *slots;
|
||||
} SMetaRentMgmt;
|
||||
SCtgRentSlot *slots;
|
||||
} SCtgRentMgmt;
|
||||
|
||||
typedef struct SCatalog {
|
||||
uint64_t clusterId;
|
||||
SDBVgroupCache dbCache;
|
||||
STableMetaCache tableCache;
|
||||
SMetaRentMgmt dbRent;
|
||||
SMetaRentMgmt stableRent;
|
||||
uint64_t clusterId;
|
||||
SHashObj *dbCache; //key:dbname, value:SCtgDBCache
|
||||
SCtgRentMgmt dbRent;
|
||||
SCtgRentMgmt stbRent;
|
||||
} SCatalog;
|
||||
|
||||
typedef struct SCtgApiStat {
|
||||
|
||||
} SCtgApiStat;
|
||||
|
||||
typedef struct SCtgResourceStat {
|
||||
typedef struct SCtgRuntimeStat {
|
||||
|
||||
} SCtgResourceStat;
|
||||
} SCtgRuntimeStat;
|
||||
|
||||
typedef struct SCtgCacheStat {
|
||||
|
||||
|
@ -103,7 +100,7 @@ typedef struct SCtgCacheStat {
|
|||
|
||||
typedef struct SCatalogStat {
|
||||
SCtgApiStat api;
|
||||
SCtgResourceStat resource;
|
||||
SCtgRuntimeStat runtime;
|
||||
SCtgCacheStat cache;
|
||||
} SCatalogStat;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -128,15 +128,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
|
|||
strcpy(sn.dbname, "db1");
|
||||
strcpy(sn.tname, ctgTestSTablename);
|
||||
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&cn, tbFullName);
|
||||
|
||||
strcpy(output->dbFName, cn.dbname);
|
||||
SET_META_TYPE_BOTH_TABLE(output->metaType);
|
||||
|
||||
strcpy(output->ctbFname, tbFullName);
|
||||
|
||||
tNameExtractFullName(&cn, tbFullName);
|
||||
strcpy(output->tbFname, tbFullName);
|
||||
strcpy(output->ctbName, cn.tname);
|
||||
strcpy(output->tbName, sn.tname);
|
||||
|
||||
output->ctbMeta.vgId = 9;
|
||||
output->ctbMeta.tableType = TSDB_CHILD_TABLE;
|
||||
|
@ -186,7 +182,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
|
|||
|
||||
dbVgroup->hashMethod = 0;
|
||||
dbVgroup->dbId = ctgTestDbId;
|
||||
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
|
||||
uint32_t hashUnit = UINT32_MAX / vgNum;
|
||||
|
@ -203,7 +199,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
|
|||
addr->port = htons(n + 22);
|
||||
}
|
||||
|
||||
taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
|
||||
taosHashPut(dbVgroup->vgHash, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -250,7 +246,8 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestTablename);
|
||||
rspMsg->numOfTags = 0;
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -285,8 +282,9 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename);
|
||||
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestCTablename);
|
||||
strcpy(rspMsg->stbName, ctgTestSTablename);
|
||||
rspMsg->numOfTags = htonl(ctgTestTagNum);
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -327,8 +325,9 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
|
||||
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestSTablename);
|
||||
strcpy(rspMsg->stbName, ctgTestSTablename);
|
||||
rspMsg->numOfTags = htonl(ctgTestTagNum);
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -370,8 +369,9 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
|
||||
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestSTablename);
|
||||
sprintf(rspMsg->stbName, "%s_%d", ctgTestSTablename, idx);
|
||||
rspMsg->numOfTags = htonl(ctgTestTagNum);
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -1291,4 +1291,4 @@ int main(int argc, char **argv) {
|
|||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -45,11 +45,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
|
|||
|
||||
bMsg->header.vgId = htonl(bInput->vgId);
|
||||
|
||||
if (bInput->dbName) {
|
||||
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname));
|
||||
if (bInput->dbFName) {
|
||||
tstrncpy(bMsg->dbFName, bInput->dbFName, tListLen(bMsg->dbFName));
|
||||
}
|
||||
|
||||
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname));
|
||||
tstrncpy(bMsg->tbName, bInput->tbName, tListLen(bMsg->tbName));
|
||||
|
||||
*msgLen = (int32_t)sizeof(*bMsg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -113,12 +113,19 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
pOut->dbVgroup.vgVersion = pRsp->vgVersion;
|
||||
pOut->dbVgroup.hashMethod = pRsp->hashMethod;
|
||||
pOut->dbVgroup.dbId = pRsp->uid;
|
||||
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pOut->dbVgroup.vgInfo) {
|
||||
qError("hash init[%d] failed", pRsp->vgNum);
|
||||
pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
|
||||
if (NULL == pOut->dbVgroup) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pOut->dbVgroup->vgVersion = pRsp->vgVersion;
|
||||
pOut->dbVgroup->hashMethod = pRsp->hashMethod;
|
||||
pOut->dbVgroup->dbId = pRsp->uid;
|
||||
pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pOut->dbVgroup->vgHash) {
|
||||
qError("taosHashInit %d failed", pRsp->vgNum);
|
||||
tfree(pOut->dbVgroup);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -131,8 +138,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
qError("hash push failed");
|
||||
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
qError("taosHashPut failed");
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
@ -142,8 +149,10 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return code;
|
||||
|
||||
_return:
|
||||
|
||||
if (pOut) {
|
||||
tfree(pOut->dbVgroup.vgInfo);
|
||||
taosHashCleanup(pOut->dbVgroup->vgHash);
|
||||
tfree(pOut->dbVgroup);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -248,16 +257,13 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
strcpy(pOut->dbFName, pMetaMsg->dbFName);
|
||||
|
||||
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
|
||||
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
|
||||
|
||||
if (pMetaMsg->dbFname[0]) {
|
||||
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
|
||||
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
|
||||
} else {
|
||||
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
|
||||
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
|
||||
}
|
||||
strcpy(pOut->ctbName, pMetaMsg->tbName);
|
||||
strcpy(pOut->tbName, pMetaMsg->stbName);
|
||||
|
||||
pOut->ctbMeta.vgId = pMetaMsg->vgId;
|
||||
pOut->ctbMeta.tableType = pMetaMsg->tableType;
|
||||
|
@ -268,11 +274,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
|||
} else {
|
||||
SET_META_TYPE_TABLE(pOut->metaType);
|
||||
|
||||
if (pMetaMsg->dbFname[0]) {
|
||||
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
|
||||
} else {
|
||||
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
|
||||
}
|
||||
strcpy(pOut->tbName, pMetaMsg->tbName);
|
||||
|
||||
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||
}
|
||||
|
@ -291,4 +293,4 @@ void initQueryModuleMsgHandle() {
|
|||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
Loading…
Reference in New Issue