feature/qnode

This commit is contained in:
dapan1121 2022-01-07 09:56:11 +08:00
parent d3c67de054
commit c584a88432
7 changed files with 101 additions and 33 deletions

View File

@ -87,8 +87,15 @@ typedef struct SUseDbOutput {
SDBVgroupInfo dbVgroup;
} SUseDbOutput;
typedef enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
META_TYPE_BOTH_TABLE,
};
typedef struct STableMetaOutput {
int32_t metaNum;
int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
SCTableMeta ctbMeta;
@ -150,6 +157,11 @@ void initQueryModuleMsgHandle();
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)

View File

@ -769,7 +769,8 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];

View File

@ -105,6 +105,9 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
pTbMetaMsg->numOfColumns = htonl(nCols);

View File

@ -35,6 +35,8 @@ extern "C" {
#define CTG_DEFAULT_INVALID_VERSION (-1)
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_TDB_INVALID_TABLE_ID
enum {
CTG_READ = 1,
CTG_WRITE,
@ -88,13 +90,18 @@ typedef struct SCatalogMgmt {
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
#define CTG_IS_STABLE(isSTable) (1 == (isSTable))
#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable))
#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0)
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID)
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__)

View File

@ -96,6 +96,29 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
return TSDB_CODE_SUCCESS;
}
int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, const char* tbFullName, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
*exist = 0;
ctgWarn("empty tablemeta cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
size_t sz = 0;
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
if (NULL == tbMeta) {
*exist = 0;
ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
*exist = 1;
ctgDebug("tablemeta is in cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
@ -201,7 +224,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
}
}
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
@ -223,11 +246,11 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pTransporter, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
output->metaNum = 0;
SET_META_TYPE_NONE(output->metaType);
ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
@ -247,15 +270,15 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, tbFullName, output);
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
@ -285,11 +308,11 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet epSet;
ctgGenEpSet(&epSet, vgroupInfo);
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
rpcSendRecv(pTransporter, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
output->metaNum = 0;
SET_META_TYPE_NONE(output->metaType);
ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
@ -589,11 +612,6 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number in meta rsp, num:%d", output->metaNum);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
@ -624,20 +642,24 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
}
}
if (output->metaNum == 2) {
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
ctgDebug("update tablemeta to cache, tbName:%s", output->ctbFname);
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
ctgDebug("update child tablemeta to cache, tbName:%s", output->ctbFname);
}
if (CTG_IS_META_CTABLE(output->metaType)) {
return TSDB_CODE_SUCCESS;
}
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
@ -757,30 +779,53 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
if (CTG_IS_STABLE(isSTable)) {
ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname);
// if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
if (0 == moutput.metaNum) {
if (CTG_IS_META_NONE(moutput.metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
} else {
output = &moutput;
}
} else {
ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaNum:%d", pTableName->tname, voutput.metaNum);
if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", pTableName->tname, voutput.metaType);
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else if (CTG_IS_META_BOTH(voutput.metaType)) {
int32_t exist = 0;
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.tbFname, &exist));
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
SET_META_TYPE_NONE(voutput.metaType);
}
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else {
SET_META_TYPE_CTABLE(voutput.metaType);
}
}
}
if (CTG_IS_META_NONE(output->metaType)) {
ctgError("no tablemeta got, tbNmae:%s", pTableName->tname);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));
_return:

View File

@ -128,7 +128,7 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&cn, tbFullName);
output->metaNum = 2;
SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbFname, tbFullName);

View File

@ -210,7 +210,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
pTableMeta->uid = msg->tuid;
pTableMeta->uid = isSuperTable ? msg->suid : msg->tuid;
pTableMeta->suid = msg->suid;
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
@ -246,7 +246,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
}
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
pOut->metaNum = 2;
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
@ -263,7 +263,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
} else {
pOut->metaNum = 1;
SET_META_TYPE_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);