Merge pull request #14204 from taosdata/enh/showvariables
enh: refactor show variables
This commit is contained in:
commit
f7ece71ff7
|
@ -510,7 +510,8 @@ typedef struct {
|
||||||
int8_t superUser;
|
int8_t superUser;
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
char sVersion[128];
|
char sVer[TSDB_VERSION_LEN];
|
||||||
|
char sDetailVer[128];
|
||||||
} SConnectRsp;
|
} SConnectRsp;
|
||||||
|
|
||||||
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
|
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
|
||||||
|
@ -836,6 +837,20 @@ typedef struct {
|
||||||
int32_t tSerializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
|
int32_t tSerializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
|
||||||
int32_t tDeserializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
|
int32_t tDeserializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t useless; // useless
|
||||||
|
} SServerVerReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSServerVerReq(void* buf, int32_t bufLen, SServerVerReq* pReq);
|
||||||
|
int32_t tDeserializeSServerVerReq(void* buf, int32_t bufLen, SServerVerReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char ver[TSDB_VERSION_LEN];
|
||||||
|
} SServerVerRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
|
||||||
|
int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
|
||||||
|
|
||||||
|
|
||||||
typedef struct SQueryNodeAddr {
|
typedef struct SQueryNodeAddr {
|
||||||
int32_t nodeId; // vgId or qnodeId
|
int32_t nodeId; // vgId or qnodeId
|
||||||
|
@ -1229,6 +1244,21 @@ typedef struct {
|
||||||
int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
|
int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
|
||||||
int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
|
int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char name[TSDB_CONFIG_OPTION_LEN + 1];
|
||||||
|
char value[TSDB_CONFIG_VALUE_LEN + 1];
|
||||||
|
} SVariablesInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SArray *variables; //SArray<SVariablesInfo>
|
||||||
|
} SShowVariablesRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq);
|
||||||
|
int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq);
|
||||||
|
|
||||||
|
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* sql: show tables like '%a_%'
|
* sql: show tables like '%a_%'
|
||||||
* payload is the query condition, e.g., '%a_%'
|
* payload is the query condition, e.g., '%a_%'
|
||||||
|
|
|
@ -163,6 +163,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
||||||
|
|
|
@ -71,6 +71,7 @@ typedef struct SCatalogReq {
|
||||||
SArray* pTableCfg; // element is SNAME
|
SArray* pTableCfg; // element is SNAME
|
||||||
bool qNodeRequired; // valid qnode
|
bool qNodeRequired; // valid qnode
|
||||||
bool dNodeRequired; // valid dnode
|
bool dNodeRequired; // valid dnode
|
||||||
|
bool svrVerRequired;
|
||||||
bool forceUpdate;
|
bool forceUpdate;
|
||||||
} SCatalogReq;
|
} SCatalogReq;
|
||||||
|
|
||||||
|
@ -80,18 +81,19 @@ typedef struct SMetaRes {
|
||||||
} SMetaRes;
|
} SMetaRes;
|
||||||
|
|
||||||
typedef struct SMetaData {
|
typedef struct SMetaData {
|
||||||
SArray* pDbVgroup; // pRes = SArray<SVgroupInfo>*
|
SArray* pDbVgroup; // pRes = SArray<SVgroupInfo>*
|
||||||
SArray* pDbCfg; // pRes = SDbCfgInfo*
|
SArray* pDbCfg; // pRes = SDbCfgInfo*
|
||||||
SArray* pDbInfo; // pRes = SDbInfo*
|
SArray* pDbInfo; // pRes = SDbInfo*
|
||||||
SArray* pTableMeta; // pRes = STableMeta*
|
SArray* pTableMeta; // pRes = STableMeta*
|
||||||
SArray* pTableHash; // pRes = SVgroupInfo*
|
SArray* pTableHash; // pRes = SVgroupInfo*
|
||||||
SArray* pTableIndex; // pRes = SArray<STableIndexInfo>*
|
SArray* pTableIndex; // pRes = SArray<STableIndexInfo>*
|
||||||
SArray* pUdfList; // pRes = SFuncInfo*
|
SArray* pUdfList; // pRes = SFuncInfo*
|
||||||
SArray* pIndex; // pRes = SIndexInfo*
|
SArray* pIndex; // pRes = SIndexInfo*
|
||||||
SArray* pUser; // pRes = bool*
|
SArray* pUser; // pRes = bool*
|
||||||
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
|
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
|
||||||
SArray* pTableCfg; // pRes = STableCfg*
|
SArray* pTableCfg; // pRes = STableCfg*
|
||||||
SArray* pDnodeList; // pRes = SArray<SEpSet>*
|
SArray* pDnodeList; // pRes = SArray<SEpSet>*
|
||||||
|
SMetaRes* pSvrVer; // pRes = char*
|
||||||
} SMetaData;
|
} SMetaData;
|
||||||
|
|
||||||
typedef struct SCatalogCfg {
|
typedef struct SCatalogCfg {
|
||||||
|
@ -268,7 +270,7 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, c
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetAllMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp);
|
int32_t catalogGetAllMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp);
|
||||||
|
|
||||||
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
|
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
|
||||||
|
|
||||||
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
|
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
|
||||||
|
|
||||||
|
@ -298,6 +300,8 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
|
||||||
|
|
||||||
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
|
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
|
||||||
|
|
||||||
|
int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo *pConn, char** pVersion);
|
||||||
|
|
||||||
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate);
|
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate);
|
||||||
|
|
||||||
int32_t catalogClearCache(void);
|
int32_t catalogClearCache(void);
|
||||||
|
|
|
@ -51,6 +51,8 @@ typedef struct SParseContext {
|
||||||
bool isSuperUser;
|
bool isSuperUser;
|
||||||
bool async;
|
bool async;
|
||||||
int8_t schemalessType;
|
int8_t schemalessType;
|
||||||
|
const char* svrVer;
|
||||||
|
bool nodeOffline;
|
||||||
} SParseContext;
|
} SParseContext;
|
||||||
|
|
||||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
||||||
|
|
|
@ -54,6 +54,11 @@ enum {
|
||||||
RES_TYPE__TMQ_META,
|
RES_TYPE__TMQ_META,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define SHOW_VARIABLES_RESULT_COLS 2
|
||||||
|
#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
|
||||||
|
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
|
||||||
|
|
||||||
|
|
||||||
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
||||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
||||||
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
||||||
|
@ -104,6 +109,8 @@ typedef struct SHeartBeatInfo {
|
||||||
struct SAppInstInfo {
|
struct SAppInstInfo {
|
||||||
int64_t numOfConns;
|
int64_t numOfConns;
|
||||||
SCorEpSet mgmtEp;
|
SCorEpSet mgmtEp;
|
||||||
|
int32_t totalDnodes;
|
||||||
|
int32_t onlineDnodes;
|
||||||
TdThreadMutex qnodeMutex;
|
TdThreadMutex qnodeMutex;
|
||||||
SArray* pQnodeList;
|
SArray* pQnodeList;
|
||||||
SAppClusterSummary summary;
|
SAppClusterSummary summary;
|
||||||
|
@ -127,7 +134,8 @@ typedef struct STscObj {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char pass[TSDB_PASSWORD_LEN];
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
char ver[128];
|
char sVer[TSDB_VERSION_LEN];
|
||||||
|
char sDetailVer[128];
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
int32_t acctId;
|
int32_t acctId;
|
||||||
uint32_t connId;
|
uint32_t connId;
|
||||||
|
|
|
@ -161,6 +161,9 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
|
|
||||||
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
|
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
|
||||||
|
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
|
||||||
pTscObj->connId = pRsp->query->connId;
|
pTscObj->connId = pRsp->query->connId;
|
||||||
|
|
||||||
if (pRsp->query->killRid) {
|
if (pRsp->query->killRid) {
|
||||||
|
|
|
@ -178,7 +178,9 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
|
||||||
.pStmtCb = pStmtCb,
|
.pStmtCb = pStmtCb,
|
||||||
.pUser = pTscObj->user,
|
.pUser = pTscObj->user,
|
||||||
.schemalessType = pTscObj->schemalessType,
|
.schemalessType = pTscObj->schemalessType,
|
||||||
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))};
|
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
|
||||||
|
.svrVer = pTscObj->sVer,
|
||||||
|
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)};
|
||||||
|
|
||||||
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||||
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||||
|
|
|
@ -623,7 +623,7 @@ const char *taos_get_server_info(TAOS *taos) {
|
||||||
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
return pTscObj->ver;
|
return pTscObj->sDetailVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SqlParseWrapper {
|
typedef struct SqlParseWrapper {
|
||||||
|
@ -766,7 +766,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
.requestObjRefId = pCxt->requestRid,
|
.requestObjRefId = pCxt->requestRid,
|
||||||
.mgmtEps = pCxt->mgmtEpSet};
|
.mgmtEps = pCxt->mgmtEpSet};
|
||||||
|
|
||||||
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, &catalogReq, retrieveMetaCallback, pWrapper,
|
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
|
||||||
&pRequest->body.queryJob);
|
&pRequest->body.queryJob);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
return;
|
return;
|
||||||
|
@ -934,7 +934,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
|
|
||||||
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
code = catalogAsyncGetAllMeta(pCtg, &conn, pRequest->requestId, &catalogReq, syncCatalogFn, ¶m, NULL);
|
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, ¶m, NULL);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,8 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
pTscObj->connId = connectRsp.connId;
|
pTscObj->connId = connectRsp.connId;
|
||||||
pTscObj->acctId = connectRsp.acctId;
|
pTscObj->acctId = connectRsp.acctId;
|
||||||
tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
|
tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
|
||||||
|
tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));
|
||||||
|
|
||||||
// update the appInstInfo
|
// update the appInstInfo
|
||||||
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
|
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
|
||||||
|
@ -287,6 +288,103 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
|
||||||
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
pBlock->info.hasVarCol = true;
|
||||||
|
|
||||||
|
pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
|
SColumnInfoData infoData = {0};
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
|
||||||
|
|
||||||
|
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||||
|
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
|
||||||
|
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||||
|
|
||||||
|
int32_t numOfCfg = taosArrayGetSize(pVars);
|
||||||
|
blockDataEnsureCapacity(pBlock, numOfCfg);
|
||||||
|
|
||||||
|
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
|
||||||
|
SVariablesInfo *pInfo = taosArrayGet(pVars, i);
|
||||||
|
|
||||||
|
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
|
||||||
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
|
||||||
|
colDataAppend(pColInfo, i, name, false);
|
||||||
|
|
||||||
|
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
|
||||||
|
colDataAppend(pColInfo, i, value, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows = numOfCfg;
|
||||||
|
|
||||||
|
*block = pBlock;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
|
int32_t code = buildShowVariablesBlock(pVars, &pBlock);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||||
|
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||||
|
if (NULL == *pRsp) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pRsp)->useconds = 0;
|
||||||
|
(*pRsp)->completed = 1;
|
||||||
|
(*pRsp)->precision = 0;
|
||||||
|
(*pRsp)->compressed = 0;
|
||||||
|
(*pRsp)->compLen = 0;
|
||||||
|
(*pRsp)->numOfRows = htonl(pBlock->info.rows);
|
||||||
|
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false);
|
||||||
|
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
|
||||||
|
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
SRequestObj* pRequest = param;
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
setErrno(pRequest, code);
|
||||||
|
} else {
|
||||||
|
SShowVariablesRsp rsp = {0};
|
||||||
|
SRetrieveTableRsp* pRes = NULL;
|
||||||
|
code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildShowVariablesRsp(rsp.variables, &pRes);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
tFreeSShowVariablesRsp(&rsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRequest->body.queryFp != NULL) {
|
||||||
|
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||||
|
} else {
|
||||||
|
tsem_post(&pRequest->body.rspSem);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
|
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
|
||||||
switch (msgType) {
|
switch (msgType) {
|
||||||
case TDMT_MND_CONNECT:
|
case TDMT_MND_CONNECT:
|
||||||
|
@ -301,6 +399,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
|
||||||
return processDropDbRsp;
|
return processDropDbRsp;
|
||||||
case TDMT_MND_ALTER_STB:
|
case TDMT_MND_ALTER_STB:
|
||||||
return processAlterStbRsp;
|
return processAlterStbRsp;
|
||||||
|
case TDMT_MND_SHOW_VARIABLES:
|
||||||
|
return processShowVariablesRsp;
|
||||||
default:
|
default:
|
||||||
return genericRspCallback;
|
return genericRspCallback;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2250,6 +2250,56 @@ int32_t tDeserializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSServerVerReq(void *buf, int32_t bufLen, SServerVerReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSServerVerReq(void *buf, int32_t bufLen, SServerVerReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->ver) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->ver) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
|
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
|
@ -2859,6 +2909,67 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSVariablesInfo(SEncoder* pEncoder, SVariablesInfo* pInfo) {
|
||||||
|
if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) {
|
||||||
|
if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
int32_t varNum = taosArrayGetSize(pRsp->variables);
|
||||||
|
if (tEncodeI32(&encoder, varNum) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < varNum; ++i) {
|
||||||
|
SVariablesInfo* pInfo = taosArrayGet(pRsp->variables, i);
|
||||||
|
if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1;
|
||||||
|
}
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
int32_t varNum = 0;
|
||||||
|
if (tDecodeI32(&decoder, &varNum) < 0) return -1;
|
||||||
|
if (varNum > 0) {
|
||||||
|
pRsp->variables = taosArrayInit(varNum, sizeof(SVariablesInfo));
|
||||||
|
if (NULL == pRsp->variables) return -1;
|
||||||
|
for (int32_t i = 0; i < varNum; ++i) {
|
||||||
|
SVariablesInfo info = {0};
|
||||||
|
if (tDecodeSVariablesInfo(&decoder, &info) < 0) return -1;
|
||||||
|
if (NULL == taosArrayPush(pRsp->variables, &info)) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp) {
|
||||||
|
if (NULL == pRsp) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pRsp->variables);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -3400,7 +3511,8 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
||||||
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
|
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
|
if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
|
||||||
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
|
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
|
if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -3420,7 +3532,8 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
||||||
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
|
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
|
if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
|
||||||
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
|
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -206,6 +206,8 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
|
@ -48,6 +48,7 @@ static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
|
||||||
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
|
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
|
||||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
|
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
|
||||||
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq);
|
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq);
|
||||||
|
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
|
||||||
|
@ -78,6 +79,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
|
||||||
|
@ -554,6 +556,60 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
|
||||||
|
SShowVariablesRsp rsp = {0};
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
rsp.variables = taosArrayInit(4, sizeof(SVariablesInfo));
|
||||||
|
if (NULL == rsp.variables) {
|
||||||
|
mError("failed to alloc SVariablesInfo array while process show variables req");
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVariablesInfo info = {0};
|
||||||
|
|
||||||
|
strcpy(info.name, "statusInterval");
|
||||||
|
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
|
||||||
|
taosArrayPush(rsp.variables, &info);
|
||||||
|
|
||||||
|
strcpy(info.name, "timezone");
|
||||||
|
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
|
||||||
|
taosArrayPush(rsp.variables, &info);
|
||||||
|
|
||||||
|
strcpy(info.name, "locale");
|
||||||
|
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
|
||||||
|
taosArrayPush(rsp.variables, &info);
|
||||||
|
|
||||||
|
strcpy(info.name, "charset");
|
||||||
|
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
|
||||||
|
taosArrayPush(rsp.variables, &info);
|
||||||
|
|
||||||
|
int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
|
||||||
|
void *pRsp = rpcMallocCont(rspLen);
|
||||||
|
if (pRsp == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp);
|
||||||
|
|
||||||
|
pReq->info.rspLen = rspLen;
|
||||||
|
pReq->info.rsp = pRsp;
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
mError("failed to get show variables info since %s", terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
tFreeSShowVariablesRsp(&rsp);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
|
@ -70,6 +70,7 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
|
||||||
static void mndFreeApp(SAppObj *pApp);
|
static void mndFreeApp(SAppObj *pApp);
|
||||||
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter);
|
||||||
|
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitProfile(SMnode *pMnode) {
|
int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
@ -94,6 +95,7 @@ int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
||||||
|
@ -262,8 +264,9 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
connectRsp.connId = pConn->id;
|
connectRsp.connId = pConn->id;
|
||||||
connectRsp.connType = connReq.connType;
|
connectRsp.connType = connReq.connType;
|
||||||
connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
|
connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
|
||||||
|
|
||||||
snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
|
strcpy(connectRsp.sVer, version);
|
||||||
|
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
|
||||||
gitinfo);
|
gitinfo);
|
||||||
mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
|
mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
|
||||||
|
|
||||||
|
@ -460,6 +463,27 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SDnodeObj *pDnode = NULL;
|
||||||
|
int64_t curMs = taosGetTimestampMs();
|
||||||
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||||
|
if (online) {
|
||||||
|
(*num)++;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
|
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
|
||||||
SClientHbBatchRsp *pBatchRsp) {
|
SClientHbBatchRsp *pBatchRsp) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
@ -503,7 +527,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
|
|
||||||
rspBasic->connId = pConn->id;
|
rspBasic->connId = pConn->id;
|
||||||
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
|
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
|
||||||
rspBasic->onlineDnodes = 1; // TODO
|
mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
|
||||||
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
||||||
|
|
||||||
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
|
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
|
||||||
|
@ -694,6 +718,28 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SServerVerRsp rsp = {0};
|
||||||
|
strcpy(rsp.ver, version);
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp);
|
||||||
|
if (contLen < 0) goto _over;
|
||||||
|
void *pRsp = rpcMallocCont(contLen);
|
||||||
|
if (pRsp == NULL) goto _over;
|
||||||
|
tSerializeSServerVerRsp(pRsp, contLen, &rsp);
|
||||||
|
|
||||||
|
pReq->info.rspLen = contLen;
|
||||||
|
pReq->info.rsp = pRsp;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_over:
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -76,6 +76,7 @@ typedef enum {
|
||||||
CTG_TASK_GET_INDEX,
|
CTG_TASK_GET_INDEX,
|
||||||
CTG_TASK_GET_UDF,
|
CTG_TASK_GET_UDF,
|
||||||
CTG_TASK_GET_USER,
|
CTG_TASK_GET_USER,
|
||||||
|
CTG_TASK_GET_SVR_VER,
|
||||||
} CTG_TASK_TYPE;
|
} CTG_TASK_TYPE;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -224,6 +225,7 @@ typedef struct SCtgJob {
|
||||||
int32_t dbInfoNum;
|
int32_t dbInfoNum;
|
||||||
int32_t tbIndexNum;
|
int32_t tbIndexNum;
|
||||||
int32_t tbCfgNum;
|
int32_t tbCfgNum;
|
||||||
|
int32_t svrVerNum;
|
||||||
} SCtgJob;
|
} SCtgJob;
|
||||||
|
|
||||||
typedef struct SCtgMsgCtx {
|
typedef struct SCtgMsgCtx {
|
||||||
|
@ -578,8 +580,9 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
||||||
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
|
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
|
||||||
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
|
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
|
||||||
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
|
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
|
||||||
|
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
|
||||||
|
|
||||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
|
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
|
||||||
int32_t ctgLaunchJob(SCtgJob *pJob);
|
int32_t ctgLaunchJob(SCtgJob *pJob);
|
||||||
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
|
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
|
||||||
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
|
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
|
||||||
|
|
|
@ -1051,7 +1051,7 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
|
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == pConn || NULL == pReq || NULL == fp || NULL == param) {
|
if (NULL == pCtg || NULL == pConn || NULL == pReq || NULL == fp || NULL == param) {
|
||||||
|
@ -1060,7 +1060,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, uint64_t
|
||||||
|
|
||||||
int32_t code = 0, taskNum = 0;
|
int32_t code = 0, taskNum = 0;
|
||||||
SCtgJob *pJob = NULL;
|
SCtgJob *pJob = NULL;
|
||||||
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, reqId, pReq, fp, param, &taskNum));
|
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param, &taskNum));
|
||||||
if (taskNum <= 0) {
|
if (taskNum <= 0) {
|
||||||
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
|
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
|
||||||
fp(pMetaData, param, TSDB_CODE_SUCCESS);
|
fp(pMetaData, param, TSDB_CODE_SUCCESS);
|
||||||
|
@ -1247,6 +1247,22 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo *pConn, char** pVersion) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
if (NULL == pCtg || NULL == pConn || NULL == pVersion) {
|
||||||
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
CTG_ERR_JRET(ctgGetSvrVerFromMnode(pCtg, pConn, pVersion, NULL));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
CTG_API_LEAVE(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
|
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
|
|
@ -255,6 +255,20 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||||
|
SCtgTask task = {0};
|
||||||
|
|
||||||
|
task.type = CTG_TASK_GET_SVR_VER;
|
||||||
|
task.taskId = taskIdx;
|
||||||
|
task.pJob = pJob;
|
||||||
|
|
||||||
|
taosArrayPush(pJob->pTasks, &task);
|
||||||
|
|
||||||
|
qDebug("QID:0x%" PRIx64 " [%dth] task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
|
||||||
SName *name = (SName*)param;
|
SName *name = (SName*)param;
|
||||||
SCtgTask task = {0};
|
SCtgTask task = {0};
|
||||||
|
@ -413,7 +427,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
|
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
|
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
|
||||||
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
|
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
|
||||||
|
@ -421,6 +435,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
||||||
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
|
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
|
||||||
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
|
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
|
||||||
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
|
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
|
||||||
|
int32_t svrVerNum = pReq->svrVerRequired ? 1 : 0;
|
||||||
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
|
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
|
||||||
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
|
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
|
||||||
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
|
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
|
||||||
|
@ -428,21 +443,21 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
||||||
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
||||||
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
||||||
|
|
||||||
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
|
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
|
||||||
if (*taskNum <= 0) {
|
if (*taskNum <= 0) {
|
||||||
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
|
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, pConn->requestId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
||||||
if (NULL == *job) {
|
if (NULL == *job) {
|
||||||
ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), reqId);
|
ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCtgJob *pJob = *job;
|
SCtgJob *pJob = *job;
|
||||||
|
|
||||||
pJob->queryId = reqId;
|
pJob->queryId = pConn->requestId;
|
||||||
pJob->userFp = fp;
|
pJob->userFp = fp;
|
||||||
pJob->pCtg = pCtg;
|
pJob->pCtg = pCtg;
|
||||||
pJob->conn = *pConn;
|
pJob->conn = *pConn;
|
||||||
|
@ -460,6 +475,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
||||||
pJob->dbInfoNum = dbInfoNum;
|
pJob->dbInfoNum = dbInfoNum;
|
||||||
pJob->tbIndexNum = tbIndexNum;
|
pJob->tbIndexNum = tbIndexNum;
|
||||||
pJob->tbCfgNum = tbCfgNum;
|
pJob->tbCfgNum = tbCfgNum;
|
||||||
|
pJob->svrVerNum = svrVerNum;
|
||||||
|
|
||||||
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
|
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
|
||||||
|
|
||||||
|
@ -530,6 +546,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
||||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL));
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (svrVerNum) {
|
||||||
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_SVR_VER, NULL, NULL));
|
||||||
|
}
|
||||||
|
|
||||||
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
|
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
|
||||||
if (pJob->refId < 0) {
|
if (pJob->refId < 0) {
|
||||||
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
|
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
|
||||||
|
@ -728,6 +748,21 @@ int32_t ctgDumpUserRes(SCtgTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgDumpSvrVer(SCtgTask* pTask) {
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
if (NULL == pJob->jobRes.pSvrVer) {
|
||||||
|
pJob->jobRes.pSvrVer = taosMemoryCalloc(1, sizeof(SMetaRes));
|
||||||
|
if (NULL == pJob->jobRes.pSvrVer) {
|
||||||
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pJob->jobRes.pSvrVer->code = pTask->code;
|
||||||
|
pJob->jobRes.pSvrVer->pRes = pTask->res;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgInvokeSubCb(SCtgTask *pTask) {
|
int32_t ctgInvokeSubCb(SCtgTask *pTask) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -1156,6 +1191,20 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgHandleGetSvrVerRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||||
|
|
||||||
|
TSWAP(pTask->res, pTask->msgCtx.out);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
ctgHandleTaskEnd(pTask, code);
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
|
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
@ -1459,6 +1508,15 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
|
||||||
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
|
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
|
||||||
ctgResetTbMetaTask(pTask);
|
ctgResetTbMetaTask(pTask);
|
||||||
|
|
||||||
|
@ -1532,6 +1590,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
|
||||||
{ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL},
|
{ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL},
|
||||||
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
|
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
|
||||||
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
|
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
|
||||||
|
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
|
int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
|
||||||
|
@ -1633,7 +1692,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
|
||||||
for (int32_t i = 0; i < taskNum; ++i) {
|
for (int32_t i = 0; i < taskNum; ++i) {
|
||||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
|
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " ctg start to launch task %d", pJob->queryId, pTask->taskId);
|
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
|
||||||
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
||||||
pTask->status = CTG_TASK_LAUNCHED;
|
pTask->status = CTG_TASK_LAUNCHED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,7 +210,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
|
||||||
|
|
||||||
int64_t jobId = 0;
|
int64_t jobId = 0;
|
||||||
|
|
||||||
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, reqId, &req, ctgdUserCallback, param, &jobId));
|
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, &req, ctgdUserCallback, param, &jobId));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
|
|
@ -217,6 +217,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
|
||||||
qDebug("Got stb cfg from mnode, tbFName:%s", target);
|
qDebug("Got stb cfg from mnode, tbFName:%s", target);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case TDMT_MND_SERVER_VERSION: {
|
||||||
|
if (TSDB_CODE_SUCCESS != rspCode) {
|
||||||
|
qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
|
||||||
|
CTG_ERR_RET(rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
|
||||||
|
if (code) {
|
||||||
|
qError("Process svr ver rsp failed, error:%s", tstrerror(code));
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("Got svr ver from mnode");
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
qError("invalid req type %s", TMSG_INFO(reqType));
|
qError("invalid req type %s", TMSG_INFO(reqType));
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
@ -811,4 +826,38 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) {
|
||||||
|
char *msg = NULL;
|
||||||
|
int32_t msgLen = 0;
|
||||||
|
int32_t reqType = TDMT_MND_SERVER_VERSION;
|
||||||
|
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||||
|
|
||||||
|
qDebug("try to get svr ver from mnode");
|
||||||
|
|
||||||
|
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Build get svr ver msg failed, code:%s", tstrerror(code));
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask) {
|
||||||
|
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
|
||||||
|
|
||||||
|
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = reqType,
|
||||||
|
.pCont = msg,
|
||||||
|
.contLen = msgLen,
|
||||||
|
};
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
|
||||||
return "[get udf]";
|
return "[get udf]";
|
||||||
case CTG_TASK_GET_USER:
|
case CTG_TASK_GET_USER:
|
||||||
return "[get user]";
|
return "[get user]";
|
||||||
|
case CTG_TASK_GET_SVR_VER:
|
||||||
|
return "[get svr ver]";
|
||||||
default:
|
default:
|
||||||
return "unknown";
|
return "unknown";
|
||||||
}
|
}
|
||||||
|
@ -103,8 +105,13 @@ void ctgFreeSMetaData(SMetaData* pData) {
|
||||||
taosArrayDestroy(pData->pQnodeList);
|
taosArrayDestroy(pData->pQnodeList);
|
||||||
pData->pQnodeList = NULL;
|
pData->pQnodeList = NULL;
|
||||||
|
|
||||||
|
taosArrayDestroy(pData->pDnodeList);
|
||||||
|
pData->pDnodeList = NULL;
|
||||||
|
|
||||||
taosArrayDestroy(pData->pTableCfg);
|
taosArrayDestroy(pData->pTableCfg);
|
||||||
pData->pTableCfg = NULL;
|
pData->pTableCfg = NULL;
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pData->pSvrVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) {
|
void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) {
|
||||||
|
@ -346,20 +353,8 @@ void ctgResetTbMetaTask(SCtgTask* pTask) {
|
||||||
|
|
||||||
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case CTG_TASK_GET_QNODE: {
|
case CTG_TASK_GET_QNODE:
|
||||||
taosArrayDestroy((SArray*)*pRes);
|
case CTG_TASK_GET_DNODE:
|
||||||
*pRes = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_DNODE: {
|
|
||||||
taosArrayDestroy((SArray*)*pRes);
|
|
||||||
*pRes = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_META: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_DB_VGROUP: {
|
case CTG_TASK_GET_DB_VGROUP: {
|
||||||
taosArrayDestroy((SArray*)*pRes);
|
taosArrayDestroy((SArray*)*pRes);
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
|
@ -373,14 +368,6 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_DB_INFO: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_HASH: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_INDEX: {
|
case CTG_TASK_GET_TB_INDEX: {
|
||||||
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
|
@ -394,15 +381,13 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_INDEX: {
|
case CTG_TASK_GET_TB_HASH:
|
||||||
taosMemoryFreeClear(*pRes);
|
case CTG_TASK_GET_DB_INFO:
|
||||||
break;
|
case CTG_TASK_GET_INDEX:
|
||||||
}
|
case CTG_TASK_GET_UDF:
|
||||||
case CTG_TASK_GET_UDF: {
|
case CTG_TASK_GET_USER:
|
||||||
taosMemoryFreeClear(*pRes);
|
case CTG_TASK_GET_SVR_VER:
|
||||||
break;
|
case CTG_TASK_GET_TB_META: {
|
||||||
}
|
|
||||||
case CTG_TASK_GET_USER: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
taosMemoryFreeClear(*pRes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -415,20 +400,12 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
|
|
||||||
void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case CTG_TASK_GET_QNODE: {
|
case CTG_TASK_GET_QNODE:
|
||||||
taosArrayDestroy((SArray*)*pRes);
|
|
||||||
*pRes = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_DNODE: {
|
case CTG_TASK_GET_DNODE: {
|
||||||
taosArrayDestroy((SArray*)*pRes);
|
taosArrayDestroy((SArray*)*pRes);
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_TB_META: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_DB_VGROUP: {
|
case CTG_TASK_GET_DB_VGROUP: {
|
||||||
if (*pRes) {
|
if (*pRes) {
|
||||||
SDBVgInfo* pInfo = (SDBVgInfo*)*pRes;
|
SDBVgInfo* pInfo = (SDBVgInfo*)*pRes;
|
||||||
|
@ -445,14 +422,6 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_DB_INFO: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_HASH: {
|
|
||||||
taosMemoryFreeClear(*pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_INDEX: {
|
case CTG_TASK_GET_TB_INDEX: {
|
||||||
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
|
@ -466,14 +435,12 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_INDEX: {
|
case CTG_TASK_GET_TB_META:
|
||||||
taosMemoryFreeClear(*pRes);
|
case CTG_TASK_GET_DB_INFO:
|
||||||
break;
|
case CTG_TASK_GET_TB_HASH:
|
||||||
}
|
case CTG_TASK_GET_INDEX:
|
||||||
case CTG_TASK_GET_UDF: {
|
case CTG_TASK_GET_UDF:
|
||||||
taosMemoryFreeClear(*pRes);
|
case CTG_TASK_GET_SVR_VER:
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_USER: {
|
case CTG_TASK_GET_USER: {
|
||||||
taosMemoryFreeClear(*pRes);
|
taosMemoryFreeClear(*pRes);
|
||||||
break;
|
break;
|
||||||
|
@ -497,10 +464,6 @@ void ctgClearSubTaskRes(SCtgSubRes *pRes) {
|
||||||
|
|
||||||
void ctgFreeTaskCtx(SCtgTask* pTask) {
|
void ctgFreeTaskCtx(SCtgTask* pTask) {
|
||||||
switch (pTask->type) {
|
switch (pTask->type) {
|
||||||
case CTG_TASK_GET_QNODE: {
|
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_META: {
|
case CTG_TASK_GET_TB_META: {
|
||||||
SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||||
taosMemoryFreeClear(taskCtx->pName);
|
taosMemoryFreeClear(taskCtx->pName);
|
||||||
|
@ -511,18 +474,6 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
taosMemoryFreeClear(pTask->taskCtx);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_DB_VGROUP: {
|
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_DB_CFG: {
|
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_DB_INFO: {
|
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_TB_HASH: {
|
case CTG_TASK_GET_TB_HASH: {
|
||||||
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
|
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
|
||||||
taosMemoryFreeClear(taskCtx->pName);
|
taosMemoryFreeClear(taskCtx->pName);
|
||||||
|
@ -542,14 +493,12 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
taosMemoryFreeClear(pTask->taskCtx);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CTG_TASK_GET_INDEX: {
|
case CTG_TASK_GET_DB_VGROUP:
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
case CTG_TASK_GET_DB_CFG:
|
||||||
break;
|
case CTG_TASK_GET_DB_INFO:
|
||||||
}
|
case CTG_TASK_GET_INDEX:
|
||||||
case CTG_TASK_GET_UDF: {
|
case CTG_TASK_GET_UDF:
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
case CTG_TASK_GET_QNODE:
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CTG_TASK_GET_USER: {
|
case CTG_TASK_GET_USER: {
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
taosMemoryFreeClear(pTask->taskCtx);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -4641,7 +4641,7 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema**
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t extractShowLocalVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
|
static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
|
||||||
*numOfCols = 2;
|
*numOfCols = 2;
|
||||||
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
|
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
|
||||||
if (NULL == (*pSchema)) {
|
if (NULL == (*pSchema)) {
|
||||||
|
@ -4678,7 +4678,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
|
||||||
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
||||||
return extractShowCreateTableResultSchema(numOfCols, pSchema);
|
return extractShowCreateTableResultSchema(numOfCols, pSchema);
|
||||||
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
|
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
|
||||||
return extractShowLocalVariablesResultSchema(numOfCols, pSchema);
|
case QUERY_NODE_SHOW_VARIABLES_STMT:
|
||||||
|
return extractShowVariablesResultSchema(numOfCols, pSchema);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -5909,7 +5910,6 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||||
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
|
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
|
||||||
case QUERY_NODE_SHOW_VARIABLES_STMT:
|
|
||||||
case QUERY_NODE_SHOW_APPS_STMT:
|
case QUERY_NODE_SHOW_APPS_STMT:
|
||||||
case QUERY_NODE_SHOW_CONSUMERS_STMT:
|
case QUERY_NODE_SHOW_CONSUMERS_STMT:
|
||||||
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
|
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
|
||||||
|
@ -6011,6 +6011,14 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
case QUERY_NODE_ALTER_LOCAL_STMT:
|
case QUERY_NODE_ALTER_LOCAL_STMT:
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
|
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_SHOW_VARIABLES_STMT:
|
||||||
|
pQuery->haveResultSet = true;
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_RPC;
|
||||||
|
if (NULL != pCxt->pCmdMsg) {
|
||||||
|
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg);
|
||||||
|
pQuery->msgType = pQuery->pCmdMsg->msgType;
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_RPC;
|
pQuery->execMode = QUERY_EXEC_MODE_RPC;
|
||||||
if (NULL != pCxt->pCmdMsg) {
|
if (NULL != pCxt->pCmdMsg) {
|
||||||
|
|
|
@ -54,12 +54,6 @@ class ParserDdlTest : public ParserTestBase {
|
||||||
virtual void checkDdl(const SQuery* pQuery, ParserStage stage) {
|
virtual void checkDdl(const SQuery* pQuery, ParserStage stage) {
|
||||||
ASSERT_NE(pQuery, nullptr);
|
ASSERT_NE(pQuery, nullptr);
|
||||||
ASSERT_NE(pQuery->pRoot, nullptr);
|
ASSERT_NE(pQuery->pRoot, nullptr);
|
||||||
if (QUERY_EXEC_MODE_RPC == pQuery->execMode) {
|
|
||||||
ASSERT_EQ(pQuery->haveResultSet, false);
|
|
||||||
ASSERT_EQ(pQuery->numOfResCols, 0);
|
|
||||||
ASSERT_EQ(pQuery->pResSchema, nullptr);
|
|
||||||
ASSERT_EQ(pQuery->precision, 0);
|
|
||||||
}
|
|
||||||
if (nullptr != checkDdl_) {
|
if (nullptr != checkDdl_) {
|
||||||
checkDdl_(pQuery, stage);
|
checkDdl_(pQuery, stage);
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,6 +144,23 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
|
||||||
|
if (NULL == msg || NULL == msgLen) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SServerVerReq req = {0};
|
||||||
|
|
||||||
|
int32_t bufLen = tSerializeSServerVerReq(NULL, 0, &req);
|
||||||
|
void *pBuf = (*mallcFp)(bufLen);
|
||||||
|
tSerializeSServerVerReq(pBuf, bufLen, &req);
|
||||||
|
|
||||||
|
*msg = pBuf;
|
||||||
|
*msgLen = bufLen;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
|
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
|
||||||
if (NULL == msg || NULL == msgLen) {
|
if (NULL == msg || NULL == msgLen) {
|
||||||
|
@ -467,6 +484,26 @@ int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
|
SServerVerRsp out = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
|
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDeserializeSServerVerRsp(msg, msgSize, &out) != 0) {
|
||||||
|
qError("invalid svr ver rsp msg, msgSize:%d", msgSize);
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*(char**)output = strdup(out.ver);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
SDbCfgRsp out = {0};
|
SDbCfgRsp out = {0};
|
||||||
|
@ -583,6 +620,7 @@ void initQueryModuleMsgHandle() {
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
|
||||||
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg;
|
||||||
|
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
|
@ -596,6 +634,7 @@ void initQueryModuleMsgHandle() {
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
||||||
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
Loading…
Reference in New Issue