Merge pull request #14118 from taosdata/feature/showdnodevariables

feat: show dnode variables
This commit is contained in:
dapan1121 2022-06-23 13:23:06 +08:00 committed by GitHub
commit b2ff536050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 668 additions and 29 deletions

View File

@ -834,6 +834,14 @@ typedef struct {
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
typedef struct {
int32_t rowNum;
} SDnodeListReq;
int32_t tSerializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
int32_t tDeserializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq);
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
@ -852,6 +860,15 @@ int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct {
SArray* dnodeList; // SArray<SEpSet>
} SDnodeListRsp;
int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;

View File

@ -81,6 +81,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "server-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
@ -101,6 +102,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DNODE_LIST, "dnode-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL)
@ -152,7 +154,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "mnd-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)

View File

@ -36,6 +36,10 @@ extern "C" {
#define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE)
#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_BINARY_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_COLS 2
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define PRIVILEGE_TYPE_MASK(n) (1 << n)

View File

@ -104,6 +104,8 @@ int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal);
const char *cfgStypeStr(ECfgSrcType type);
const char *cfgDtypeStr(ECfgDataType type);
void cfgDumpItemValue(SConfigItem *pItem, char* buf, int32_t bufSize, int32_t* pLen);
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump);
int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl);

View File

@ -443,8 +443,8 @@ enum {
#define VNODE_HANDLE -3
#define BNODE_HANDLE -4
#define TSDB_CONFIG_OPTION_LEN 16
#define TSDB_CONIIG_VALUE_LEN 48
#define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONFIG_VALUE_LEN 64
#define TSDB_CONFIG_NUMBER 8
#define QUERY_ID_SIZE 20

View File

@ -231,7 +231,13 @@ static const SSysDbTableSchema transSchema[] = {
static const SSysDbTableSchema configSchema[] = {
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "value", .bytes = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema variablesSchema[] = {
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysTableMeta infosMeta[] = {
@ -253,6 +259,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
{TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema)},
{TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema)},
{TSDB_INS_TABLE_DNODE_VARIABLES, variablesSchema, tListLen(variablesSchema)},
};
static const SSysDbTableSchema connectionsSchema[] = {

View File

@ -2194,6 +2194,32 @@ int32_t tDeserializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq
return 0;
}
int32_t tSerializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->rowNum) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->rowNum) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -2237,6 +2263,50 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); }
int32_t tSerializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->dnodeList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SEpSet *pEpSet = taosArrayGet(pRsp->dnodeList, i);
if (tEncodeSEpSet(&encoder, pEpSet) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (NULL == pRsp->dnodeList) {
pRsp->dnodeList = taosArrayInit(num, sizeof(SEpSet));
if (NULL == pRsp->dnodeList) return -1;
}
for (int32_t i = 0; i < num; ++i) {
SEpSet epSet = {0};
if (tDecodeSEpSet(&decoder, &epSet) < 0) return -1;
taosArrayPush(pRsp->dnodeList, &epSet);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -45,6 +45,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -15,6 +15,10 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "systable.h"
extern SConfig *tsCfg;
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
@ -175,6 +179,130 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
SSDataBlock* dmBuildVariablesBlock(void) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0;
const SSysTableMeta* pMeta = NULL;
getInfosDbMeta(&pMeta, &size);
int32_t index = 0;
for (int32_t i = 0; i < size; ++i) {
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) {
index = i;
break;
}
}
pBlock->pDataBlock = taosArrayInit(pMeta[index].colNum, sizeof(SColumnInfoData));
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = {0};
colInfoData.info.colId = i + 1;
colInfoData.info.type = pMeta[index].schema[i].type;
colInfoData.info.bytes = pMeta[index].schema[i].bytes;
taosArrayPush(pBlock->pDataBlock, &colInfoData);
}
pBlock->info.numOfCols = pMeta[index].colNum;
pBlock->info.hasVarCol = true;
return pBlock;
}
int32_t dmAppendVariablesToBlock(SSDataBlock* pBlock, int32_t dnodeId) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, (const char *)&dnodeId, false);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, value, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableReq retrieveReq = {0};
if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (strcasecmp(retrieveReq.tb, TSDB_INS_TABLE_DNODE_VARIABLES)) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SSDataBlock* pBlock = dmBuildVariablesBlock();
dmAppendVariablesToBlock(pBlock, pMgmt->pData->dnodeId);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pBlock->info.numOfCols +
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(pBlock->info.numOfCols);
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to retrieve data since %s", terrstr());
blockDataDestroy(pBlock);
return -1;
}
char *pStart = pRsp->data;
*(int32_t *)pStart = htonl(pBlock->info.numOfCols);
pStart += sizeof(int32_t); // number of columns
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i);
pSchema->bytes = htonl(pColInfo->info.bytes);
pSchema->colId = htons(pColInfo->info.colId);
pSchema->type = pColInfo->info.type;
pStart += sizeof(SSysTableSchema);
}
int32_t len = 0;
blockCompressEncode(pBlock, pStart, &len, pBlock->info.numOfCols, false);
pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
pRsp->completed = 1;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = size;
dDebug("dnode variables retrieve completed");
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS;
}
SArray *dmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
@ -191,6 +319,7 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;

View File

@ -141,6 +141,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_SERVER_STATUS:
code = dmProcessServerRunStatus(pMgmt, pMsg);
break;
case TDMT_DND_SYSTABLE_RETRIEVE:
code = dmProcessRetrieve(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
break;

View File

@ -161,6 +161,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -85,6 +85,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessNetTestReq(pDnode, pRpc);
return;
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_VND_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return;

View File

@ -47,6 +47,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq);
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
@ -76,6 +77,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
@ -499,6 +501,60 @@ _OVER:
return code;
}
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SDnodeObj *pObj = NULL;
void *pIter = NULL;
SDnodeListRsp rsp = {0};
int32_t code = -1;
rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == rsp.dnodeList) {
mError("failed to alloc epSet while process dnode list req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SEpSet epSet = {0};
epSet.numOfEps = 1;
tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->port;
(void)taosArrayPush(rsp.dnodeList, &epSet);
sdbRelease(pSdb, pObj);
}
int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
pReq->info.rspLen = rspLen;
pReq->info.rsp = pRsp;
code = 0;
_OVER:
if (code != 0) {
mError("failed to get dnode list since %s", terrstr());
}
tFreeSDnodeListRsp(&rsp);
return code;
}
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
@ -700,28 +756,28 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t totalRows = 0;
int32_t numOfRows = 0;
char *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0};
char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONFIG_VALUE_LEN + 1] = {0};
char *pWrite = NULL;
int32_t cols = 0;
cfgOpts[totalRows] = "statusInterval";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%d", tsStatusInterval);
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
totalRows++;
cfgOpts[totalRows] = "timezone";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsTimezoneStr);
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
totalRows++;
cfgOpts[totalRows] = "locale";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsLocale);
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
totalRows++;
cfgOpts[totalRows] = "charset";
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsCharset);
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
totalRows++;
char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
char bufVal[TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
for (int32_t i = 0; i < totalRows; i++) {
cols = 0;
@ -730,7 +786,7 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf, false);
STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONIIG_VALUE_LEN);
STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONFIG_VALUE_LEN);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)bufVal, false);

View File

@ -555,7 +555,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
const STraceId *trace = &pMsg->info.traceId;
mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));

View File

@ -65,6 +65,7 @@ enum {
typedef enum {
CTG_TASK_GET_QNODE = 0,
CTG_TASK_GET_DNODE,
CTG_TASK_GET_DB_VGROUP,
CTG_TASK_GET_DB_CFG,
CTG_TASK_GET_DB_INFO,
@ -216,6 +217,7 @@ typedef struct SCtgJob {
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
@ -565,6 +567,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);

View File

@ -1099,8 +1099,19 @@ _return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray** pDnodeList) {
return TSDB_CODE_CTG_INVALID_INPUT;
int32_t catalogGetDnodeList(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** pDnodeList) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == pConn || NULL == pDnodeList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_ERR_JRET(ctgGetDnodeListFromMnode(pCtg, pConn, pDnodeList, NULL));
_return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {

View File

@ -168,6 +168,21 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_DNODE;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = NULL;
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
char *name = (char*)param;
SCtgTask task = {0};
@ -405,6 +420,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t tbHashNum = (int32_t)taosArrayGetSize(pReq->pTableHash);
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
@ -412,7 +428,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_SUCCESS;
@ -435,6 +451,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
pJob->tbMetaNum = tbMetaNum;
pJob->tbHashNum = tbHashNum;
pJob->qnodeNum = qnodeNum;
pJob->dnodeNum = dnodeNum;
pJob->dbVgNum = dbVgNum;
pJob->udfNum = udfNum;
pJob->dbCfgNum = dbCfgNum;
@ -509,6 +526,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
}
if (dnodeNum) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL));
}
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
@ -631,6 +652,22 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDnodeList) {
pJob->jobRes.pDnodeList = taosArrayInit(1, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pDnodeList) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pDnodeList, &res);
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbCfg) {
@ -1036,6 +1073,19 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetDnodeRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
_return:
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);
}
int32_t ctgHandleGetIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1311,6 +1361,15 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
@ -1462,6 +1521,7 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) {
SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL},
{ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL},
{ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg},
{ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL},
{ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL},

View File

@ -40,6 +40,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
break;
}
case TDMT_MND_DNODE_LIST: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
CTG_ERR_RET(code);
}
qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
break;
}
case TDMT_MND_USE_DB: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
@ -309,9 +324,6 @@ _return:
CTG_RET(code);
}
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
@ -349,6 +361,39 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
}
if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
char *msg = NULL;

View File

@ -23,6 +23,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
switch (type) {
case CTG_TASK_GET_QNODE:
return "[get qnode list]";
case CTG_TASK_GET_DNODE:
return "[get dnode list]";
case CTG_TASK_GET_DB_VGROUP:
return "[get db vgroup]";
case CTG_TASK_GET_DB_CFG:
@ -349,6 +351,11 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) {
*pRes = NULL;
break;
}
case CTG_TASK_GET_DNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes);
break;
@ -413,6 +420,11 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) {
*pRes = NULL;
break;
}
case CTG_TASK_GET_DNODE: {
taosArrayDestroy((SArray*)*pRes);
*pRes = NULL;
break;
}
case CTG_TASK_GET_TB_META: {
taosMemoryFreeClear(*pRes);
break;

View File

@ -18,6 +18,7 @@
#include "tdatablock.h"
#include "tglobal.h"
extern SConfig *tsCfg;
static int32_t getSchemaBytes(const SSchema* pSchema) {
switch (pSchema->type) {
case TSDB_DATA_TYPE_BINARY:
@ -551,7 +552,85 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return TSDB_CODE_FAILED; }
static int32_t execShowLocalVariables() { return TSDB_CODE_FAILED; }
static SSDataBlock* buildLocalVariablesResultDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.numOfCols = SHOW_LOCAL_VARIABLES_RESULT_COLS;
pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataAppend(pColInfo, i, value, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS;
}
static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildLocalVariablesResultDataBlock();
int32_t code = setLocalVariablesResultIntoDataBlock(pBlock);
if (code) {
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pRsp)->useconds = 0;
(*pRsp)->completed = 1;
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_LOCAL_VARIABLES_RESULT_COLS);
int32_t len = 0;
blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_LOCAL_VARIABLES_RESULT_COLS, false);
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS;
}
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
switch (nodeType(pStmt)) {
@ -568,7 +647,7 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
case QUERY_NODE_ALTER_LOCAL_STMT:
return execAlterLocal((SAlterLocalStmt*)pStmt);
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
return execShowLocalVariables();
return execShowLocalVariables(pRsp);
default:
break;
}

View File

@ -1548,11 +1548,14 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return NULL;
}
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0;
int32_t code =
@ -1587,6 +1590,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
} else if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
}
}

View File

@ -137,6 +137,9 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa
code = reserveTableIndexInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName,
pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_DNODE_VARIABLES))) {
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
return code;
}

View File

@ -4611,6 +4611,25 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema**
return TSDB_CODE_SUCCESS;
}
static int32_t extractShowLocalVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = 2;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_CONFIG_OPTION_LEN;
strcpy((*pSchema)[0].name, "name");
(*pSchema)[1].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[1].bytes = TSDB_CONFIG_VALUE_LEN;
strcpy((*pSchema)[1].name, "value");
return TSDB_CODE_SUCCESS;
}
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (NULL == pRoot) {
return TSDB_CODE_SUCCESS;
@ -4629,6 +4648,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
return extractShowCreateTableResultSchema(numOfCols, pSchema);
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
return extractShowLocalVariablesResultSchema(numOfCols, pSchema);
default:
break;
}
@ -5945,12 +5966,12 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
pQuery->haveResultSet = true;
break;
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
break;
default:

View File

@ -928,7 +928,12 @@ int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache) {
}
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
*pDnodes = taosArrayDup(pMetaCache->pDnodes);
SMetaRes* pRes = taosArrayGet(pMetaCache->pDnodes, 0);
if (pRes->code) {
return pRes->code;
}
*pDnodes = taosArrayDup((SArray*)pRes->pRes);
if (NULL == *pDnodes) {
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -166,10 +166,13 @@ class MockCatalogServiceImpl {
}
int32_t catalogGetDnodeList(SArray** pDnodes) const {
*pDnodes = taosArrayInit(dnode_.size(), sizeof(SEpSet));
SMetaRes res = {0};
res.pRes = taosArrayInit(dnode_.size(), sizeof(SEpSet));
for (const auto& dnode : dnode_) {
taosArrayPush(*pDnodes, &dnode.second);
taosArrayPush((SArray*)res.pRes, &dnode.second);
}
*pDnodes = taosArrayInit(1, sizeof(SMetaRes));
taosArrayPush(*pDnodes, &res);
return TSDB_CODE_SUCCESS;
}

View File

@ -126,6 +126,25 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SDnodeListReq dnodeListReq = {0};
dnodeListReq.rowNum = -1;
int32_t bufLen = tSerializeSDnodeListReq(NULL, 0, &dnodeListReq);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
@ -428,6 +447,27 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
return code;
}
int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
SDnodeListRsp out = {0};
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
if (tDeserializeSDnodeListRsp(msg, msgSize, &out) != 0) {
qError("invalid dnode list rsp msg, msgSize:%d", msgSize);
code = TSDB_CODE_INVALID_MSG;
return code;
}
*(SArray**)output = out.dnodeList;
return code;
}
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
SDbCfgRsp out = {0};
@ -535,6 +575,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryBuildDnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
@ -547,6 +588,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryProcessDnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;

View File

@ -503,6 +503,38 @@ const char *cfgDtypeStr(ECfgDataType type) {
}
}
void cfgDumpItemValue(SConfigItem *pItem, char* buf, int32_t bufSize, int32_t* pLen) {
int32_t len = 0;
switch (pItem->dtype) {
case CFG_DTYPE_BOOL:
len = snprintf(buf, bufSize, "%u", pItem->bval);
break;
case CFG_DTYPE_INT32:
len = snprintf(buf, bufSize, "%d", pItem->i32);
break;
case CFG_DTYPE_INT64:
len = snprintf(buf, bufSize, "%" PRId64, pItem->i64);
break;
case CFG_DTYPE_FLOAT:
len = snprintf(buf, bufSize, "%f", pItem->fval);
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_NONE:
len = snprintf(buf, bufSize, "%s", pItem->str);
break;
}
if (len > bufSize) {
len = bufSize;
}
*pLen = len;
}
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
if (dump) {
printf(" global config");
@ -996,4 +1028,4 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
uInfo("fail get apollo url from cmd env file");
return -1;
}
}

View File

@ -99,7 +99,7 @@ if $rows != 1 then
endi
#sql select * from information_schema.`streams`
sql select * from information_schema.user_tables
if $rows != 30 then
if $rows != 31 then
return -1
endi
#sql select * from information_schema.user_table_distributed
@ -197,7 +197,7 @@ if $rows != 1 then
endi
#sql select * from performance_schema.`streams`
sql select * from information_schema.user_tables
if $rows != 30 then
if $rows != 31 then
return -1
endi
#sql select * from information_schema.user_table_distributed
@ -227,5 +227,20 @@ endi
sql_error show create stable t0;
sql show variables;
if $rows != 4 then
return -1
endi
sql show dnode 1 variables;
if $rows != 114 then
return -1
endi
sql show local variables;
if $rows != 50 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT

View File

@ -520,6 +520,16 @@ bool shellIsLimitQuery(const char *sql) {
return false;
}
bool shellIsShowQuery(const char *sql) {
//todo refactor
if (taosStrCaseStr(sql, "show ") != NULL) {
return true;
}
return false;
}
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
@ -682,7 +692,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) {
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql) && !shellIsShowQuery(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}