Merge pull request #9055 from taosdata/feature/dnode3

Feature/dnode3
This commit is contained in:
Shengliang Guan 2021-12-13 10:22:54 +08:00 committed by GitHub
commit 1cdacd2a61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1318 additions and 630 deletions

View File

@ -74,10 +74,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_DB, "compact-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RETRIEVE_FUNCTION, "retrieve-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB, "create-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB, "alter-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB, "drop-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STB_VGROUP, "stb-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
@ -94,9 +94,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from vnode to dnode
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-in" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-in" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-in" )
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
@ -159,7 +159,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_DNODE,
TSDB_MGMT_TABLE_MNODE,
TSDB_MGMT_TABLE_VGROUP,
TSDB_MGMT_TABLE_STABLE,
TSDB_MGMT_TABLE_STB,
TSDB_MGMT_TABLE_MODULE,
TSDB_MGMT_TABLE_QUERIES,
TSDB_MGMT_TABLE_STREAMS,
@ -294,7 +294,7 @@ typedef struct {
uint64_t superTableUid;
uint64_t createdTime;
char tableFname[TSDB_TABLE_FNAME_LEN];
char stableFname[TSDB_TABLE_FNAME_LEN];
char stbFname[TSDB_TABLE_FNAME_LEN];
char data[];
} SMDCreateTableMsg;
@ -311,16 +311,23 @@ typedef struct {
} SCreateTableMsg;
typedef struct {
int32_t numOfTables;
int32_t contLen;
} SCMCreateTableMsg;
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int32_t numOfTags;
int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
// if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table
int8_t supertable;
int8_t igNotExists;
} SCMDropTableMsg;
} SDropStbMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
} SAlterStbMsg;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
@ -663,7 +670,6 @@ typedef struct {
typedef struct {
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
@ -689,7 +695,7 @@ typedef struct SStatusMsg {
int32_t sver;
int32_t dnodeId;
int32_t clusterId;
uint32_t rebootTime; // time stamp for last reboot
int64_t rebootTime; // time stamp for last reboot
int16_t numOfCores;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes;
@ -765,7 +771,7 @@ typedef struct {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
} SStableInfoMsg;
} SStbInfoMsg;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
@ -797,8 +803,8 @@ typedef struct {
} SVgroupsMsg, SVgroupsInfo;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; // table id
char stableFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN]; // table id
char stbFname[TSDB_TABLE_FNAME_LEN];
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
@ -862,7 +868,7 @@ typedef struct {
typedef struct {
int32_t dnodeId;
char config[128];
char config[TSDB_DNODE_CONFIG_LEN];
} SCfgDnodeMsg;
typedef struct {

View File

@ -108,7 +108,6 @@ extern int8_t tsEnableBalance;
extern int8_t tsAlternativeRole;
extern int32_t tsBalanceInterval;
extern int32_t tsOfflineThreshold;
extern int32_t tsMnodeEqualVnodeNum;
extern int8_t tsEnableFlowCtrl;
extern int8_t tsEnableSlaveQuery;
extern int8_t tsEnableAdjustMaster;

View File

@ -33,7 +33,6 @@ typedef struct {
int16_t numOfSupportQnodes;
int8_t enableTelem;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
float numOfThreadsPerCore;
float ratioOfQueryCores;
int32_t maxShellConns;

View File

@ -47,7 +47,6 @@ typedef struct SMnodeCfg {
int32_t sver;
int8_t enableTelem;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;

View File

@ -158,8 +158,8 @@ typedef enum {
SDB_USER = 5,
SDB_AUTH = 6,
SDB_ACCT = 7,
SDB_VGROUP = 9,
SDB_STABLE = 9,
SDB_VGROUP = 8,
SDB_STB = 9,
SDB_DB = 10,
SDB_FUNC = 11,
SDB_MAX = 12

View File

@ -152,22 +152,23 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x032A)
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x032B)
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists")
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist")
// mnode-dnode
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330)
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331)
#define TSDB_CODE_MND_NO_ENOUGH_DNODES TAOS_DEF_ERROR_CODE(0, 0x0332)
#define TSDB_CODE_MND_INVALID_CLUSTER_CFG TAOS_DEF_ERROR_CODE(0, 0x0333)
#define TSDB_CODE_MND_INVALID_CLUSTER_ID TAOS_DEF_ERROR_CODE(0, 0x0334)
#define TSDB_CODE_MND_INVALID_DNODE_CFG TAOS_DEF_ERROR_CODE(0, 0x0335)
#define TSDB_CODE_MND_INVALID_DNODE_EP TAOS_DEF_ERROR_CODE(0, 0x0336)
#define TSDB_CODE_MND_INVALID_DNODE_ID TAOS_DEF_ERROR_CODE(0, 0x0337)
// mnode-vgroup
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0332) //"VGroup does not exist")
#define TSDB_CODE_MND_NO_REMOVE_MASTER TAOS_DEF_ERROR_CODE(0, 0x0333) //"Master DNode cannot be removed")
#define TSDB_CODE_MND_NO_ENOUGH_DNODES TAOS_DEF_ERROR_CODE(0, 0x0334) //"Out of DNodes")
#define TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT TAOS_DEF_ERROR_CODE(0, 0x0335) //"Cluster cfg inconsistent")
#define TSDB_CODE_MND_INVALID_DNODE_CFG_OPTION TAOS_DEF_ERROR_CODE(0, 0x0336) //"Invalid dnode cfg option")
#define TSDB_CODE_MND_BALANCE_ENABLED TAOS_DEF_ERROR_CODE(0, 0x0337) //"Balance already enabled")
#define TSDB_CODE_MND_VGROUP_NOT_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0338) //"Vgroup not in dnode")
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0339) //"Vgroup already in dnode")
#define TSDB_CODE_MND_DNODE_NOT_FREE TAOS_DEF_ERROR_CODE(0, 0x033A) //"Dnode not avaliable")
#define TSDB_CODE_MND_INVALID_CLUSTER_ID TAOS_DEF_ERROR_CODE(0, 0x033B) //"Cluster id not match")
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x033C) //"Cluster not ready")
#define TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED TAOS_DEF_ERROR_CODE(0, 0x033D) //"Dnode Id not configured")
#define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED TAOS_DEF_ERROR_CODE(0, 0x033E) //"Dnode Ep not configured")
// mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) //"Account already exists")
#define TSDB_CODE_MND_ACCT_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0341) //"Invalid account")
#define TSDB_CODE_MND_INVALID_ACCT_OPTION TAOS_DEF_ERROR_CODE(0, 0x0342) //"Invalid account options")
@ -183,8 +184,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0348) //"Mnode already exists")
#define TSDB_CODE_MND_MNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0349) //"Mnode not there")
// mnode-table
#define TSDB_CODE_MND_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) //"Table already exists")
// mnode-stable
#define TSDB_CODE_MND_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_IGEXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COLS_NUM TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_TYPE TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_ID TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_BYTES TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_NAME TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0361) //"Table name too long")
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist")
#define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb")

View File

@ -209,6 +209,8 @@ do { \
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
#define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24

View File

@ -46,7 +46,7 @@ int64_t tsDnodeStartTime = 0;
// common
int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsRpcForceTcp = 0; //disable this, means query, show command use udp protocol as default
int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default
int32_t tsMaxShellConns = 50000;
int32_t tsMaxConnections = 5000;
int32_t tsShellActivityTimer = 3; // second
@ -156,7 +156,6 @@ int8_t tsEnableBalance = 1;
int8_t tsAlternativeRole = 0;
int32_t tsBalanceInterval = 300; // seconds
int32_t tsOfflineThreshold = 86400 * 10; // seconds of 10 days
int32_t tsMnodeEqualVnodeNum = 4;
int8_t tsEnableFlowCtrl = 1;
int8_t tsEnableSlaveQuery = 1;
int8_t tsEnableAdjustMaster = 1;
@ -1072,17 +1071,6 @@ static void doInitGlobalConfig(void) {
cfg.maxValue = 10000000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_GB;
taosInitConfigOption(cfg);
// module configs
cfg.option = "mnodeEqualVnodeNum";
cfg.ptr = &tsMnodeEqualVnodeNum;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// module configs
@ -1595,7 +1583,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
#else
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
//assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
#endif
}

View File

@ -142,7 +142,6 @@ void dmnInitOption(SDnodeOpt *pOption) {
pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = tsStatusInterval;
pOption->mnodeEqualVnodeNum = tsMnodeEqualVnodeNum;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
pOption->ratioOfQueryCores = tsRatioOfQueryCores;
pOption->maxShellConns = tsMaxShellConns;

View File

@ -31,6 +31,7 @@ int32_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendStatusMsg(SDnode *pDnode);
#ifdef __cplusplus
}

View File

@ -58,7 +58,8 @@ typedef struct {
int32_t dnodeId;
int32_t dropped;
int32_t clusterId;
uint32_t rebootTime;
int64_t rebootTime;
int8_t statusSent;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;

View File

@ -335,7 +335,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
return 0;
}
static void dndSendStatusMsg(SDnode *pDnode) {
void dndSendStatusMsg(SDnode *pDnode) {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
@ -349,7 +349,7 @@ static void dndSendStatusMsg(SDnode *pDnode) {
pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htonl(pMgmt->clusterId);
pStatus->rebootTime = htonl(pMgmt->rebootTime);
pStatus->rebootTime = htobe64(pMgmt->rebootTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores);
@ -357,7 +357,6 @@ static void dndSendStatusMsg(SDnode *pDnode) {
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pDnode->opt.mnodeEqualVnodeNum);
pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
@ -371,6 +370,7 @@ static void dndSendStatusMsg(SDnode *pDnode) {
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
pMgmt->statusSent = 1;
dndSendMsgToMnode(pDnode, &rpcMsg);
}
@ -383,7 +383,7 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
pMgmt->dropped = pCfg->dropped;
(void)dndWriteDnodes(pDnode);
dndWriteDnodes(pDnode);
taosWUnLockLatch(&pMgmt->latch);
}
}
@ -409,11 +409,16 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
}
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0) {
dndUpdateMnodeEpSet(pDnode, pEpSet);
}
if (pMsg->code != TSDB_CODE_SUCCESS) return;
if (pMsg->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0;
return;
}
SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
@ -421,7 +426,10 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
pCfg->clusterId = htonl(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) return;
if (pCfg->dropped) {
pMgmt->statusSent = 0;
return;
}
SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num);
@ -431,6 +439,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
dndUpdateDnodeEps(pDnode, pDnodeEps);
pMgmt->statusSent = 0;
}
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
@ -461,16 +470,17 @@ static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
}
static void *dnodeThreadRoutine(void *param) {
SDnode *pDnode = param;
int32_t ms = pDnode->opt.statusInterval * 1000;
SDnode *pDnode = param;
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int32_t ms = pDnode->opt.statusInterval * 1000;
while (true) {
taosMsleep(ms);
pthread_testcancel();
if (dndGetStat(pDnode) == DND_STAT_RUNNING) {
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent) {
dndSendStatusMsg(pDnode);
}
taosMsleep(ms);
}
}
@ -478,7 +488,7 @@ int32_t dndInitDnode(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->dnodeId = 0;
pMgmt->rebootTime = taosGetTimestampSec();
pMgmt->rebootTime = taosGetTimestampMs();
pMgmt->dropped = 0;
pMgmt->clusterId = 0;

View File

@ -334,7 +334,6 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->cfg.sver = pDnode->opt.sver;
pOption->cfg.enableTelem = pDnode->opt.enableTelem;
pOption->cfg.statusInterval = pDnode->opt.statusInterval;
pOption->cfg.mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum;
pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
pOption->cfg.timezone = pDnode->opt.timezone;
pOption->cfg.charset = pDnode->opt.charset;

View File

@ -69,10 +69,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_RETRIEVE_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STB_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
@ -84,12 +84,12 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dndProcessDnodeReq;
// message from mnode to vnode
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STB_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STB_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STB_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STB_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB_IN_RSP] = dndProcessMnodeWriteMsg;
// message from mnode to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dndProcessVnodeMgmtMsg;
@ -130,7 +130,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
return;
}
@ -138,10 +138,9 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle,
pMsg->code & 0XFFFF);
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
} else {
dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
}
rpcFreeCont(pMsg->pCont);
}

View File

@ -61,15 +61,15 @@ static int32_t dndCheckRunning(char *dataDir) {
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) {
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr());
return -1;
}
int32_t ret = taosLockFile(fd);
if (ret != 0) {
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr());
taosCloseFile(fd);
return -1;
}
@ -194,6 +194,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
}
dndSetStat(pDnode, DND_STAT_RUNNING);
dndSendStatusMsg(pDnode);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully");

View File

@ -1,5 +1,6 @@
# add_subdirectory(acct)
# add_subdirectory(cluster)
add_subdirectory(dnode)
# add_subdirectory(profile)
# add_subdirectory(show)
add_subdirectory(user)

View File

@ -80,7 +80,7 @@ TEST_F(DndTestCluster, ShowCluster) {
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "show cluster");
EXPECT_STREQ(pMeta->tbFname, "show cluster");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);

View File

@ -0,0 +1,29 @@
add_executable(dndTestDnode "")
target_sources(dndTestDnode
PRIVATE
"dnode.cpp"
"../sut/deploy.cpp"
)
target_link_libraries(
dndTestDnode
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTestDnode
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
)
enable_testing()
add_test(
NAME dndTestDnode
COMMAND dndTestDnode
)

View File

@ -0,0 +1,370 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
class DndTestDnode : public ::testing::Test {
public:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/dndTestDnode1");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9521";
pServer1 = CreateServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
pServer2 = CreateServer("/tmp/dndTestDnode2", fqdn, 9522, firstEp);
// pServer3 = CreateServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
// pServer4 = CreateServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
// pServer5 = CreateServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9521);
taosMsleep(300);
}
static void TearDownTestSuite() {
dropServer(pServer1);
dropServer(pServer2);
dropServer(pServer3);
dropServer(pServer4);
dropServer(pServer5);
dropClient(pClient);
}
static SServer* pServer1;
static SServer* pServer2;
static SServer* pServer3;
static SServer* pServer4;
static SServer* pServer5;
static SClient* pClient;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) {
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
strcpy(pShow->db, "");
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htons(pMeta->numOfTags);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
EXPECT_EQ(pRetrieveRsp->completed, 1);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestDnode::pServer1;
SServer* DndTestDnode::pServer2;
SServer* DndTestDnode::pServer3;
SServer* DndTestDnode::pServer4;
SServer* DndTestDnode::pServer5;
SClient* DndTestDnode::pClient;
TEST_F(DndTestDnode, ShowDnode) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
CheckSchema(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "end point");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "max vnodes");
CheckSchema(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline reason");
SendThenCheckShowRetrieveMsg(1);
CheckInt16(1);
CheckBinary("localhost:9521", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(1);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
}
TEST_F(DndTestDnode, CreateDnode_01) {
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9522");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9521", TSDB_EP_LEN);
CheckBinary("localhost:9522", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
}
#if 0
TEST_F(DndTestDnode, AlterUser_01) {
ASSERT_NE(pClient, nullptr);
//--- drop user ---
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p2");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER;
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->numOfColumns = htons(pMeta->numOfColumns);
EXPECT_EQ(pMeta->numOfColumns, 4);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 3);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "u1");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
}
}
TEST_F(DndTestDnode, DropUser_01) {
ASSERT_NE(pClient, nullptr);
//--- drop user ---
SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
strcpy(pReq->user, "u1");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER;
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->numOfColumns = htons(pMeta->numOfColumns);
EXPECT_EQ(pMeta->numOfColumns, 4);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 2);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
}
}
#endif

View File

@ -139,7 +139,7 @@ TEST_F(DndTestProfile, SConnectMsg_03) {
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_STREQ(pMeta->tbFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
@ -480,7 +480,7 @@ TEST_F(DndTestProfile, SKillQueryMsg_03) {
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_STREQ(pMeta->tbFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
@ -667,7 +667,7 @@ TEST_F(DndTestProfile, SKillStreamMsg_03) {
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_STREQ(pMeta->tbFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);

View File

@ -141,7 +141,7 @@ TEST_F(DndTestShow, SShowMsg_04) {
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "");
EXPECT_STREQ(pMeta->tbFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);

View File

@ -16,7 +16,7 @@
#include "deploy.h"
void initLog(const char* path) {
dDebugFlag = 0;
dDebugFlag = 207;
vDebugFlag = 0;
mDebugFlag = 207;
cDebugFlag = 0;
@ -50,14 +50,13 @@ void* runServer(void* param) {
}
}
void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t port) {
void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
pOption->sver = 1;
pOption->numOfCores = 1;
pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = 1;
pOption->mnodeEqualVnodeNum = 1;
pOption->numOfThreadsPerCore = 1;
pOption->ratioOfQueryCores = 1;
pOption->maxShellConns = 1000;
@ -66,16 +65,15 @@ void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t
strcpy(pOption->dataDir, path);
snprintf(pOption->localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
snprintf(pOption->localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
snprintf(pOption->firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
snprintf(pOption->firstEp, TSDB_EP_LEN, "%s", firstEp);
}
SServer* createServer(const char* path, const char* fqdn, uint16_t port) {
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
taosRemoveDir(path);
taosMkDir(path);
initLog(path);
SDnodeOpt option = {0};
initOption(&option, path, fqdn, port);
initOption(&option, path, fqdn, port, firstEp);
SDnode* pDnode = dndInit(&option);
ASSERT(pDnode);
@ -91,6 +89,7 @@ SServer* createServer(const char* path, const char* fqdn, uint16_t port) {
}
void dropServer(SServer* pServer) {
if (pServer == NULL) return;
if (pServer->threadId != NULL) {
taosDestoryThread(pServer->threadId);
}
@ -99,7 +98,8 @@ void dropServer(SServer* pServer) {
void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SClient* pClient = (SClient*)parent;
pClient->pRsp = pMsg;
// taosMsleep(1000000);
uInfo("response:%s from dnode, pCont:%p contLen:%d code:0x%X", taosMsg[pMsg->msgType], pMsg->pCont, pMsg->contLen,
pMsg->code);
tsem_post(&pClient->sem);
}
@ -145,7 +145,7 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) {
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = pClient->port;
strcpy(epSet.fqdn[0], pClient->fqdn);
memcpy(epSet.fqdn[0], pClient->fqdn, TSDB_FQDN_LEN);
rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL);
tsem_wait(&pClient->sem);

View File

@ -39,8 +39,11 @@ typedef struct {
tsem_t sem;
} SClient;
SServer* createServer(const char* path, const char* fqdn, uint16_t port);
void initLog(const char* path);
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
void dropServer(SServer* pServer);
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
void dropClient(SClient* pClient);
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
// class DndTest

View File

@ -26,8 +26,9 @@ class DndTestUser : public ::testing::Test {
const char* path = "/tmp/dndTestUser";
const char* fqdn = "localhost";
uint16_t port = 9524;
const char* firstEp = "localhost:9524";
pServer = createServer(path, fqdn, port);
pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
pClient = createClient(user, pass, fqdn, port);
}
@ -79,7 +80,7 @@ TEST_F(DndTestUser, ShowUser) {
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tableFname, "show users");
EXPECT_STREQ(pMeta->tbFname, "show users");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);

View File

@ -92,7 +92,6 @@ typedef enum {
DND_REASON_VERSION_NOT_MATCH,
DND_REASON_DNODE_ID_NOT_MATCH,
DND_REASON_CLUSTER_ID_NOT_MATCH,
DND_REASON_MN_EQUAL_VN_NOT_MATCH,
DND_REASON_STATUS_INTERVAL_NOT_MATCH,
DND_REASON_TIME_ZONE_NOT_MATCH,
DND_REASON_LOCALE_NOT_MATCH,
@ -125,6 +124,7 @@ typedef struct SDnodeObj {
int64_t createdTime;
int64_t updateTime;
int64_t rebootTime;
int64_t lastAccessTime;
int32_t accessTimes;
int16_t numOfMnodes;
int16_t numOfVnodes;
@ -241,7 +241,7 @@ typedef struct SVgObj {
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
} SVgObj;
typedef struct SStableObj {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int64_t createdTime;
@ -251,9 +251,8 @@ typedef struct SStableObj {
int32_t numOfColumns;
int32_t numOfTags;
SRWLatch lock;
SSchema *columnSchema;
SSchema *tagSchema;
} SStableObj;
SSchema *pSchema;
} SStbObj;
typedef struct SFuncObj {
char name[TSDB_FUNC_NAME_LEN];

View File

@ -88,6 +88,8 @@ void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len) ;
#ifdef __cplusplus
}
#endif

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_STABLE_H_
#define _TD_MND_STABLE_H_
#ifndef _TD_MND_STB_H_
#define _TD_MND_STB_H_
#include "mndInt.h"
@ -22,11 +22,11 @@
extern "C" {
#endif
int32_t mndInitStable(SMnode *pMnode);
void mndCleanupStable(SMnode *pMnode);
int32_t mndInitStb(SMnode *pMnode);
void mndCleanupStb(SMnode *pMnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_STABLE_H_*/
#endif /*_TD_MND_STB_H_*/

View File

@ -159,7 +159,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
cols++;
pMeta->numOfColumns = htons(cols);
strcpy(pMeta->tableFname, "show cluster");
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
pShow->numOfColumns = cols;
pShow->offset[0] = 0;

View File

@ -767,7 +767,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}

View File

@ -22,10 +22,13 @@
#include "tutil.h"
#define TSDB_DNODE_VER 1
#define TSDB_DNODE_RESERVE_SIZE 64
#define TSDB_CONFIG_OPTION_LEN 16
#define TSDB_CONIIG_VALUE_LEN 48
#define TSDB_CONFIG_NUMBER 8
static int32_t id = 2;
static const char *offlineReason[] = {
"",
"status msg timeout",
@ -33,7 +36,6 @@ static const char *offlineReason[] = {
"version not match",
"dnodeId not match",
"clusterId not match",
"mnEqualVn not match",
"interval not match",
"timezone not match",
"locale not match",
@ -117,6 +119,7 @@ static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime)
SDB_SET_INT16(pRaw, dataPos, pDnode->port)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
@ -142,28 +145,28 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->updateTime)
SDB_GET_INT16(pRaw, pRow, dataPos, &pDnode->port)
SDB_GET_BINARY(pRaw, pRow, dataPos, pDnode->fqdn, TSDB_FQDN_LEN)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_DNODE_RESERVE_SIZE)
return pRow;
}
static void mnodeResetDnode(SDnodeObj *pDnode) {
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform insert action", pDnode->id);
pDnode->rebootTime = 0;
pDnode->lastAccessTime = 0;
pDnode->accessTimes = 0;
pDnode->numOfCores = 0;
pDnode->numOfMnodes = 0;
pDnode->numOfVnodes = 0;
pDnode->numOfQnodes = 0;
pDnode->numOfSupportMnodes = 0;
pDnode->numOfSupportVnodes = 0;
pDnode->numOfSupportQnodes = 0;
pDnode->numOfCores = 0;
pDnode->status = DND_STATUS_OFFLINE;
pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port);
}
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform insert action", pDnode->id);
mnodeResetDnode(pDnode);
return 0;
}
@ -174,11 +177,6 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode) {
mTrace("dnode:%d, perform update action", pOldDnode->id);
pOldDnode->id = pNewDnode->id;
pOldDnode->createdTime = pNewDnode->createdTime;
pOldDnode->updateTime = pNewDnode->updateTime;
pOldDnode->port = pNewDnode->port;
memcpy(pOldDnode->fqdn, pNewDnode->fqdn, TSDB_FQDN_LEN);
return 0;
}
@ -232,6 +230,7 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
if (pIter == NULL) break;
if (i >= numOfEps) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pDnode);
break;
}
@ -244,20 +243,15 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
pEp->isMnode = 1;
}
i++;
sdbRelease(pSdb, pDnode);
}
pEps->num = htonl(i);
}
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
if (pCfg->mnodeEqualVnodeNum != pMnode->cfg.mnodeEqualVnodeNum) {
mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum,
pMnode->cfg.mnodeEqualVnodeNum);
return DND_REASON_MN_EQUAL_VN_NOT_MATCH;
}
if (pCfg->statusInterval != pMnode->cfg.statusInterval) {
mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->cfg.statusInterval);
mError("statusInterval [%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->cfg.statusInterval);
return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
}
@ -265,18 +259,18 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (checkTime != pCfg->checkTime)) {
mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, pMnode->cfg.timezone,
mError("timezone [%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, pMnode->cfg.timezone,
pCfg->checkTime, checkTime);
return DND_REASON_TIME_ZONE_NOT_MATCH;
}
if (0 != strcasecmp(pCfg->locale, pMnode->cfg.locale)) {
mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->cfg.locale);
mError("locale [%s - %s] cfg inconsistent", pCfg->locale, pMnode->cfg.locale);
return DND_REASON_LOCALE_NOT_MATCH;
}
if (0 != strcasecmp(pCfg->charset, pMnode->cfg.charset)) {
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->cfg.charset);
mError("charset [%s - %s] cfg inconsistent.", pCfg->charset, pMnode->cfg.charset);
return DND_REASON_CHARSET_NOT_MATCH;
}
@ -287,14 +281,12 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htonl(pStatus->clusterId);
pStatus->rebootTime = htonl(pStatus->rebootTime);
pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes);
pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes);
pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
}
@ -308,7 +300,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", pStatus->dnodeEp);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
}
} else {
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId);
@ -319,7 +312,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
}
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
}
}
@ -329,7 +323,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
}
mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
return TSDB_CODE_MND_INVALID_MSG_VERSION;
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
return -1;
}
if (pStatus->dnodeId == 0) {
@ -341,7 +336,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
}
mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_INVALID_CLUSTER_ID;
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID;
return -1;
} else {
pDnode->accessTimes++;
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
@ -355,7 +351,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pDnode->offlineReason = ret;
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT;
terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
return -1;
}
mInfo("dnode:%d, from offline to online", pDnode->id);
@ -366,6 +363,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes;
pDnode->lastAccessTime = taosGetTimestampMs();
pDnode->status = DND_STATUS_READY;
int32_t numOfEps = mndGetDnodeSize(pMnode);
@ -373,7 +371,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
SStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
@ -390,13 +389,13 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
SDnodeObj dnodeObj = {0};
dnodeObj.id = 1; // todo
dnodeObj.id = id++;
dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime;
taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port);
if (dnodeObj.fqdn[0] == 0 || dnodeObj.port <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR;
terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
return terrno;
}
@ -449,7 +448,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
mDebug("dnode:%s, start to create", pCreate->ep);
if (pCreate->ep[0] == 0) {
terrno = TSDB_CODE_SDB_APP_ERROR;
terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
return -1;
}
@ -457,7 +456,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, pCreate->ep);
if (pDnode != NULL) {
mError("dnode:%d, already exist", pDnode->id);
sdbRelease(pMnode->pSdb, pDnode);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
return -1;
}
@ -478,7 +477,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode)
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
return -1;
}
mDebug("trans:%d, used to drop user:%d", pTrans->id, pDnode->id);
mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
SSdbRaw *pRedoRaw = mndDnodeActionEncode(pDnode);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
@ -522,26 +521,26 @@ static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) {
mDebug("dnode:%d, start to drop", pDrop->dnodeId);
if (pDrop->dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR;
terrno = TSDB_CODE_MND_INVALID_DNODE_ID;
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
return -1;
}
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDrop->dnodeId);
if (pDnode == NULL) {
mError("dnode:%d, not exist", pDrop->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
}
int32_t code = mndDropDnode(pMnode, pMsg, pDnode);
if (code != 0) {
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
return -1;
}
sdbRelease(pMnode->pSdb, pDnode);
int32_t code = mndDropDnode(pMnode, pMsg, pDnode);
if (code != 0) {
mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
return -1;
}
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
@ -553,7 +552,7 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId);
if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("dnode:%d, failed to cfg since %s ", pCfg->dnodeId, terrstr());
mError("dnode:%d, failed to config since %s ", pCfg->dnodeId, terrstr());
return -1;
}
@ -562,17 +561,22 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) {
SCfgDnodeMsg *pCfgDnode = rpcMallocCont(sizeof(SCfgDnodeMsg));
pCfgDnode->dnodeId = htonl(pCfg->dnodeId);
memcpy(pCfgDnode->config, pCfg->config, 128);
memcpy(pCfgDnode->config, pCfg->config, TSDB_DNODE_CONFIG_LEN);
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_CONFIG_DNODE_IN, .pCont = pCfgDnode, .contLen = sizeof(SCfgDnodeMsg)};
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_CONFIG_DNODE_IN,
.pCont = pCfgDnode,
.contLen = sizeof(SCfgDnodeMsg),
.ahandle = pMsg->rpcMsg.ahandle};
mInfo("dnode:%d, is configured", pCfg->dnodeId);
mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config);
mndSendMsgToDnode(pMnode, &epSet, &rpcMsg);
return 0;
}
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg) { mInfo("cfg dnode rsp is received"); }
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg) {
mInfo("app:%p config rsp from dnode", pMsg->rpcMsg.ahandle);
}
static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
int32_t cols = 0;
@ -600,8 +604,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
pShow->numOfRows = TSDB_CONFIG_NUMBER;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pIter = NULL;
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
@ -676,7 +679,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "cores");
strcpy(pSchema[cols].name, "max vnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
@ -708,7 +711,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
@ -740,7 +743,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->numOfCores;
*(int16_t *)pWrite = pDnode->numOfSupportVnodes;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
@ -753,7 +756,11 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]);
if (pDnode->status == DND_STATUS_READY) {
STR_TO_VARSTR(pWrite, "");
} else {
STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]);
}
cols++;
numOfRows++;

View File

@ -429,7 +429,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, "show funcs");
strcpy(pMeta->tbFname, "show funcs");
return 0;
}

View File

@ -405,7 +405,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}

View File

@ -270,7 +270,7 @@ char *mndShowStr(int32_t showType) {
return "show mnodes";
case TSDB_MGMT_TABLE_VGROUP:
return "show vgroups";
case TSDB_MGMT_TABLE_STABLE:
case TSDB_MGMT_TABLE_STB:
return "show stables";
case TSDB_MGMT_TABLE_MODULE:
return "show modules";

View File

@ -1,427 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndStable.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndDb.h"
#include "tname.h"
#define TSDB_STABLE_VER_NUM 1
#define TSDB_STABLE_RESERVE_SIZE 64
static SSdbRaw *mndStableActionEncode(SStableObj *pStb);
static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw);
static int32_t mndStableActionInsert(SSdb *pSdb, SStableObj *pStb);
static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStb);
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStb, SStableObj *pNewStb);
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg);
static int32_t mndProcessCreateStableInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStableInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropStableInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetStableMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStable(SMnode *pMnode, void *pIter);
int32_t mndInitStable(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_STABLE,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStableActionEncode,
.decodeFp = (SdbDecodeFp)mndStableActionDecode,
.insertFp = (SdbInsertFp)mndStableActionInsert,
.updateFp = (SdbUpdateFp)mndStableActionUpdate,
.deleteFp = (SdbDeleteFp)mndStableActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_STABLE, mndProcessCreateStableMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_STABLE, mndProcessAlterStableMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_STABLE, mndProcessDropStableMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP, mndProcessCreateStableInRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP, mndProcessAlterStableInRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_STABLE_IN_RSP, mndProcessDropStableInRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_TABLE_META, mndProcessStableMetaMsg);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STABLE, mndGetStableMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STABLE, mndRetrieveStables);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STABLE, mndCancelGetNextStable);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupStable(SMnode *pMnode) {}
static SSdbRaw *mndStableActionEncode(SStableObj *pStb) {
int32_t size = sizeof(SStableObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
SSdbRaw *pRaw = sdbAllocRaw(SDB_STABLE, TSDB_STABLE_VER_NUM, size);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_NAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
SDB_SET_INT64(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags)
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->columnSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->tagSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STABLE_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
}
static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_STABLE_VER_NUM) {
mError("failed to decode stable since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
}
int32_t size = sizeof(SStableObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
SStableObj *pStb = sdbGetRowObj(pRow);
if (pStb == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_NAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfColumns)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfTags)
pStb->columnSchema = calloc(pStb->numOfColumns, sizeof(SSchema));
pStb->tagSchema = calloc(pStb->numOfTags, sizeof(SSchema));
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->columnSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->tagSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_STABLE_RESERVE_SIZE)
return pRow;
}
static int32_t mndStableActionInsert(SSdb *pSdb, SStableObj *pStb) {
mTrace("stable:%s, perform insert action", pStb->name);
return 0;
}
static int32_t mndStableActionDelete(SSdb *pSdb, SStableObj *pStb) {
mTrace("stable:%s, perform delete action", pStb->name);
return 0;
}
static int32_t mndStableActionUpdate(SSdb *pSdb, SStableObj *pOldStb, SStableObj *pNewStb) {
mTrace("stable:%s, perform update action", pOldStb->name);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version);
taosWLockLatch(&pOldStb->lock);
int32_t numOfTags = pNewStb->numOfTags;
int32_t tagSize = numOfTags * sizeof(SSchema);
int32_t numOfColumns = pNewStb->numOfColumns;
int32_t columnSize = numOfColumns * sizeof(SSchema);
if (pOldStb->numOfTags < numOfTags) {
pOldStb->tagSchema = malloc(tagSize);
}
if (pOldStb->numOfColumns < numOfColumns) {
pOldStb->columnSchema = malloc(columnSize);
}
memcpy(pOldStb->tagSchema, pNewStb->tagSchema, tagSize);
memcpy(pOldStb->columnSchema, pNewStb->columnSchema, columnSize);
taosWUnLockLatch(&pOldStb->lock);
return 0;
}
SStableObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_STABLE, stbName);
}
void mndReleaseStb(SMnode *pMnode, SStableObj *pStb) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pStb);
}
static int32_t mndProcessCreateStableMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCreateStableInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessAlterStableMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessAlterStableInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropStableMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropStableInRsp(SMnodeMsg *pMsg) { return 0; }
static SDbObj *mndGetDbByStbName(SMnode *pMnode, char *stbName) {
SName name = {0};
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TABLE_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
}
static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stable:%s, start to retrieve meta", pInfo->name);
SDbObj *pDb = mndGetDbByStbName(pMnode, pInfo->name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stable:%s, failed to retrieve meta since %s", pInfo->name, terrstr());
return -1;
}
SStableObj *pStb = mndAcquireStb(pMnode, pInfo->name);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME;
mError("stable:%s, failed to get meta since %s", pInfo->name, terrstr());
return -1;
}
int32_t contLen = sizeof(STableMetaMsg) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stable:%s, failed to get meta since %s", pInfo->name, terrstr());
return -1;
}
memcpy(pMeta->stableFname, pStb->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pColumn = &pStb->columnSchema[i];
memcpy(pSchema->name, pColumn->name, TSDB_COL_NAME_LEN);
pSchema->type = pColumn->type;
pSchema->colId = htonl(pColumn->colId);
pSchema->bytes = htonl(pColumn->bytes);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns];
SSchema *pTag = &pStb->tagSchema[i];
memcpy(pSchema->name, pTag->name, TSDB_COL_NAME_LEN);
pSchema->type = pTag->type;
pSchema->colId = htons(pTag->colId);
pSchema->bytes = htonl(pTag->bytes);
}
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("stable:%s, meta is retrieved, cols:%d tags:%d", pInfo->name, pStb->numOfColumns, pStb->numOfTags);
return 0;
}
static int32_t mndGetNumOfStables(SMnode *pMnode, char *dbName, int32_t *pNumOfStables) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfStables = 0;
void *pIter = NULL;
while (1) {
SStableObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStb);
if (pIter == NULL) break;
if (strcmp(pStb->db, dbName) == 0) {
numOfStables++;
}
sdbRelease(pSdb, pStb);
}
*pNumOfStables = numOfStables;
return 0;
}
static int32_t mndGetStableMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfStables(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
return 0;
}
static void mnodeExtractTableName(char* tableId, char* name) {
int pos = -1;
int num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) {
if (tableId[pos] == '.') num++;
if (num == 2) break;
}
if (num == 2) {
strcpy(name, tableId + pos + 1);
}
}
static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStableObj *pStb = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STABLE, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
if (strncmp(pStb->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
cols = 0;
char stableName[TSDB_TABLE_FNAME_LEN] = {0};
memcpy(stableName, pStb->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stableName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStb->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfTags;
cols++;
numOfRows++;
sdbRelease(pSdb, pStb);
}
pShow->numOfReads += numOfRows;
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
static void mndCancelGetNextStable(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}

View File

@ -0,0 +1,675 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "tname.h"
#define TSDB_STB_VER_NUM 1
#define TSDB_STB_RESERVE_SIZE 64
static SSdbRaw *mndStbActionEncode(SStbObj *pStb);
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb);
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_STB,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStbActionEncode,
.decodeFp = (SdbDecodeFp)mndStbActionDecode,
.insertFp = (SdbInsertFp)mndStbActionInsert,
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_STB, mndProcessCreateStbMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_STB, mndProcessAlterStbMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_STB, mndProcessDropStbMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_STB_IN_RSP, mndProcessCreateStbInRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_STB_IN_RSP, mndProcessAlterStbInRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_STB_IN_RSP, mndProcessDropStbInRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_TABLE_META, mndProcessStbMetaMsg);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupStb(SMnode *pMnode) {}
static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUM, size);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_NAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
SDB_SET_INT64(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
}
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_STB_VER_NUM) {
mError("failed to decode stable since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
}
int32_t size = sizeof(SStbObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
SStbObj *pStb = sdbGetRowObj(pRow);
if (pStb == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_NAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfColumns)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfTags)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pStb->pSchema = calloc(totalCols, sizeof(SSchema));
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_STB_RESERVE_SIZE)
return pRow;
}
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform insert action", pStb->name);
return 0;
}
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform delete action", pStb->name);
return 0;
}
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
mTrace("stb:%s, perform update action", pOldStb->name);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version);
taosWLockLatch(&pOldStb->lock);
int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) {
pOldStb->pSchema = malloc(totalSize);
}
memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize);
taosWUnLockLatch(&pOldStb->lock);
return 0;
}
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_STB, stbName);
}
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pStb);
}
static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
SName name = {0};
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TABLE_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
}
static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pCreate->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
}
if (pCreate->igExists < 0 || pCreate->igExists > 1) {
terrno = TSDB_CODE_MND_STB_INVALID_IGEXIST;
return -1;
}
if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
terrno = TSDB_CODE_MND_STB_INVALID_COLS_NUM;
return -1;
}
if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) {
terrno = TSDB_CODE_MND_STB_INVALID_TAGS_NUM;
return -1;
}
int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pCreate->pSchema[i];
if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_TYPE;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= maxColId) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_ID;
return -1;
}
if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_BYTES;
return -1;
}
if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_NAME;
return -1;
}
}
return 0;
}
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
tstrncpy(stbObj.db, pDb->name, TSDB_FULL_DB_NAME_LEN);
stbObj.createdTime = taosGetTimestampMs();
stbObj.updateTime = stbObj.createdTime;
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
stbObj.version = 1;
stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags;
int32_t totalCols = stbObj.numOfColumns + stbObj.numOfTags;
int32_t totalSize = totalCols * sizeof(SSchema);
stbObj.pSchema = malloc(totalSize);
if (stbObj.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
SSdbRaw *pRedoRaw = mndStbActionEncode(&stbObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndStbActionEncode(&stbObj);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name);
if (mndCheckStbMsg(pCreate) != 0) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name);
if (pStb != NULL) {
sdbRelease(pMnode->pSdb, pStb);
if (pCreate->igExists) {
mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
return 0;
} else {
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
mError("db:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
}
SDbObj *pDb = mndAcquireDbByStb(pMnode, pCreate->name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
int32_t code = mndCreateStb(pMnode, pMsg, pCreate, pDb);
mndReleaseDb(pMnode, pDb);
if (code != 0) {
terrno = code;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
SSchema *pSchema = &pAlter->schema;
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_TYPE;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_ID;
return -1;
}
if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_BYTES;
return -1;
}
if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_NAME;
return -1;
}
return 0;
}
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
if (mndCheckAlterStbMsg(pAlter) != 0) {
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, pAlter->name);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
return -1;
}
SStbObj stbObj = {0};
memcpy(&stbObj, pStb, sizeof(SStbObj));
int32_t code = mndUpdateStb(pMnode, pMsg, pStb, &stbObj);
mndReleaseStb(pMnode, pStb);
if (code != 0) {
mError("stb:%s, failed to alter since %s", pAlter->name, tstrerror(code));
return code;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropStbMsg *pDrop = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name);
SStbObj *pStb = mndAcquireStb(pMnode, pDrop->name);
if (pStb == NULL) {
if (pDrop->igNotExists) {
mDebug("stb:%s, not exist, ignore not exist is set", pDrop->name);
return 0;
} else {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
}
}
int32_t code = mndDropStb(pMnode, pMsg, pStb);
mndReleaseStb(pMnode, pStb);
if (code != 0) {
terrno = code;
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStbInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->name);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to retrieve meta since %s", pInfo->name, terrstr());
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->name);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME;
mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr());
return -1;
}
int32_t contLen = sizeof(STableMetaMsg) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr());
return -1;
}
memcpy(pMeta->stbFname, pStb->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->name, pStb->numOfColumns, pStb->numOfTags);
return 0;
}
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
int32_t numOfStbs = 0;
void *pIter = NULL;
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStb);
if (pIter == NULL) break;
if (strcmp(pStb->db, dbName) == 0) {
numOfStbs++;
}
sdbRelease(pSdb, pStb);
}
*pNumOfStbs = numOfStbs;
return 0;
}
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
static void mnodeExtractTableName(char *tableId, char *name) {
int pos = -1;
int num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) {
if (tableId[pos] == '.') num++;
if (num == 2) break;
}
if (num == 2) {
strcpy(name, tableId + pos + 1);
}
}
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStbObj *pStb = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
if (strncmp(pStb->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
cols = 0;
char stbName[TSDB_TABLE_FNAME_LEN] = {0};
memcpy(stbName, pStb->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stbName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStb->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfTags;
cols++;
numOfRows++;
sdbRelease(pSdb, pStb);
}
pShow->numOfReads += numOfRows;
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}

View File

@ -489,7 +489,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}

View File

@ -45,7 +45,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
int32_t mndInitVgroup(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_VGROUP,
.keyType = SDB_KEY_BINARY,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
.decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
@ -238,7 +238,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
@ -337,7 +337,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->replica = dnodeId;
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}

View File

@ -24,7 +24,7 @@
#include "mndMnode.h"
#include "mndProfile.h"
#include "mndShow.h"
#include "mndStable.h"
#include "mndStb.h"
#include "mndSync.h"
#include "mndTelem.h"
#include "mndTrans.h"
@ -131,7 +131,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (pMnode->clusterId <= 0) {
if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
@ -205,7 +205,6 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->cfg.sver = pOption->cfg.sver;
pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
pMnode->cfg.mnodeEqualVnodeNum = pOption->cfg.mnodeEqualVnodeNum;
pMnode->cfg.shellActivityTimer = pOption->cfg.shellActivityTimer;
pMnode->cfg.timezone = strdup(pOption->cfg.timezone);
pMnode->cfg.locale = strdup(pOption->cfg.locale);
@ -215,7 +214,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
pMnode->cfg.statusInterval < 1 || pOption->cfg.mnodeEqualVnodeNum < 0) {
pMnode->cfg.statusInterval < 1) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1;
}
@ -430,3 +429,10 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessApplyMsg(SMnodeMsg *pMsg) {}
uint64_t mndGenerateUid(char *name, int32_t len) {
int64_t us = taosGetTimestampUs();
int32_t hashval = MurmurHash3_32(name, len);
uint64_t x = (us & 0x000000FFFFFFFFFF) << 24;
return x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
}

View File

@ -121,7 +121,7 @@ typedef struct SRelationInfo {
typedef struct SCreatedTableInfo {
SToken name; // table name token
SToken stableName; // super table name token , for using clause
SToken stbName; // super table name token , for using clause
SArray *pTagNames; // create by using super table, tag name
SArray *pTagVals; // create by using super table, tag value
char *fullname; // table full name

View File

@ -112,13 +112,13 @@ cmd ::= SHOW dbPrefix(X) TABLES LIKE ids(Y). {
}
cmd ::= SHOW dbPrefix(X) STABLES. {
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &X, 0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STB, &X, 0);
}
cmd ::= SHOW dbPrefix(X) STABLES LIKE ids(Y). {
SToken token;
tSetDbName(&token, &X);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &token, &Y);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STB, &token, &Y);
}
cmd ::= SHOW dbPrefix(X) VGROUPS. {

View File

@ -634,7 +634,7 @@ SCreatedTableInfo createNewChildTableInfo(SToken *pTableName, SArray *pTagNames,
info.name = *pToken;
info.pTagNames = pTagNames;
info.pTagVals = pTagVals;
info.stableName = *pTableName;
info.stbName = *pTableName;
info.igExist = (igExists->n > 0)? 1:0;
return info;

View File

@ -2312,14 +2312,14 @@ static void yy_reduce(
break;
case 26: /* cmd ::= SHOW dbPrefix STABLES */
{
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &yymsp[-1].minor.yy0, 0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STB, &yymsp[-1].minor.yy0, 0);
}
break;
case 27: /* cmd ::= SHOW dbPrefix STABLES LIKE ids */
{
SToken token;
tSetDbName(&token, &yymsp[-3].minor.yy0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STABLE, &token, &yymsp[0].minor.yy0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_STB, &token, &yymsp[0].minor.yy0);
}
break;
case 28: /* cmd ::= SHOW dbPrefix VGROUPS */

View File

@ -126,6 +126,8 @@ typedef struct SRpcConn {
SRpcReqContext *pContext; // request context
} SRpcConn;
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;
int tsRpcMaxUdpSize = 15000; // bytes
int tsProgressTimer = 100;
// not configurable
@ -220,17 +222,22 @@ static void rpcFree(void *p) {
free(p);
}
int32_t rpcInit(void) {
tsProgressTimer = tsRpcTimer/2;
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
static void rpcInitImp(void) {
tsProgressTimer = tsRpcTimer / 2;
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree);
return 0;
}
int32_t rpcInit(void) {
pthread_once(&tsRpcInitOnce, rpcInitImp);
return 0;
}
void rpcCleanup(void) {
taosCloseRef(tsRpcRefId);
tsRpcRefId = -1;
@ -406,7 +413,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
// for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_SHOW_RETRIEVE
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STABLE_VGROUP
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STB_VGROUP
|| type == TSDB_MSG_TYPE_TABLES_META || type == TSDB_MSG_TYPE_TABLE_META
|| type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE)
pContext->connType = RPC_CONN_TCPC;

View File

@ -162,22 +162,23 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data vers
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content")
// mnode-dnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "DNode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "DNode does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_REMOVE_MASTER, "Master DNode cannot be removed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_ENOUGH_DNODES, "Out of DNodes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT, "Cluster cfg inconsistent")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_CFG_OPTION, "Invalid dnode cfg option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BALANCE_ENABLED, "Balance already enabled")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_CFG, "Cluster cfg inconsistent")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_CFG, "Invalid dnode cfg")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_EP, "Invalid dnode end point")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_ID, "Invalid dnode id")
// mnode-vgroup
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, "Dnode not avaliable")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, "Dnode Id not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, "Dnode Ep not configured")
// mnode-acct
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_NOT_EXIST, "Invalid account")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_OPTION, "Invalid account options")
@ -193,8 +194,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USERS, "Too many users")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TABLE_ALREADY_EXIST, "Table already exists")
// mnode-stable
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, "Table name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, "Invalid table type in tsdb")

View File

@ -28,7 +28,7 @@ typedef struct {
} TAOS_SML_KV;
typedef struct {
char* stableName;
char* stbName;
char* childTableName;
TAOS_SML_KV* tags;