diff --git a/docs/en/14-reference/03-connector/rust.mdx b/docs/en/14-reference/03-connector/rust.mdx index a5cbaeac80..56ca586c7e 100644 --- a/docs/en/14-reference/03-connector/rust.mdx +++ b/docs/en/14-reference/03-connector/rust.mdx @@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai Column information is stored using [ColumnMeta]. - ``rust + ```rust let cols = &q.column_meta; for col in cols { println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index fc9d419b1f..f49030466e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -192,6 +192,7 @@ bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fd57eef83a..e3db1550a0 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo { int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t persistHandle; // persist handle or not - SRpcConnInfo connInfo; // app info void *ahandle; // app handle set by client void *wrapper; // wrapper handle @@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo { // resp info void * rsp; int32_t rspLen; + + // conn info + SRpcConnInfo conn; } SRpcHandleInfo; typedef struct SRpcMsg { @@ -63,7 +65,6 @@ typedef struct SRpcMsg { int32_t contLen; int32_t code; SRpcHandleInfo info; - SRpcConnInfo conn; } SRpcMsg; typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bcc248d122..1552850e76 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -220,7 +220,8 @@ static const SSysDbTableSchema transSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "db1", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "db2", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_action_info", diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9946185da6..f112727c08 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -228,7 +228,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmReleaseVnode(pMgmt, pVnode); terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; code = terrno; - goto _OVER; + return 0; } snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b66e559370..13f2452e66 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); static void dmSendRsp(SRpcMsg *pMsg); static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); -static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) { - SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo); - // if (IsReq(pRpc)) { - // terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - // dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle); - // return -1; - //} - - memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN); - pMsg->conn.clientIp = pConnInfo->clientIp; - pMsg->conn.clientPort = pConnInfo->clientPort; - return 0; -} - int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { @@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - if (pMsg == NULL) { - goto _OVER; - } - dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + if (pMsg == NULL) goto _OVER; + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - if (dmBuildNodeMsg(pMsg, pRpc) != 0) { - goto _OVER; - } + dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); if (InParentProc(pWrapper)) { code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4daeeaa9bf..8963f6be39 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -124,7 +124,8 @@ typedef struct { int32_t lastErrorNo; tmsg_t lastMsgType; SEpSet lastEpset; - char dbname[TSDB_DB_FNAME_LEN]; + char dbname1[TSDB_DB_FNAME_LEN]; + char dbname2[TSDB_DB_FNAME_LEN]; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 0175e29a77..bc2d5c82b1 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -68,7 +68,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); -void mndTransSetDbName(STrans *pTrans, const char *dbname); +void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2); void mndTransSetSerial(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index f036fc48f7..d47fb9dfb4 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) { memcpy(authRsp.user, authReq.user, TSDB_USER_LEN); int32_t code = mndRetriveAuth(pReq->info.node, &authRsp); - mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt, + mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt, authRsp.user); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp); diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index aa908b983d..0de40ca671 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) { goto _OVER; } @@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c12c6d5b4c..38d6bb2822 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -477,7 +477,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); - mndTransSetDbName(pTrans, dbObj.name); + mndTransSetDbName(pTrans, dbObj.name, NULL); if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; @@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { goto _OVER; } - pUser = mndAcquireUser(pMnode, pReq->conn.user); + pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) { goto _OVER; } @@ -668,7 +668,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name); int32_t code = -1; - mndTransSetDbName(pTrans, pOld->name); + mndTransSetDbName(pTrans, pOld->name, NULL); if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; @@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) { goto _OVER; } @@ -921,7 +921,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; @@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) { } } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) { goto _OVER; } @@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) { mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); } else { - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) { goto _OVER; } @@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 6d37143fc9..d1bed90175 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) { goto _OVER; } @@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { } } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index dfdc0a3c1a..832f1b8e68 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) { goto _OVER; } @@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) { } } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f277a5169d..7b5025a986 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -505,6 +505,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { + if (!IsReq(pMsg)) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index e82c479cdd..b40cd713e5 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) { goto _OVER; } @@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { goto _OVER; } @@ -742,7 +742,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { pMgmt->errCode = 0; pMgmt->transId = -1; tsem_wait(&pMgmt->syncSem); - mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); + mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode)); terrno = pMgmt->errCode; return pMgmt->errCode; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 20d94e341c..39d21aad49 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -133,7 +133,7 @@ static void mndFreeConn(SConnObj *pConn) { taosWLockLatch(&pConn->queryLock); taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); taosWUnLockLatch(&pConn->queryLock); - + mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); } @@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { goto CONN_OVER; } - taosIp2String(pReq->conn.clientIp, ip); + taosIp2String(pReq->info.conn.clientIp, ip); - pUser = mndAcquireUser(pMnode, pReq->conn.user); + pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { - mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr()); + mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr()); goto CONN_OVER; } if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) { - mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd); + mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd); code = TSDB_CODE_RPC_AUTH_FAILURE; goto CONN_OVER; } @@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; - mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr()); + mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, + terrstr()); goto CONN_OVER; } } - pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort, - connReq.pid, connReq.app, connReq.startTime); + pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, + pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { - mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr()); + mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr()); goto CONN_OVER; } @@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pReq->info.rspLen = contLen; pReq->info.rsp = pRsp; - mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app); + mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app); code = 0; @@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb if (pHbReq->query) { SQueryHbReqBasic *pBasic = pHbReq->query; - SRpcConnInfo connInfo = pMsg->conn; + SRpcConnInfo connInfo = pMsg->info.conn; SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); if (pConn == NULL) { @@ -361,7 +362,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); } } - + SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); if (rspBasic == NULL) { mndReleaseConn(pMnode, pConn); @@ -386,7 +387,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1); - + mndReleaseConn(pMnode, pConn); hbRsp.query = rspBasic; @@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user); + SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) return 0; if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); @@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user); + mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user); pConn->killId = killReq.queryId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return 0; @@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user); + SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) return 0; if (!pUser->superUser) { mndReleaseUser(pMnode, pUser); @@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user); + mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user); pConn->killed = 1; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return TSDB_CODE_SUCCESS; @@ -563,12 +564,12 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { } static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - int32_t cols = 0; - SConnObj *pConn = NULL; - + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SConnObj *pConn = NULL; + if (pShow->pIter == NULL) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; pShow->pIter = taosCacheCreateIter(pMgmt->cache); @@ -619,12 +620,12 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl } static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - int32_t cols = 0; - SConnObj *pConn = NULL; - + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SConnObj *pConn = NULL; + if (pShow->pIter == NULL) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; pShow->pIter = taosCacheCreateIter(pMgmt->cache); @@ -645,7 +646,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); for (int32_t i = 0; i < numOfQueries; ++i) { - SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i); + SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); cols = 0; char queryId[26 + VARSTR_HEADER_SIZE] = {0}; @@ -691,14 +692,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); - char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; + char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; int32_t strSize = sizeof(subStatus); int32_t offset = VARSTR_HEADER_SIZE; for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { if (i) { offset += snprintf(subStatus + offset, strSize - offset - 1, ","); } - SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i); + SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); } varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); @@ -712,7 +713,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p numOfRows++; } - + taosRUnLockLatch(&pConn->queryLock); } diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 595287a3af..f5625f32d5 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) { goto _OVER; } @@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index d312955202..a6447ed405 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type); - // if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1; + // if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1; int32_t numOfCols = pShow->pMeta->numOfColumns; SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 3eb5b03efd..b6c387a9c8 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -609,7 +609,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetSerial(pTrans); mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); @@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -852,7 +852,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; @@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index c84dc2f3dd..0a99f356b1 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) { goto _OVER; } @@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b83aa34a3a..345a5215c2 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -758,7 +758,7 @@ _OVER: } int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; @@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -1396,7 +1396,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (needRsp) { void *pCont = NULL; @@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -1537,7 +1537,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; @@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3a42c694c8..d432256f15 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq return -1; } - if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) { + if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } #endif @@ -613,9 +613,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - mndTransSetDbName(pTrans, createStreamReq.sourceDB); + mndTransSetDbName(pTrans, createStreamReq.sourceDB, NULL); // TODO - /*mndTransSetDbName(pTrans, streamObj.targetDb);*/ + /*mndTransSetDbName(pTrans, streamObj.targetDb, NULL);*/ mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); // build stream obj from request @@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // create stb for stream - if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) { + if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; @@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { #if 0 // todo check auth - pUser = mndAcquireUser(pMnode, pReq->conn.user); + pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { goto DROP_STREAM_OVER; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 65a5d22bec..d2b7a61e83 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -403,7 +403,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg); - mndTransSetDbName(pTrans, pOutput->pSub->dbName); + mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (pTrans == NULL) return -1; // make txn: diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d71d9b4e72..6b52730372 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -59,8 +59,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); } - tsem_post(&pMgmt->syncSem); pMgmt->transId = 0; + tsem_post(&pMgmt->syncSem); } else { STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { @@ -70,7 +70,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) { SSnapshotMeta sMeta = {0}; - if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) { sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); } sdbWriteFile(pMnode->pSdb); @@ -90,7 +91,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; SSnapshotMeta sMeta = {0}; - if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + + SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb); + if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) { sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); } @@ -123,8 +127,8 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); } - tsem_post(&pMgmt->syncSem); pMgmt->transId = 0; + tsem_post(&pMgmt->syncSem); } } @@ -262,6 +266,7 @@ void mndSyncStart(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) { if (pMnode->syncMgmt.transId != 0) { + pMnode->syncMgmt.transId = 0; tsem_post(&pMnode->syncMgmt.syncSem); pMnode->syncMgmt.transId = 0; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 4c2730ce94..9632c04f4c 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { goto _OVER; } @@ -566,7 +566,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { #endif STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq); - mndTransSetDbName(pTrans, pTopic->db); + mndTransSetDbName(pTrans, pTopic->db, NULL); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index dafb2fa1d1..4ffa601889 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -22,8 +22,8 @@ #include "mndSync.h" #include "mndUser.h" -#define TRANS_VER_NUMBER 1 -#define TRANS_ARRAY_SIZE 8 +#define TRANS_VER_NUMBER 1 +#define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -122,7 +122,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); @@ -270,7 +271,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->conflict = conflict; pTrans->exec = exec; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) - SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) @@ -649,7 +651,14 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void * pTrans->paramLen = paramLen; } -void mndTransSetDbName(STrans *pTrans, const char *dbname) { memcpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN); } +void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) { + if (dbname1 != NULL) { + memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN); + } + if (dbname2 != NULL) { + memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN); + } +} void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } @@ -688,14 +697,24 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pNew->conflict == TRN_CONFLICT_DB) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { + if (strcmp(pNew->dbname1, pTrans->dbname1) == 0 || strcmp(pNew->dbname1, pTrans->dbname2) == 0 || + strcmp(pNew->dbname2, pTrans->dbname1) == 0 || strcmp(pNew->dbname2, pTrans->dbname2) == 0) { + conflict = true; + } + } } if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_DB) { + if (strcmp(pNew->dbname1, pTrans->dbname1) == 0 || strcmp(pNew->dbname1, pTrans->dbname2) == 0 || + strcmp(pNew->dbname2, pTrans->dbname1) == 0 || strcmp(pNew->dbname2, pTrans->dbname2) == 0) { + conflict = true; + } + } } - mError("trans:%d, can't execute since conflict with trans:%d, db:%s", pNew->id, pTrans->id, pTrans->dbname); + mError("trans:%d, can't execute since conflict with trans:%d, db1:%s db2:%s", pNew->id, pTrans->id, pTrans->dbname1, + pTrans->dbname2); sdbRelease(pMnode->pSdb, pTrans); } @@ -704,7 +723,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { - if (strlen(pTrans->dbname) == 0) { + if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare conflict db not set", pTrans->id); return -1; @@ -1369,7 +1388,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) { mInfo("trans:%d, start to kill", killReq.transId); - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) { goto _OVER; } @@ -1417,7 +1436,9 @@ void mndTransPullup(SMnode *pMnode) { } SSnapshotMeta sMeta = {0}; - if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { + SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb); + if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) { sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); } sdbWriteFile(pMnode->pSdb); @@ -1449,10 +1470,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)stage, false); - char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes); + char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); + colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false); + + char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 9590823106..eb0a818a60 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { goto _OVER; } - pOperUser = mndAcquireUser(pMnode, pReq->conn.user); + pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) { goto _OVER; } @@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { goto _OVER; } - pOperUser = mndAcquireUser(pMnode, pReq->conn.user); + pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; goto _OVER; @@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) { + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 755f4ef0b2..cec83d1af5 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; pVgroup = mndAcquireVgroup(pMnode, req.vgId); if (pVgroup == NULL) goto _OVER; @@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, pVgroup->dbName); if (pDb == NULL) goto _OVER; - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER; + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER; code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { mDebug("start to balance vgroup"); - if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER; + if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER; while (1) { SDnodeObj *pDnode = NULL; diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index 022c82c73d..aee8aa2748 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -128,7 +128,7 @@ class MndTestTrans2 : public ::testing::Test { mndTransSetCb(pTrans, TRANS_START_FUNC_TEST, TRANS_STOP_FUNC_TEST, param, strlen(param) + 1); if (pDb != NULL) { - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); } int32_t code = mndTransPrepare(pMnode, pTrans); @@ -201,7 +201,7 @@ class MndTestTrans2 : public ::testing::Test { } if (pDb != NULL) { - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); } int32_t code = mndTransPrepare(pMnode, pTrans); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 874210306f..3715866bb8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,7 +220,7 @@ int vnodeCommit(SVnode *pVnode) { info.state.committed = pVnode->state.applied; snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); if (vnodeSaveInfo(dir, &info) < 0) { - // ASSERT(0); + ASSERT(0); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d170931e7e..da1a126a76 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe } // process request - // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { - // rcode = terrno; - // goto _exit; - // } + if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { + rcode = terrno; + goto _exit; + } // return rsp _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8662c8369b..f46114746e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.code == 0) { - if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) { + if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { rsp.code = terrno; vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); } @@ -329,8 +329,9 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { SVnode *pVnode = pFsm->data; - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info.conn.applyIndex = cbMeta.index; vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); @@ -359,10 +360,11 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info.conn.applyIndex = cbMeta.index; tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fb79c53384..40b019eb5d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3507,8 +3507,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) // check for the limitation in each group if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); - - if (pProjectInfo->slimit.limit == -1 || pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { + if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { pOperator->status = OP_EXEC_DONE; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 23226a0134..4fc25688c4 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -331,7 +331,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pInfo->binfo.resultRowInfo); #if 0 if(pOperator->fpSet.encodeResultRow){ diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ddebba1c9e..80276007de 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; - if (pInfo->prefetchedTuple == NULL) { - pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - } - + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { break; } appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { break; } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); return (p->info.rows > 0) ? p : NULL; } @@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -2284,17 +2275,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; - pInfo->sample.seed = taosGetTimestampSec(); - - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - pInfo->dataReaders = dataReaders; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; - pInfo->curTWinIdx = 0; + pInfo->sample.seed = taosGetTimestampSec(); + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReaders = dataReaders; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; pInfo->pResBlock = createResDataBlock(pDescNode); @@ -2307,22 +2297,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN taosArrayPush(pInfo->sortSourceParams, param); taosMemoryFree(param); } + pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + int32_t rowSize = pInfo->pResBlock->info.rowSize; - pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; - pInfo->sortBufSize = pInfo->bufPageSize * 16; - pInfo->hasGroupId = false; + pInfo->bufPageSize = getProperSortPageSize(rowSize); + + // todo the total available buffer should be determined by total capacity of buffer of this task. + // the additional one is reserved for merge result + pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1); + pInfo->hasGroupId = false; pInfo->prefetchedTuple = NULL; - pOperator->name = "TableMergeScanOperator"; + pOperator->name = "TableMergeScanOperator"; // TODO : change it - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 1024); pOperator->fpSet = diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 90700580bd..55900c7c7e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1384,8 +1384,9 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, - int32_t rowIndex); +static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); + +static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -1407,11 +1408,23 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (pEntryInfo->numOfRes > 0) { setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); + } else { + setNullSelectivityValue(pCtx, pBlock, currentRow); } return pEntryInfo->numOfRes; } +void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + colDataAppendNULL(pDstCol, rowIndex); + } +} + void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) { int32_t pageId = pTuplePos->pageId; int32_t offset = pTuplePos->offset; @@ -4627,8 +4640,6 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { STailItem* pItem = pInfo->pItems[i]; colDataAppend(pCol, currentRow, pItem->data, false); - - // setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow); currentRow += 1; } diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index e72e1e7be7..cd64402738 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -29,6 +29,8 @@ extern "C" { #define CONFIG_FILE_LEN 1024 +#define MAX_CONFIG_INDEX_COUNT 512 + typedef struct SRaftCfg { SSyncCfg cfg; TdFilePtr pFile; @@ -36,6 +38,10 @@ typedef struct SRaftCfg { int8_t isStandBy; int8_t snapshotEnable; SyncIndex lastConfigIndex; + + SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT]; + int32_t configIndexCount; + } SRaftCfg; SRaftCfg *raftCfgOpen(const char *path); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bea51b2dd3..7b2f79e24c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event %s currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, delBegin, delEnd); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -880,6 +880,72 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } } while (0); + // fake match2 + // + // condition1: + // preIndex <= my commit index + // + // operation: + // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry + // match my-commit-index or my-commit-index + 1 + // no operation on log + do { + bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && + (pMsg->prevLogIndex <= ths->commitIndex); + if (condition) { + sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex, + ths->commitIndex); + + SyncIndex matchIndex = ths->commitIndex; + bool hasAppendEntries = pMsg->dataLen > 0; + if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { + // append entry + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ASSERT(pAppendEntry != NULL); + + { + // has extra entries (> preIndex) in local log + SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; + + if (hasExtraEntries) { + // make log same, rollback deleted entries + code = syncNodeMakeLogSame(ths, pMsg); + ASSERT(code == 0); + } + } + + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); + + // pre commit + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); + + matchIndex = pMsg->prevLogIndex + 1; + + syncEntryDestory(pAppendEntry); + } + + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = true; + pReply->matchIndex = matchIndex; + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return ret; + } + } while (0); + // calculate logOK here, before will coredump, due to fake match bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); @@ -995,8 +1061,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex commitEnd = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, commitBegin, commitEnd); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + commitBegin, commitEnd); } SyncIndex beginIndex = ths->commitIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 94fdcc2772..f934f9a268 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,21 +190,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " "sender:%s", - ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", - ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index e3431307fd..d010728c78 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, - snapshot.lastApplyIndex); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex); } // update commit index diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 200ac0e614..3e5dcfea42 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -420,6 +420,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { return 0; } +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + assert(rid == pSyncNode->rid); + + ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); + SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0]; + + for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { + if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex && + (pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) { + lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i]; + } + } + sMeta->lastConfigIndex = lastIndex; + sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + const char* syncGetMyRoleStr(int64_t rid) { const char* s = syncUtilState2String(syncGetMyRole(rid)); return s; @@ -575,9 +598,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), - pMsg->msgType); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -586,9 +609,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), - pMsg->msgType); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pMsg->msgType), pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -834,8 +857,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; - sDebug("vgId:%d sync event %s currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), - pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); return pSyncNode; } @@ -882,8 +905,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event %s currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), - pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); int32_t ret; assert(pSyncNode != NULL); @@ -1322,9 +1345,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event %s currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], - oldSenders[i]->privateTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; } @@ -1376,9 +1399,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event %s currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " + "privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], + oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1386,9 +1412,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex // reset replicaIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; - sDebug("vgId:%d sync event %s currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " + "reset:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); } } } @@ -1397,9 +1425,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event %s currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } } @@ -1407,8 +1436,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event %s currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; } } @@ -1479,10 +1509,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug( - "vgId:%d sync event %s currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1519,9 +1550,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // reset restoreFinish pSyncNode->restoreFinish = false; - sDebug("vgId:%d sync event %s currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " + "restoreFinish:%d, %s", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2095,14 +2129,15 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - sDebug("vgId:%d sync event %s currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state), - ths->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event %s currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, - pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, + pSyncLeaderTransfer->newLeaderId.addr); // reset elect timer now! int32_t electMS = 1; @@ -2180,8 +2215,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE // change isStandBy to normal if (!isDrop) { - char tmpbuf[128]; - snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); + char tmpbuf[512]; + char* oldStr = syncCfg2Str(&oldSyncCfg); + char* newStr = syncCfg2Str(&newSyncCfg); + syncUtilJson2Line(oldStr); + syncUtilJson2Line(newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + if (ths->state == TAOS_SYNC_STATE_LEADER) { syncNodeBecomeLeader(ths, tmpbuf); } else { @@ -2189,8 +2232,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE } } } else { - char tmpbuf[128]; - snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); + char tmpbuf[512]; + char* oldStr = syncCfg2Str(&oldSyncCfg); + char* newStr = syncCfg2Str(&newSyncCfg); + syncUtilJson2Line(oldStr); + syncUtilJson2Line(newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, + newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + syncNodeBecomeFollower(ths, tmpbuf); } @@ -2224,9 +2275,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event %s currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", - ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, beginIndex, endIndex, - syncUtilState2String(state)); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 + ", %s", + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex, + endIndex, syncUtilState2String(state)); // execute fsm if (ths->pFsm != NULL) { @@ -2268,15 +2320,16 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, } // restore finish + // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { if (ths->pFsm->FpRestoreFinishCb != NULL) { ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event %s currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), - pEntry->index); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + syncUtilState2String(ths->state), pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 45e00aca2c..2d51f1f6f0 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { } return pRoot; - /* - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot); - return pJson; - */ } char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64); + cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount); + cJSON *pIndexArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr); + for (int i = 0; i < pRaftCfg->configIndexCount; ++i) { + snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]); + cJSON *pIndexObj = cJSON_CreateObject(); + cJSON_AddStringToObject(pIndexObj, "index", buf64); + cJSON_AddItemToArray(pIndexArr, pIndexObj); + } + cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "RaftCfg", pRoot); return pJson; @@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { raftCfg.isStandBy = meta.isStandBy; raftCfg.snapshotEnable = meta.snapshotEnable; raftCfg.lastConfigIndex = meta.lastConfigIndex; + raftCfg.configIndexCount = 1; + memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr)); + raftCfg.configIndexArr[0] = -1; char *s = raftCfg2Str(&raftCfg); char buf[CONFIG_FILE_LEN] = {0}; @@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex"); pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex)); - cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount"); + pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount); + + cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr"); + int arraySize = cJSON_GetArraySize(pIndexArr); + assert(arraySize == pRaftCfg->configIndexCount); + + memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr)); + for (int i = 0; i < arraySize; ++i) { + cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i); + assert(pIndexObj != NULL); + + cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index"); + assert(cJSON_IsString(pIndex)); + (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); + } + + cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 308edc0b93..87dfef5fdd 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -163,10 +163,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event %s currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), - pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " + "originalRpcType:%s,%d", + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -323,11 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); sDebug( - "vgId:%d sync event %s currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " "originalRpcType:%s,%d", - pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftStore->currentTerm, - pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, - TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -408,20 +410,18 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { } int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { - /* - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - // assert(walCommit(pWal, index) == 0); - int32_t code = walCommit(pWal, index); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t linuxErr = errno; - const char* linuxErrMsg = strerror(errno); - sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, - linuxErrMsg); ASSERT(0); - } - */ + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + // assert(walCommit(pWal, index) == 0); + int32_t code = walCommit(pWal, index); + if (code != 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + ASSERT(0); + } return 0; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index cc4004a6bd..551c2611b0 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -46,10 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -72,10 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " + "ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -92,10 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " + "ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 219a1a5a7d..4973d36295 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,21 +141,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); @@ -285,31 +287,34 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); @@ -344,16 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { - sDebug("vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), - pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "privateTerm:%lu", + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -413,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -534,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -558,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -586,19 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -611,7 +623,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { - int32_t oldReplicaNum = pSyncNode->replicaNum; + // int32_t oldReplicaNum = pSyncNode->replicaNum; + SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; // update new config myIndex SSyncCfg newSyncCfg = pMsg->lastConfig; @@ -635,24 +648,34 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { sDebug( - "vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " + "newCfg, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } // change isStandBy to normal if (!isDrop) { - char tmpbuf[128]; - snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum); + char tmpbuf[512]; + char *oldStr = syncCfg2Str(&oldSyncCfg); + char *newStr = syncCfg2Str(&newSyncCfg); + syncUtilJson2Line(oldStr); + syncUtilJson2Line(newStr); + snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s", + oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { syncNodeBecomeLeader(pSyncNode, tmpbuf); } else { @@ -671,21 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, - pReceiver->privateTerm, logSimpleStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr); taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, - pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); } pReceiver->pWriter = NULL; @@ -696,17 +721,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } @@ -723,20 +750,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " "msg:%s", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } @@ -758,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else { diff --git a/source/libs/sync/test/syncRaftCfgTest.cpp b/source/libs/sync/test/syncRaftCfgTest.cpp index 8c6a704e2d..9460f0ad7a 100644 --- a/source/libs/sync/test/syncRaftCfgTest.cpp +++ b/source/libs/sync/test/syncRaftCfgTest.cpp @@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() { } pCfg->isStandBy = taosGetTimestampSec() % 100; + pCfg->configIndexCount = 5; + for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { + (pCfg->configIndexArr)[i] = -1; + } + for (int i = 0; i < pCfg->configIndexCount; ++i) { + (pCfg->configIndexArr)[i] = i * 100; + } + return pCfg; } @@ -100,6 +108,15 @@ void test5() { pCfg->isStandBy += 2; pCfg->snapshotEnable += 3; pCfg->lastConfigIndex += 1000; + + pCfg->configIndexCount = 5; + for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { + (pCfg->configIndexArr)[i] = -1; + } + for (int i = 0; i < pCfg->configIndexCount; ++i) { + (pCfg->configIndexArr)[i] = i * 100; + } + raftCfgPersist(pCfg); printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex); @@ -118,6 +135,6 @@ int main() { test3(); test4(); test5(); - + return 0; } diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 92beeffa0c..2d3d41de64 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -282,6 +282,8 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { } int32_t tfsRmdir(STfs *pTfs, const char *rname) { + ASSERT(rname[0] != 0); + char aname[TMPNAME_LEN] = "\0"; for (int32_t level = 0; level < pTfs->nlevel; level++) { @@ -289,6 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { for (int32_t id = 0; id < pTier->ndisk; id++) { STfsDisk *pDisk = pTier->disks[id]; snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); + uInfo("====> tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname); taosRemoveDir(aname); } } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index b6e3fd2676..bd3781f9c7 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) { } // set up conn info - SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo); + SRpcConnInfo* pConnInfo = &(transMsg.info.conn); pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr); pConnInfo->clientPort = ntohs(pConn->addr.sin_port); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 596aab0e09..933e3ba92b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -276,9 +276,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") +// mnode-stream TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") diff --git a/tests/script/tsim/mnode/basic5.sim b/tests/script/tsim/mnode/basic5.sim index 399dced169..fc591aa25d 100644 --- a/tests/script/tsim/mnode/basic5.sim +++ b/tests/script/tsim/mnode/basic5.sim @@ -3,6 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 +system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1 system sh/exec.sh -n dnode1 -s start sql connect @@ -260,7 +264,7 @@ $x = 0 step92: $x = $x + 1 sleep 1000 - if $x == 10 then + if $x == 20 then return -1 endi sql show mnodes diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 5a68f548da..55bfd8c945 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -33,8 +33,16 @@ import taos def checkRunTimeError(): import win32gui + timeCount = 0 while 1: time.sleep(1) + timeCount = timeCount + 1 + if (timeCount>900): + os.system("TASKKILL /F /IM taosd.exe") + os.system("TASKKILL /F /IM taos.exe") + os.system("TASKKILL /F /IM tmq_sim.exe") + os.system("TASKKILL /F /IM mintty.exe") + quit(0) hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") if hwnd: os.system("TASKKILL /F /IM taosd.exe") diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index e5986cf4dd..0f0f7e8d10 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -283,7 +283,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) { tjsonAddIntegerToObject(item, "conflict", pObj->conflict); tjsonAddIntegerToObject(item, "exec", pObj->exec); tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); - tjsonAddStringToObject(item, "dbname", pObj->dbname); + tjsonAddStringToObject(item, "dbname1", pObj->dbname1); + tjsonAddStringToObject(item, "dbname2", pObj->dbname2); tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));