[td-3036]<feature>: support the db sync operator.

This commit is contained in:
Haojun Liao 2021-03-23 16:44:20 +08:00
parent bfe554a0d6
commit 74148ee3de
11 changed files with 1059 additions and 1008 deletions

View File

@ -670,7 +670,18 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if ((code = setKillInfo(pSql, pInfo, pInfo->type)) != TSDB_CODE_SUCCESS) { if ((code = setKillInfo(pSql, pInfo, pInfo->type)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
break;
}
case TSDB_SQL_SYNC_DB_REPLICA: {
const char* msg1 = "invalid db name";
SStrToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0);
assert(taosArrayGetSize(pInfo->pMiscInfo->a) == 1);
code = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pSql), pzName);
if (code != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
break; break;
} }

View File

@ -1284,6 +1284,23 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildSyncDbReplicaMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SSyncDbMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSyncDbMsg *pSyncMsg = (SSyncDbMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
tNameExtractFullName(&pTableMetaInfo->name, pSyncMsg->db);
pCmd->msgType = TSDB_MSG_TYPE_CM_SYNC_DB;
return TSDB_CODE_SUCCESS;
}
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
@ -2559,6 +2576,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg; tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg; tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg; tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg; tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg; tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg; tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;

View File

@ -51,6 +51,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" )
TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DNODE, "create-dnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DNODE, "create-dnode" )
@ -87,13 +88,13 @@ enum {
*/ */
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_STATUS, "serv-status" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_STATUS, "serv-status" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_DB, "current-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_DB, "current-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_VERSION, "serv-version" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_VERSION, "serv-version" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CLI_VERSION, "cli-version" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CLI_VERSION, "cli-version" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_USER, "current-user ") TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_USER, "current-user ")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_LOCAL, "cfg-local" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_LOCAL, "cfg-local" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MAX, "max" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MAX, "max" )
}; };

View File

@ -49,6 +49,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue;

View File

@ -77,6 +77,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
@ -574,7 +575,7 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg; } SDropDbMsg, SUseDbMsg, SSyncDbMsg;
// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed // IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed
// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE // TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE

View File

@ -152,56 +152,57 @@
#define TK_NOW 133 #define TK_NOW 133
#define TK_RESET 134 #define TK_RESET 134
#define TK_QUERY 135 #define TK_QUERY 135
#define TK_ADD 136 #define TK_SYNCDB 136
#define TK_COLUMN 137 #define TK_ADD 137
#define TK_TAG 138 #define TK_COLUMN 138
#define TK_CHANGE 139 #define TK_TAG 139
#define TK_SET 140 #define TK_CHANGE 140
#define TK_KILL 141 #define TK_SET 141
#define TK_CONNECTION 142 #define TK_KILL 142
#define TK_STREAM 143 #define TK_CONNECTION 143
#define TK_COLON 144 #define TK_STREAM 144
#define TK_ABORT 145 #define TK_COLON 145
#define TK_AFTER 146 #define TK_ABORT 146
#define TK_ATTACH 147 #define TK_AFTER 147
#define TK_BEFORE 148 #define TK_ATTACH 148
#define TK_BEGIN 149 #define TK_BEFORE 149
#define TK_CASCADE 150 #define TK_BEGIN 150
#define TK_CLUSTER 151 #define TK_CASCADE 151
#define TK_CONFLICT 152 #define TK_CLUSTER 152
#define TK_COPY 153 #define TK_CONFLICT 153
#define TK_DEFERRED 154 #define TK_COPY 154
#define TK_DELIMITERS 155 #define TK_DEFERRED 155
#define TK_DETACH 156 #define TK_DELIMITERS 156
#define TK_EACH 157 #define TK_DETACH 157
#define TK_END 158 #define TK_EACH 158
#define TK_EXPLAIN 159 #define TK_END 159
#define TK_FAIL 160 #define TK_EXPLAIN 160
#define TK_FOR 161 #define TK_FAIL 161
#define TK_IGNORE 162 #define TK_FOR 162
#define TK_IMMEDIATE 163 #define TK_IGNORE 163
#define TK_INITIALLY 164 #define TK_IMMEDIATE 164
#define TK_INSTEAD 165 #define TK_INITIALLY 165
#define TK_MATCH 166 #define TK_INSTEAD 166
#define TK_KEY 167 #define TK_MATCH 167
#define TK_OF 168 #define TK_KEY 168
#define TK_RAISE 169 #define TK_OF 169
#define TK_REPLACE 170 #define TK_RAISE 170
#define TK_RESTRICT 171 #define TK_REPLACE 171
#define TK_ROW 172 #define TK_RESTRICT 172
#define TK_STATEMENT 173 #define TK_ROW 173
#define TK_TRIGGER 174 #define TK_STATEMENT 174
#define TK_VIEW 175 #define TK_TRIGGER 175
#define TK_SEMI 176 #define TK_VIEW 176
#define TK_NONE 177 #define TK_SEMI 177
#define TK_PREV 178 #define TK_NONE 178
#define TK_LINEAR 179 #define TK_PREV 179
#define TK_IMPORT 180 #define TK_LINEAR 180
#define TK_TBNAME 181 #define TK_IMPORT 181
#define TK_JOIN 182 #define TK_TBNAME 182
#define TK_INSERT 183 #define TK_JOIN 183
#define TK_INTO 184 #define TK_INSERT 184
#define TK_VALUES 185 #define TK_INTO 185
#define TK_VALUES 186
#define TK_SPACE 300 #define TK_SPACE 300

View File

@ -50,6 +50,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC #ifndef _TOPIC
@ -178,6 +179,7 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mnodeProcessCreateDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mnodeProcessCreateDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
@ -1184,6 +1186,10 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
return mnodeDropDb(pMsg); return mnodeDropDb(pMsg);
} }
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
return 0;
}
void mnodeDropAllDbs(SAcctObj *pAcct) { void mnodeDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0; int32_t numOfDbs = 0;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;

View File

@ -726,6 +726,9 @@ expritem(A) ::= . {A = 0;}
///////////////////////////////////reset query cache////////////////////////////////////// ///////////////////////////////////reset query cache//////////////////////////////////////
cmd ::= RESET QUERY CACHE. { setDCLSqlElems(pInfo, TSDB_SQL_RESET_CACHE, 0);} cmd ::= RESET QUERY CACHE. { setDCLSqlElems(pInfo, TSDB_SQL_RESET_CACHE, 0);}
///////////////////////////////////sync replica database//////////////////////////////////
cmd ::= SYNCDB ids(X) REPLICA.{ setDCLSqlElems(pInfo, TSDB_SQL_SYNC_DB_REPLICA, 1, &X);}
///////////////////////////////////ALTER TABLE statement////////////////////////////////// ///////////////////////////////////ALTER TABLE statement//////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). { cmd ::= ALTER TABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). {
X.n += F.n; X.n += F.n;

View File

@ -911,6 +911,7 @@ void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
SStrToken *pToken = va_arg(va, SStrToken *); SStrToken *pToken = va_arg(va, SStrToken *);
taosArrayPush(pInfo->pMiscInfo->a, pToken); taosArrayPush(pInfo->pMiscInfo->a, pToken);
} }
va_end(va); va_end(va);
} }

View File

@ -100,6 +100,7 @@ static SKeyword keywordTable[] = {
{"ACCOUNT", TK_ACCOUNT}, {"ACCOUNT", TK_ACCOUNT},
{"USE", TK_USE}, {"USE", TK_USE},
{"DESCRIBE", TK_DESCRIBE}, {"DESCRIBE", TK_DESCRIBE},
{"SYNCDB", TK_SYNCDB},
{"ALTER", TK_ALTER}, {"ALTER", TK_ALTER},
{"PASS", TK_PASS}, {"PASS", TK_PASS},
{"PRIVILEGE", TK_PRIVILEGE}, {"PRIVILEGE", TK_PRIVILEGE},

File diff suppressed because it is too large Load Diff