feature/qnode
This commit is contained in:
parent
abf827ed63
commit
e9f7bf866a
|
@ -78,6 +78,7 @@ typedef struct SDbVgVersion {
|
||||||
} SDbVgVersion;
|
} SDbVgVersion;
|
||||||
|
|
||||||
typedef SDbCfgRsp SDbCfgInfo;
|
typedef SDbCfgRsp SDbCfgInfo;
|
||||||
|
typedef SUserIndexRsp SIndexInfo;
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg);
|
int32_t catalogInit(SCatalogCfg *cfg);
|
||||||
|
|
||||||
|
@ -221,6 +222,8 @@ int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *n
|
||||||
|
|
||||||
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
|
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
|
||||||
|
|
||||||
|
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy catalog and relase all resources
|
* Destroy catalog and relase all resources
|
||||||
|
|
|
@ -41,8 +41,8 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq);
|
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitDb(SMnode *pMnode) {
|
int32_t mndInitDb(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_DB,
|
SSdbTable table = {.sdbType = SDB_DB,
|
||||||
|
@ -1684,6 +1684,8 @@ static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) {
|
||||||
|
|
||||||
if (!exist) {
|
if (!exist) {
|
||||||
//TODO GET INDEX FROM FULLTEXT
|
//TODO GET INDEX FROM FULLTEXT
|
||||||
|
code = -1;
|
||||||
|
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
|
||||||
} else {
|
} else {
|
||||||
int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
|
int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
|
||||||
void *pRsp = rpcMallocCont(contLen);
|
void *pRsp = rpcMallocCont(contLen);
|
||||||
|
@ -1702,7 +1704,7 @@ static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code != 0) {
|
||||||
mError("failed to get index %s since %s", indexReq.indexFName, terrstr());
|
mError("failed to get index %s since %s", indexReq.indexFName, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -606,6 +606,43 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *indexName, SIndexInfo *out) {
|
||||||
|
char *msg = NULL;
|
||||||
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
ctgDebug("try to get index from mnode, indexName:%s", indexName);
|
||||||
|
|
||||||
|
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)]((void *)indexName, &msg, 0, &msgLen);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_MND_GET_INDEX,
|
||||||
|
.pCont = msg,
|
||||||
|
.contLen = msgLen,
|
||||||
|
};
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
|
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||||
|
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||||
|
ctgError("error rsp for get index, error:%s, indexName:%s", tstrerror(rpcRsp.code), indexName);
|
||||||
|
CTG_ERR_RET(rpcRsp.code);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Process get index rsp failed, code:%x, indexName:%s", code, indexName);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctgDebug("Got index from mnode, indexName:%s", indexName);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
||||||
if (NULL == pCtg->dbCache) {
|
if (NULL == pCtg->dbCache) {
|
||||||
|
@ -2764,6 +2801,17 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
||||||
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) {
|
||||||
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void catalogDestroy(void) {
|
void catalogDestroy(void) {
|
||||||
qInfo("start to destroy catalog");
|
qInfo("start to destroy catalog");
|
||||||
|
|
||||||
|
|
|
@ -139,6 +139,25 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
|
if (NULL == msg || NULL == msgLen) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUserIndexReq indexReq = {0};
|
||||||
|
strcpy(indexReq.indexFName, input);
|
||||||
|
|
||||||
|
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
|
||||||
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
tSerializeSUserIndexReq(pBuf, bufLen, &indexReq);
|
||||||
|
|
||||||
|
*msg = pBuf;
|
||||||
|
*msgLen = bufLen;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
SUseDbOutput *pOut = output;
|
SUseDbOutput *pOut = output;
|
||||||
SUseDbRsp usedbRsp = {0};
|
SUseDbRsp usedbRsp = {0};
|
||||||
|
@ -343,6 +362,22 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
|
SUserIndexRsp out = {0};
|
||||||
|
|
||||||
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDeserializeSUserIndexRsp(msg, msgSize, &out) != 0) {
|
||||||
|
qError("tDeserializeSUserIndexRsp failed, msgSize:%d", msgSize);
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(output, &out, sizeof(out));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void initQueryModuleMsgHandle() {
|
void initQueryModuleMsgHandle() {
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||||
|
@ -350,12 +385,14 @@ void initQueryModuleMsgHandle() {
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
||||||
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
|
||||||
|
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
||||||
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
Loading…
Reference in New Issue