From c584a88432e1ee6acddeb8870b9fb19370447af1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 09:56:11 +0800 Subject: [PATCH] feature/qnode --- include/libs/qcom/query.h | 14 +++- source/dnode/mnode/impl/src/mndStb.c | 3 +- source/dnode/vnode/impl/src/vnodeQuery.c | 3 + source/libs/catalog/inc/catalogInt.h | 9 ++- source/libs/catalog/src/catalog.c | 97 +++++++++++++++++------ source/libs/catalog/test/catalogTests.cpp | 2 +- source/libs/qcom/src/querymsg.c | 6 +- 7 files changed, 101 insertions(+), 33 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 637035698b..e1eef1c3f5 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -87,8 +87,15 @@ typedef struct SUseDbOutput { SDBVgroupInfo dbVgroup; } SUseDbOutput; +typedef enum { + META_TYPE_NON_TABLE = 1, + META_TYPE_CTABLE, + META_TYPE_TABLE, + META_TYPE_BOTH_TABLE, +}; + typedef struct STableMetaOutput { - int32_t metaNum; + int32_t metaType; char ctbFname[TSDB_TABLE_FNAME_LEN]; char tbFname[TSDB_TABLE_FNAME_LEN]; SCTableMeta ctbMeta; @@ -150,6 +157,11 @@ void initQueryModuleMsgHandle(); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); +#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE +#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE +#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE +#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE + #define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) #define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) #define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 2609faa41a..d6b6a07de0 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -769,7 +769,8 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { pMeta->tableType = TSDB_SUPER_TABLE; pMeta->update = pDb->cfg.update; pMeta->sversion = htonl(pStb->version); - pMeta->suid = htonl(pStb->uid); + pMeta->suid = htobe64(pStb->uid); + pMeta->tuid = htobe64(pStb->uid); for (int32_t i = 0; i < totalCols; ++i) { SSchema *pSchema = &pMeta->pSchema[i]; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 308fc9d2e5..a32ee50df5 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -105,6 +105,9 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (pTbCfg->type == META_CHILD_TABLE) { strcpy(pTbMetaMsg->stbFname, pStbCfg->name); pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid); + } else if (pTbCfg->type == META_SUPER_TABLE) { + strcpy(pTbMetaMsg->stbFname, pTbCfg->name); + pTbMetaMsg->suid = htobe64(uid); } pTbMetaMsg->numOfTags = htonl(nTagCols); pTbMetaMsg->numOfColumns = htonl(nCols); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 00a50e90eb..439bb21189 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -35,6 +35,8 @@ extern "C" { #define CTG_DEFAULT_INVALID_VERSION (-1) +#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_TDB_INVALID_TABLE_ID + enum { CTG_READ = 1, CTG_WRITE, @@ -88,13 +90,18 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); +#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE) +#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) +#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE) +#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE) + #define CTG_IS_STABLE(isSTable) (1 == (isSTable)) #define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable)) #define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0) #define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0) #define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE)) -#define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID) +#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3efee95179..9a7129d480 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -96,6 +96,29 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp return TSDB_CODE_SUCCESS; } +int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, const char* tbFullName, int32_t *exist) { + if (NULL == pCatalog->tableCache.cache) { + *exist = 0; + ctgWarn("empty tablemeta cache, tbName:%s", tbFullName); + return TSDB_CODE_SUCCESS; + } + + size_t sz = 0; + STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName)); + + if (NULL == tbMeta) { + *exist = 0; + ctgDebug("tablemeta not in cache, tbName:%s", tbFullName); + return TSDB_CODE_SUCCESS; + } + + *exist = 1; + + ctgDebug("tablemeta is in cache, tbName:%s", tbFullName); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) { if (NULL == pCatalog->tableCache.cache) { @@ -201,7 +224,7 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { } } -int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) { SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; @@ -223,11 +246,11 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons SRpcMsg rpcRsp = {0}; - rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + rpcSendRecv(pTransporter, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - output->metaNum = 0; + SET_META_TYPE_NONE(output->metaType); ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } @@ -247,15 +270,15 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); - return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output); + return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, tbFullName, output); } -int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { +int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -285,11 +308,11 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE SEpSet epSet; ctgGenEpSet(&epSet, vgroupInfo); - rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); + rpcSendRecv(pTransporter, &epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - output->metaNum = 0; + SET_META_TYPE_NONE(output->metaType); ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -589,11 +612,6 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { int32_t code = 0; - - if (output->metaNum != 1 && output->metaNum != 2) { - ctgError("invalid table meta number in meta rsp, num:%d", output->metaNum); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname); @@ -624,20 +642,24 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out } } - if (output->metaNum == 2) { + if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - ctgDebug("update tablemeta to cache, tbName:%s", output->ctbFname); - - if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { - ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } + ctgDebug("update child tablemeta to cache, tbName:%s", output->ctbFname); } + if (CTG_IS_META_CTABLE(output->metaType)) { + return TSDB_CODE_SUCCESS; + } + + if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) { + ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { @@ -757,30 +779,53 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con if (CTG_IS_STABLE(isSTable)) { ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname); - + + // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); - if (0 == moutput.metaNum) { + if (CTG_IS_META_NONE(moutput.metaType)) { CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); } else { output = &moutput; } } else { ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable); - + + // if get from vnode failed or no table meta, will not try mnode CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); - if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { - ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaNum:%d", pTableName->tname, voutput.metaNum); + if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { + ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", pTableName->tname, voutput.metaType); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; + } else if (CTG_IS_META_BOTH(voutput.metaType)) { + int32_t exist = 0; + CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.tbFname, &exist)); + if (0 == exist) { + CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); + + if (CTG_IS_META_NONE(moutput.metaType)) { + SET_META_TYPE_NONE(voutput.metaType); + } + + tfree(voutput.tbMeta); + voutput.tbMeta = moutput.tbMeta; + moutput.tbMeta = NULL; + } else { + SET_META_TYPE_CTABLE(voutput.metaType); + } } } + if (CTG_IS_META_NONE(output->metaType)) { + ctgError("no tablemeta got, tbNmae:%s", pTableName->tname); + CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); + } + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output)); _return: diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 98a25a0f82..68e962795c 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -128,7 +128,7 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) { char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&cn, tbFullName); - output->metaNum = 2; + SET_META_TYPE_BOTH_TABLE(output->metaType); strcpy(output->ctbFname, tbFullName); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 914bf359f5..093e42a3d2 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -210,7 +210,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl pTableMeta->vgId = isSuperTable ? 0 : msg->vgId; pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType; - pTableMeta->uid = msg->tuid; + pTableMeta->uid = isSuperTable ? msg->suid : msg->tuid; pTableMeta->suid = msg->suid; pTableMeta->sversion = msg->sversion; pTableMeta->tversion = msg->tversion; @@ -246,7 +246,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { } if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { - pOut->metaNum = 2; + SET_META_TYPE_BOTH_TABLE(pOut->metaType); if (pMetaMsg->dbFname[0]) { snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); @@ -263,7 +263,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta); } else { - pOut->metaNum = 1; + SET_META_TYPE_TABLE(pOut->metaType); if (pMetaMsg->dbFname[0]) { snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);