Merge branch 'fix/mnode' of https://github.com/taosdata/TDengine into fix/mnode
This commit is contained in:
commit
cea23e2f31
|
@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai
|
|||
|
||||
Column information is stored using [ColumnMeta].
|
||||
|
||||
``rust
|
||||
```rust
|
||||
let cols = &q.column_meta;
|
||||
for col in cols {
|
||||
println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes);
|
||||
|
|
|
@ -192,6 +192,7 @@ bool syncEnvIsStart();
|
|||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
||||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
||||
|
||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
||||
|
||||
|
|
|
@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo {
|
|||
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||
int32_t persistHandle; // persist handle or not
|
||||
|
||||
SRpcConnInfo connInfo;
|
||||
// app info
|
||||
void *ahandle; // app handle set by client
|
||||
void *wrapper; // wrapper handle
|
||||
|
@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo {
|
|||
// resp info
|
||||
void * rsp;
|
||||
int32_t rspLen;
|
||||
|
||||
// conn info
|
||||
SRpcConnInfo conn;
|
||||
} SRpcHandleInfo;
|
||||
|
||||
typedef struct SRpcMsg {
|
||||
|
@ -63,7 +65,6 @@ typedef struct SRpcMsg {
|
|||
int32_t contLen;
|
||||
int32_t code;
|
||||
SRpcHandleInfo info;
|
||||
SRpcConnInfo conn;
|
||||
} SRpcMsg;
|
||||
|
||||
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
||||
|
|
|
@ -220,7 +220,8 @@ static const SSysDbTableSchema transSchema[] = {
|
|||
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "db1", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "db2", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "last_action_info",
|
||||
|
|
|
@ -228,7 +228,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
vmReleaseVnode(pMgmt, pVnode);
|
||||
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
return 0;
|
||||
}
|
||||
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||
|
|
|
@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
|||
static void dmSendRsp(SRpcMsg *pMsg);
|
||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
|
||||
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
|
||||
// if (IsReq(pRpc)) {
|
||||
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
|
||||
pMsg->conn.clientIp = pConnInfo->clientIp;
|
||||
pMsg->conn.clientPort = pConnInfo->clientPort;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
|
||||
if (msgFp == NULL) {
|
||||
|
@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
if (pMsg == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
||||
if (pMsg == NULL) goto _OVER;
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
|
||||
if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
|
||||
|
||||
if (InParentProc(pWrapper)) {
|
||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
|
||||
|
|
|
@ -124,7 +124,8 @@ typedef struct {
|
|||
int32_t lastErrorNo;
|
||||
tmsg_t lastMsgType;
|
||||
SEpSet lastEpset;
|
||||
char dbname[TSDB_DB_FNAME_LEN];
|
||||
char dbname1[TSDB_DB_FNAME_LEN];
|
||||
char dbname2[TSDB_DB_FNAME_LEN];
|
||||
int32_t startFunc;
|
||||
int32_t stopFunc;
|
||||
int32_t paramLen;
|
||||
|
|
|
@ -68,7 +68,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
|||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
||||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
|
||||
void mndTransSetDbName(STrans *pTrans, const char *dbname);
|
||||
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2);
|
||||
void mndTransSetSerial(STrans *pTrans);
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||
|
|
|
@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) {
|
|||
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
|
||||
|
||||
int32_t code = mndRetriveAuth(pReq->info.node, &authRsp);
|
||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt,
|
||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt,
|
||||
authRsp.user);
|
||||
|
||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);
|
||||
|
|
|
@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -477,7 +477,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
|
||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||
|
||||
mndTransSetDbName(pTrans, dbObj.name);
|
||||
mndTransSetDbName(pTrans, dbObj.name, NULL);
|
||||
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
|
@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -668,7 +668,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
|
|||
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
|
||||
|
||||
int32_t code = -1;
|
||||
mndTransSetDbName(pTrans, pOld->name);
|
||||
mndTransSetDbName(pTrans, pOld->name, NULL);
|
||||
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
||||
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
||||
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
||||
|
@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -921,7 +921,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
|
@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
|
|||
|
||||
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
|
||||
} else {
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -505,6 +505,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||
if (!IsReq(pMsg)) return 0;
|
||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||
|
||||
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
|
||||
|
|
|
@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -742,7 +742,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
|
|||
pMgmt->errCode = 0;
|
||||
pMgmt->transId = -1;
|
||||
tsem_wait(&pMgmt->syncSem);
|
||||
mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
|
||||
mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
|
||||
terrno = pMgmt->errCode;
|
||||
return pMgmt->errCode;
|
||||
}
|
||||
|
|
|
@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
taosIp2String(pReq->conn.clientIp, ip);
|
||||
taosIp2String(pReq->info.conn.clientIp, ip);
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
|
||||
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
|
||||
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
|
||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
||||
terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
}
|
||||
|
||||
pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
|
||||
connReq.pid, connReq.app, connReq.startTime);
|
||||
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
|
||||
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
|
||||
if (pConn == NULL) {
|
||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
|
||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
|
@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
pReq->info.rspLen = contLen;
|
||||
pReq->info.rsp = pRsp;
|
||||
|
||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
if (pHbReq->query) {
|
||||
SQueryHbReqBasic *pBasic = pHbReq->query;
|
||||
|
||||
SRpcConnInfo connInfo = pMsg->conn;
|
||||
SRpcConnInfo connInfo = pMsg->info.conn;
|
||||
|
||||
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
|
||||
if (pConn == NULL) {
|
||||
|
@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) return 0;
|
||||
if (!pUser->superUser) {
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
|||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||
return -1;
|
||||
} else {
|
||||
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
|
||||
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user);
|
||||
pConn->killId = killReq.queryId;
|
||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||
return 0;
|
||||
|
@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) return 0;
|
||||
if (!pUser->superUser) {
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||
return -1;
|
||||
} else {
|
||||
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
|
||||
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user);
|
||||
pConn->killed = 1;
|
||||
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
|
||||
|
||||
// if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1;
|
||||
// if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1;
|
||||
|
||||
int32_t numOfCols = pShow->pMeta->numOfColumns;
|
||||
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
|
|
@ -609,7 +609,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
mndTransSetSerial(pTrans);
|
||||
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
||||
|
||||
|
@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -852,7 +852,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||
|
@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -758,7 +758,7 @@ _OVER:
|
|||
}
|
||||
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
|
@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1396,7 +1396,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
if (needRsp) {
|
||||
void *pCont = NULL;
|
||||
|
@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1537,7 +1537,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
|
@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
|
@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
#endif
|
||||
|
@ -613,9 +613,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB);
|
||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB, NULL);
|
||||
// TODO
|
||||
/*mndTransSetDbName(pTrans, streamObj.targetDb);*/
|
||||
/*mndTransSetDbName(pTrans, streamObj.targetDb, NULL);*/
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||
|
||||
// build stream obj from request
|
||||
|
@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// create stb for stream
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
|
@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
#if 0
|
||||
// todo check auth
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pUser == NULL) {
|
||||
goto DROP_STREAM_OVER;
|
||||
}
|
||||
|
|
|
@ -403,7 +403,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
|
||||
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg);
|
||||
mndTransSetDbName(pTrans, pOutput->pSub->dbName);
|
||||
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
||||
if (pTrans == NULL) return -1;
|
||||
|
||||
// make txn:
|
||||
|
|
|
@ -59,8 +59,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
||||
}
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
pMgmt->transId = 0;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
} else {
|
||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||
if (pTrans != NULL) {
|
||||
|
@ -70,7 +70,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
|
||||
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
|
||||
SSnapshotMeta sMeta = {0};
|
||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) {
|
||||
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
|
||||
}
|
||||
sdbWriteFile(pMnode->pSdb);
|
||||
|
@ -90,7 +91,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
|||
SMnode *pMnode = pFsm->data;
|
||||
|
||||
SSnapshotMeta sMeta = {0};
|
||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
|
||||
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
|
||||
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
|
||||
}
|
||||
|
||||
|
@ -123,8 +127,8 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
|||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
|
||||
}
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
pMgmt->transId = 0;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,6 +266,7 @@ void mndSyncStart(SMnode *pMnode) {
|
|||
|
||||
void mndSyncStop(SMnode *pMnode) {
|
||||
if (pMnode->syncMgmt.transId != 0) {
|
||||
pMnode->syncMgmt.transId = 0;
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
pMnode->syncMgmt.transId = 0;
|
||||
}
|
||||
|
|
|
@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -566,7 +566,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
#endif
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
mndTransSetDbName(pTrans, pTopic->db);
|
||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -122,7 +122,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
|
||||
|
||||
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
|
||||
|
@ -270,7 +271,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
pTrans->conflict = conflict;
|
||||
pTrans->exec = exec;
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
|
||||
|
@ -649,7 +651,14 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
|
|||
pTrans->paramLen = paramLen;
|
||||
}
|
||||
|
||||
void mndTransSetDbName(STrans *pTrans, const char *dbname) { memcpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN); }
|
||||
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
|
||||
if (dbname1 != NULL) {
|
||||
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
if (dbname2 != NULL) {
|
||||
memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
|
||||
|
||||
|
@ -688,14 +697,24 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pNew->conflict == TRN_CONFLICT_DB) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (strcmp(pNew->dbname1, pTrans->dbname1) == 0 || strcmp(pNew->dbname1, pTrans->dbname2) == 0 ||
|
||||
strcmp(pNew->dbname2, pTrans->dbname1) == 0 || strcmp(pNew->dbname2, pTrans->dbname2) == 0) {
|
||||
conflict = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB) {
|
||||
if (strcmp(pNew->dbname1, pTrans->dbname1) == 0 || strcmp(pNew->dbname1, pTrans->dbname2) == 0 ||
|
||||
strcmp(pNew->dbname2, pTrans->dbname1) == 0 || strcmp(pNew->dbname2, pTrans->dbname2) == 0) {
|
||||
conflict = true;
|
||||
}
|
||||
mError("trans:%d, can't execute since conflict with trans:%d, db:%s", pNew->id, pTrans->id, pTrans->dbname);
|
||||
}
|
||||
}
|
||||
mError("trans:%d, can't execute since conflict with trans:%d, db1:%s db2:%s", pNew->id, pTrans->id, pTrans->dbname1,
|
||||
pTrans->dbname2);
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
}
|
||||
|
||||
|
@ -704,7 +723,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (strlen(pTrans->dbname) == 0) {
|
||||
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
|
||||
return -1;
|
||||
|
@ -1369,7 +1388,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("trans:%d, start to kill", killReq.transId);
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1417,7 +1436,9 @@ void mndTransPullup(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
SSnapshotMeta sMeta = {0};
|
||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
|
||||
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
|
||||
}
|
||||
sdbWriteFile(pMnode->pSdb);
|
||||
|
@ -1449,10 +1470,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)stage, false);
|
||||
|
||||
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes);
|
||||
char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false);
|
||||
|
||||
char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
|
||||
|
|
|
@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
|
@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) {
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
|
||||
|
||||
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
|
||||
if (pVgroup == NULL) goto _OVER;
|
||||
|
@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb == NULL) goto _OVER;
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
|
||||
|
||||
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("start to balance vgroup");
|
||||
|
||||
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
|
||||
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
|
||||
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
|
|
@ -128,7 +128,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
mndTransSetCb(pTrans, TRANS_START_FUNC_TEST, TRANS_STOP_FUNC_TEST, param, strlen(param) + 1);
|
||||
|
||||
if (pDb != NULL) {
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
}
|
||||
|
||||
int32_t code = mndTransPrepare(pMnode, pTrans);
|
||||
|
@ -201,7 +201,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
}
|
||||
|
||||
if (pDb != NULL) {
|
||||
mndTransSetDbName(pTrans, pDb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
}
|
||||
|
||||
int32_t code = mndTransPrepare(pMnode, pTrans);
|
||||
|
|
|
@ -220,7 +220,7 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
info.state.committed = pVnode->state.applied;
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||
if (vnodeSaveInfo(dir, &info) < 0) {
|
||||
// ASSERT(0);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
|
|||
}
|
||||
|
||||
// process request
|
||||
// if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
// rcode = terrno;
|
||||
// goto _exit;
|
||||
// }
|
||||
if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
rcode = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// return rsp
|
||||
_exit:
|
||||
|
|
|
@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (rsp.code == 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
|
@ -329,8 +329,9 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
|||
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
|
||||
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
|
||||
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
|
||||
|
@ -359,10 +360,11 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
|
||||
} else {
|
||||
|
|
|
@ -3507,8 +3507,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
|||
// check for the limitation in each group
|
||||
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
|
||||
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
|
||||
|
||||
if (pProjectInfo->slimit.limit == -1 || pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
|
||||
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
|
|
|
@ -331,7 +331,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
#if 0
|
||||
if(pOperator->fpSet.encodeResultRow){
|
||||
|
|
|
@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
|
|||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
}
|
||||
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
|
||||
return (p->info.rows > 0) ? p : NULL;
|
||||
}
|
||||
|
@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -2288,7 +2279,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
||||
pInfo->sample.seed = taosGetTimestampSec();
|
||||
|
||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
pInfo->dataReaders = dataReaders;
|
||||
|
@ -2307,17 +2297,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
taosArrayPush(pInfo->sortSourceParams, param);
|
||||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
|
||||
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||
|
||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||
|
||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||
// the additional one is reserved for merge result
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
|
||||
pOperator->name = "TableMergeScanOperator";
|
||||
// TODO : change it
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
|
|
@ -1384,8 +1384,9 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
|
||||
int32_t rowIndex);
|
||||
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
||||
|
||||
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex);
|
||||
|
||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
|
@ -1407,11 +1408,23 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
if (pEntryInfo->numOfRes > 0) {
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
|
||||
} else {
|
||||
setNullSelectivityValue(pCtx, pBlock, currentRow);
|
||||
}
|
||||
|
||||
return pEntryInfo->numOfRes;
|
||||
}
|
||||
|
||||
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
colDataAppendNULL(pDstCol, rowIndex);
|
||||
}
|
||||
}
|
||||
|
||||
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) {
|
||||
int32_t pageId = pTuplePos->pageId;
|
||||
int32_t offset = pTuplePos->offset;
|
||||
|
@ -4627,8 +4640,6 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||
STailItem* pItem = pInfo->pItems[i];
|
||||
colDataAppend(pCol, currentRow, pItem->data, false);
|
||||
|
||||
// setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
|
||||
currentRow += 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ extern "C" {
|
|||
|
||||
#define CONFIG_FILE_LEN 1024
|
||||
|
||||
#define MAX_CONFIG_INDEX_COUNT 512
|
||||
|
||||
typedef struct SRaftCfg {
|
||||
SSyncCfg cfg;
|
||||
TdFilePtr pFile;
|
||||
|
@ -36,6 +38,10 @@ typedef struct SRaftCfg {
|
|||
int8_t isStandBy;
|
||||
int8_t snapshotEnable;
|
||||
SyncIndex lastConfigIndex;
|
||||
|
||||
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
|
||||
int32_t configIndexCount;
|
||||
|
||||
} SRaftCfg;
|
||||
|
||||
SRaftCfg *raftCfgOpen(const char *path);
|
||||
|
|
|
@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
// delete confict entries
|
||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
||||
ASSERT(code == 0);
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, delBegin, delEnd);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
|
||||
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
|
||||
|
||||
return code;
|
||||
|
@ -880,6 +880,72 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
|||
}
|
||||
} while (0);
|
||||
|
||||
// fake match2
|
||||
//
|
||||
// condition1:
|
||||
// preIndex <= my commit index
|
||||
//
|
||||
// operation:
|
||||
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
|
||||
// match my-commit-index or my-commit-index + 1
|
||||
// no operation on log
|
||||
do {
|
||||
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||
(pMsg->prevLogIndex <= ths->commitIndex);
|
||||
if (condition) {
|
||||
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
|
||||
ths->commitIndex);
|
||||
|
||||
SyncIndex matchIndex = ths->commitIndex;
|
||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
|
||||
// append entry
|
||||
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||
ASSERT(pAppendEntry != NULL);
|
||||
|
||||
{
|
||||
// has extra entries (> preIndex) in local log
|
||||
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
|
||||
|
||||
if (hasExtraEntries) {
|
||||
// make log same, rollback deleted entries
|
||||
code = syncNodeMakeLogSame(ths, pMsg);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
}
|
||||
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||
ASSERT(code == 0);
|
||||
|
||||
// pre commit
|
||||
code = syncNodePreCommit(ths, pAppendEntry);
|
||||
ASSERT(code == 0);
|
||||
|
||||
matchIndex = pMsg->prevLogIndex + 1;
|
||||
|
||||
syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
// prepare response msg
|
||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||
pReply->success = true;
|
||||
pReply->matchIndex = matchIndex;
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
syncAppendEntriesReplyDestroy(pReply);
|
||||
|
||||
return ret;
|
||||
}
|
||||
} while (0);
|
||||
|
||||
// calculate logOK here, before will coredump, due to fake match
|
||||
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
|
||||
|
||||
|
@ -995,8 +1061,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
|||
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
||||
ths->commitIndex = snapshot.lastApplyIndex;
|
||||
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, commitBegin, commitEnd);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
|
||||
commitBegin, commitEnd);
|
||||
}
|
||||
|
||||
SyncIndex beginIndex = ths->commitIndex + 1;
|
||||
|
|
|
@ -190,21 +190,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
|||
if (gRaftDetailLog) {
|
||||
char* s = snapshotSender2Str(pSender);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld privateTerm:%lu "
|
||||
"sender:%s",
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port,
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm, s);
|
||||
taosMemoryFree(s);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port,
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm);
|
||||
}
|
||||
|
|
|
@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
||||
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
||||
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
|
||||
snapshot.lastApplyIndex);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
|
||||
}
|
||||
|
||||
// update commit index
|
||||
|
|
|
@ -420,6 +420,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return -1;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
|
||||
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
|
||||
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
|
||||
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
|
||||
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
|
||||
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
||||
}
|
||||
}
|
||||
sMeta->lastConfigIndex = lastIndex;
|
||||
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const char* syncGetMyRoleStr(int64_t rid) {
|
||||
const char* s = syncUtilState2String(syncGetMyRole(rid));
|
||||
return s;
|
||||
|
@ -575,9 +598,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
|||
return -1;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->msgType);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
@ -586,9 +609,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
|||
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t ret = 0;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->msgType);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
SRespStub stub;
|
||||
|
@ -834,8 +857,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
|||
// snapshot meta
|
||||
// pSyncNode->sMeta.lastConfigIndex = -1;
|
||||
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||
pSyncNode->pRaftStore->currentTerm);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
|
||||
|
||||
return pSyncNode;
|
||||
}
|
||||
|
@ -882,8 +905,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||
pSyncNode->pRaftStore->currentTerm);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
|
||||
|
||||
int32_t ret;
|
||||
assert(pSyncNode != NULL);
|
||||
|
@ -1322,9 +1345,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
oldSenders[i] = (pSyncNode->senders)[i];
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, i, oldSenders[i],
|
||||
oldSenders[i]->privateTerm);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
|
||||
if (gRaftDetailLog) {
|
||||
;
|
||||
}
|
||||
|
@ -1376,9 +1399,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
|
||||
"privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
|
||||
oldSenders[j]->privateTerm);
|
||||
(pSyncNode->senders)[i] = oldSenders[j];
|
||||
oldSenders[j] = NULL;
|
||||
reset = true;
|
||||
|
@ -1386,9 +1412,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
// reset replicaIndex
|
||||
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
|
||||
(pSyncNode->senders)[i]->replicaIndex = i;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
|
||||
"reset:%d",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1397,9 +1425,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if ((pSyncNode->senders)[i] == NULL) {
|
||||
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
(pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1407,8 +1436,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
|
|||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
if (oldSenders[i] != NULL) {
|
||||
snapshotSenderDestroy(oldSenders[i]);
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
|
||||
oldSenders[i] = NULL;
|
||||
}
|
||||
}
|
||||
|
@ -1479,10 +1509,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
|||
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
|
||||
"restoreFinish:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
|
||||
pSyncNode->restoreFinish, debugStr);
|
||||
|
||||
// maybe clear leader cache
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
|
@ -1519,9 +1550,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// reset restoreFinish
|
||||
pSyncNode->restoreFinish = false;
|
||||
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
|
||||
"restoreFinish:%d, %s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
|
||||
pSyncNode->restoreFinish, debugStr);
|
||||
|
||||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||
|
@ -2095,14 +2129,15 @@ const char* syncStr(ESyncState state) {
|
|||
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
|
||||
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state),
|
||||
ths->pRaftStore->currentTerm);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
|
||||
|
||||
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
|
||||
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
|
||||
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
|
||||
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
|
||||
pSyncLeaderTransfer->newLeaderId.addr);
|
||||
|
||||
// reset elect timer now!
|
||||
int32_t electMS = 1;
|
||||
|
@ -2180,8 +2215,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
|
||||
// change isStandBy to normal
|
||||
if (!isDrop) {
|
||||
char tmpbuf[128];
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2Str(&oldSyncCfg);
|
||||
char* newStr = syncCfg2Str(&newSyncCfg);
|
||||
syncUtilJson2Line(oldStr);
|
||||
syncUtilJson2Line(newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeBecomeLeader(ths, tmpbuf);
|
||||
} else {
|
||||
|
@ -2189,8 +2232,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
}
|
||||
}
|
||||
} else {
|
||||
char tmpbuf[128];
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
|
||||
char tmpbuf[512];
|
||||
char* oldStr = syncCfg2Str(&oldSyncCfg);
|
||||
char* newStr = syncCfg2Str(&newSyncCfg);
|
||||
syncUtilJson2Line(oldStr);
|
||||
syncUtilJson2Line(newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
|
||||
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
syncNodeBecomeFollower(ths, tmpbuf);
|
||||
}
|
||||
|
||||
|
@ -2224,9 +2275,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
||||
int32_t code = 0;
|
||||
ESyncState state = flag;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s",
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, beginIndex, endIndex,
|
||||
syncUtilState2String(state));
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
|
||||
", %s",
|
||||
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
|
||||
endIndex, syncUtilState2String(state));
|
||||
|
||||
// execute fsm
|
||||
if (ths->pFsm != NULL) {
|
||||
|
@ -2268,15 +2320,16 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
}
|
||||
|
||||
// restore finish
|
||||
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
|
||||
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
|
||||
if (ths->restoreFinish == false) {
|
||||
if (ths->pFsm->FpRestoreFinishCb != NULL) {
|
||||
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
|
||||
}
|
||||
ths->restoreFinish = true;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, syncUtilState2String(ths->state),
|
||||
pEntry->index);
|
||||
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
|
||||
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
|
||||
syncUtilState2String(ths->state), pEntry->index);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -85,11 +85,6 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
}
|
||||
|
||||
return pRoot;
|
||||
/*
|
||||
cJSON *pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
|
||||
return pJson;
|
||||
*/
|
||||
}
|
||||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
|
@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
|
||||
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount);
|
||||
cJSON *pIndexArr = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr);
|
||||
for (int i = 0; i < pRaftCfg->configIndexCount; ++i) {
|
||||
snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]);
|
||||
cJSON *pIndexObj = cJSON_CreateObject();
|
||||
cJSON_AddStringToObject(pIndexObj, "index", buf64);
|
||||
cJSON_AddItemToArray(pIndexArr, pIndexObj);
|
||||
}
|
||||
|
||||
cJSON *pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
|
||||
return pJson;
|
||||
|
@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
|
|||
raftCfg.isStandBy = meta.isStandBy;
|
||||
raftCfg.snapshotEnable = meta.snapshotEnable;
|
||||
raftCfg.lastConfigIndex = meta.lastConfigIndex;
|
||||
raftCfg.configIndexCount = 1;
|
||||
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
|
||||
raftCfg.configIndexArr[0] = -1;
|
||||
char *s = raftCfg2Str(&raftCfg);
|
||||
|
||||
char buf[CONFIG_FILE_LEN] = {0};
|
||||
|
@ -207,6 +215,23 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
|
||||
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
|
||||
|
||||
cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
|
||||
pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);
|
||||
|
||||
cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
|
||||
int arraySize = cJSON_GetArraySize(pIndexArr);
|
||||
assert(arraySize == pRaftCfg->configIndexCount);
|
||||
|
||||
memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
|
||||
for (int i = 0; i < arraySize; ++i) {
|
||||
cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
|
||||
assert(pIndexObj != NULL);
|
||||
|
||||
cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
|
||||
assert(cJSON_IsString(pIndex));
|
||||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
|
|
@ -163,8 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
|||
|
||||
walFsync(pWal, true);
|
||||
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
|
||||
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state),
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
|
||||
"originalRpcType:%s,%d",
|
||||
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
|
||||
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
|
||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
||||
|
||||
|
@ -323,11 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
|||
walFsync(pWal, true);
|
||||
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
|
||||
"originalRpcType:%s,%d",
|
||||
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftStore->currentTerm,
|
||||
pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType,
|
||||
TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
||||
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
|
||||
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
|
||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -408,7 +410,6 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
|
|||
}
|
||||
|
||||
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||
/*
|
||||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
// assert(walCommit(pWal, index) == 0);
|
||||
|
@ -418,10 +419,9 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
const char* errStr = tstrerror(err);
|
||||
int32_t linuxErr = errno;
|
||||
const char* linuxErrMsg = strerror(errno);
|
||||
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
||||
linuxErrMsg); ASSERT(0);
|
||||
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
|
||||
ASSERT(0);
|
||||
}
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,10 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
|
|||
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
|
||||
|
||||
SSyncNode *pSyncNode = pObj->data;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle,
|
||||
pStub->rpcMsg.info.ahandle);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
|
||||
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
|
||||
|
||||
taosThreadMutexUnlock(&(pObj->mutex));
|
||||
return keyCode;
|
||||
|
@ -72,10 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
|
|||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
||||
|
||||
SSyncNode *pSyncNode = pObj->data;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
|
||||
pStub->rpcMsg.info.ahandle);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
|
||||
"ahandle:%p",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
|
||||
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
|
||||
|
||||
taosThreadMutexUnlock(&(pObj->mutex));
|
||||
return 1; // get one object
|
||||
|
@ -92,10 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
|
|||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
||||
|
||||
SSyncNode *pSyncNode = pObj->data;
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
|
||||
pStub->rpcMsg.info.ahandle);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
|
||||
"ahandle:%p",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
|
||||
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
|
||||
|
||||
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
||||
taosThreadMutexUnlock(&(pObj->mutex));
|
||||
|
|
|
@ -141,21 +141,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld privateTerm:%lu send "
|
||||
"msg:%s",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld privateTerm:%lu",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm);
|
||||
|
@ -285,31 +287,34 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld privateTerm:%lu send "
|
||||
"msg:%s",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld privateTerm:%lu",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm);
|
||||
}
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
|
||||
"lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld privateTerm:%lu",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
|
||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
|
||||
pSender->privateTerm);
|
||||
|
@ -344,15 +349,18 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
|
||||
"privateTerm:%lu send "
|
||||
"msg:%s",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm,
|
||||
msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug("vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state),
|
||||
sDebug(
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
|
||||
"privateTerm:%lu",
|
||||
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
|
||||
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
|
||||
}
|
||||
|
||||
|
@ -586,19 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
|
||||
"lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
|
||||
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
|
||||
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
|
||||
"lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
|
||||
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
|
||||
pMsg->lastConfigIndex, pReceiver->privateTerm);
|
||||
}
|
||||
|
||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||
|
@ -611,7 +623,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
|
||||
// maybe update lastconfig
|
||||
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
||||
int32_t oldReplicaNum = pSyncNode->replicaNum;
|
||||
// int32_t oldReplicaNum = pSyncNode->replicaNum;
|
||||
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
|
||||
|
||||
// update new config myIndex
|
||||
SSyncCfg newSyncCfg = pMsg->lastConfig;
|
||||
|
@ -635,24 +648,34 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
bool isDrop;
|
||||
if (IamInNew) {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld ",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
|
||||
"newCfg, "
|
||||
"lastIndex:%ld, lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld ",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
|
||||
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
}
|
||||
|
||||
// change isStandBy to normal
|
||||
if (!isDrop) {
|
||||
char tmpbuf[128];
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum);
|
||||
char tmpbuf[512];
|
||||
char *oldStr = syncCfg2Str(&oldSyncCfg);
|
||||
char *newStr = syncCfg2Str(&newSyncCfg);
|
||||
syncUtilJson2Line(oldStr);
|
||||
syncUtilJson2Line(newStr);
|
||||
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
|
||||
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
|
||||
taosMemoryFree(oldStr);
|
||||
taosMemoryFree(newStr);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeBecomeLeader(pSyncNode, tmpbuf);
|
||||
} else {
|
||||
|
@ -671,21 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (gRaftDetailLog) {
|
||||
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
|
||||
"index:%ld, "
|
||||
"snapshot.lastApplyIndex:%ld, "
|
||||
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
|
||||
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex,
|
||||
pReceiver->privateTerm, logSimpleStr);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
|
||||
snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr);
|
||||
taosMemoryFree(logSimpleStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
|
||||
"index:%ld, "
|
||||
"snapshot.lastApplyIndex:%ld, "
|
||||
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
|
||||
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex,
|
||||
pReceiver->privateTerm);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
|
||||
snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
|
||||
}
|
||||
|
||||
pReceiver->pWriter = NULL;
|
||||
|
@ -696,17 +721,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
|
||||
"lastIndex:%ld, lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
|
||||
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
|
||||
"lastIndex:%ld, lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu",
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
|
||||
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
|
||||
}
|
||||
|
@ -723,20 +750,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
|
||||
"lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu, recv "
|
||||
"msg:%s",
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
|
||||
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
|
||||
"lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu",
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
|
||||
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
|
||||
}
|
||||
|
@ -758,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
|
||||
"lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
|
||||
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
|
||||
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sDebug(
|
||||
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
|
||||
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
|
||||
"lastIndex:%ld, "
|
||||
"lastTerm:%lu, "
|
||||
"lastConfigIndex:%ld, privateTerm:%lu",
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port,
|
||||
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
|
||||
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
|
||||
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
|
||||
pMsg->lastConfigIndex, pReceiver->privateTerm);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
|
|
@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() {
|
|||
}
|
||||
pCfg->isStandBy = taosGetTimestampSec() % 100;
|
||||
|
||||
pCfg->configIndexCount = 5;
|
||||
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
|
||||
(pCfg->configIndexArr)[i] = -1;
|
||||
}
|
||||
for (int i = 0; i < pCfg->configIndexCount; ++i) {
|
||||
(pCfg->configIndexArr)[i] = i * 100;
|
||||
}
|
||||
|
||||
return pCfg;
|
||||
}
|
||||
|
||||
|
@ -100,6 +108,15 @@ void test5() {
|
|||
pCfg->isStandBy += 2;
|
||||
pCfg->snapshotEnable += 3;
|
||||
pCfg->lastConfigIndex += 1000;
|
||||
|
||||
pCfg->configIndexCount = 5;
|
||||
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
|
||||
(pCfg->configIndexArr)[i] = -1;
|
||||
}
|
||||
for (int i = 0; i < pCfg->configIndexCount; ++i) {
|
||||
(pCfg->configIndexArr)[i] = i * 100;
|
||||
}
|
||||
|
||||
raftCfgPersist(pCfg);
|
||||
|
||||
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
|
||||
|
|
|
@ -282,6 +282,8 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
|
|||
}
|
||||
|
||||
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
|
||||
ASSERT(rname[0] != 0);
|
||||
|
||||
char aname[TMPNAME_LEN] = "\0";
|
||||
|
||||
for (int32_t level = 0; level < pTfs->nlevel; level++) {
|
||||
|
@ -289,6 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
|
|||
for (int32_t id = 0; id < pTier->ndisk; id++) {
|
||||
STfsDisk *pDisk = pTier->disks[id];
|
||||
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
|
||||
uInfo("====> tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname);
|
||||
taosRemoveDir(aname);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
}
|
||||
|
||||
// set up conn info
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
|
||||
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
|
||||
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
|
||||
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
|
||||
|
|
|
@ -276,9 +276,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
|
||||
|
||||
// mnode-stream
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
|
||||
|
|
|
@ -3,6 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1
|
|||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
|
@ -260,7 +264,7 @@ $x = 0
|
|||
step92:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
if $x == 20 then
|
||||
return -1
|
||||
endi
|
||||
sql show mnodes
|
||||
|
|
|
@ -33,8 +33,16 @@ import taos
|
|||
|
||||
def checkRunTimeError():
|
||||
import win32gui
|
||||
timeCount = 0
|
||||
while 1:
|
||||
time.sleep(1)
|
||||
timeCount = timeCount + 1
|
||||
if (timeCount>900):
|
||||
os.system("TASKKILL /F /IM taosd.exe")
|
||||
os.system("TASKKILL /F /IM taos.exe")
|
||||
os.system("TASKKILL /F /IM tmq_sim.exe")
|
||||
os.system("TASKKILL /F /IM mintty.exe")
|
||||
quit(0)
|
||||
hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library")
|
||||
if hwnd:
|
||||
os.system("TASKKILL /F /IM taosd.exe")
|
||||
|
|
|
@ -283,7 +283,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
|
|||
tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
|
||||
tjsonAddIntegerToObject(item, "exec", pObj->exec);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "dbname", pObj->dbname);
|
||||
tjsonAddStringToObject(item, "dbname1", pObj->dbname1);
|
||||
tjsonAddStringToObject(item, "dbname2", pObj->dbname2);
|
||||
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
|
||||
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
|
||||
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));
|
||||
|
|
Loading…
Reference in New Issue