Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

This commit is contained in:
Hongze Cheng 2023-06-13 13:21:12 +08:00
commit 7baeabc6b9
25 changed files with 513 additions and 362 deletions

View File

@ -150,7 +150,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VG, "create-vg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)

View File

@ -154,14 +154,14 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM {
void* data;
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);

View File

@ -345,7 +345,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5)
#define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal
#define TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7)
#define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
// mnode-mq

View File

@ -108,7 +108,7 @@ typedef enum {
TRN_STAGE_UNDO_ACTION = 3,
TRN_STAGE_COMMIT = 4,
TRN_STAGE_COMMIT_ACTION = 5,
TRN_STAGE_FINISHED = 6,
TRN_STAGE_FINISH = 6,
TRN_STAGE_PRE_FINISH = 7
} ETrnStage;
@ -157,6 +157,7 @@ typedef struct {
void* rpcRsp;
int32_t rpcRspLen;
int32_t redoActionPos;
SArray* prepareActions;
SArray* redoActions;
SArray* undoActions;
SArray* commitActions;

View File

@ -70,6 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendNullLog(STrans *pTrans);
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
@ -78,15 +79,23 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam
void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans);
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans);
static int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
return mndTransCheckConflict(pMnode, pTrans);
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader);
void mndTransExecute(SMnode *pMnode, STrans *pTrans);
void mndTransRefresh(SMnode *pMnode, STrans *pTrans);
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
SSdbRaw *mndTransEncode(STrans *pTrans);
SSdbRow *mndTransDecode(SSdbRaw *pRaw);
void mndTransDropData(STrans *pTrans);
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
#ifdef __cplusplus
}
#endif

View File

@ -27,6 +27,7 @@ void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void mndSortVnodeGid(SVgObj *pVgroup);
@ -36,6 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);

View File

@ -414,6 +414,13 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
}
static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
}
return 0;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1;
@ -424,7 +431,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE) != 0) return -1;
}
return 0;
@ -589,9 +596,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mInfo("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
@ -832,7 +840,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
@ -1129,7 +1137,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
goto _OVER;
}

View File

@ -632,7 +632,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
@ -889,7 +889,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
pRaw = mndDnodeActionEncode(pDnode);
if (pRaw == NULL) goto _OVER;

View File

@ -645,7 +645,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
// mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);
@ -721,7 +721,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
mInfo("trans:%d, used to drop idx:%s", pTrans->id, pIdx->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);
if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
@ -860,4 +860,4 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
return 0;
}
}

View File

@ -578,7 +578,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
SMnodeObj mnodeObj = {0};
mnodeObj.id = pDnode->id;
@ -732,7 +732,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;

View File

@ -388,7 +388,7 @@ static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVg
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE) != 0) return -1;
return 0;
}
@ -622,11 +622,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
if (pTrans == NULL) goto _OVER;
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
@ -845,7 +845,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);

View File

@ -874,7 +874,7 @@ _OVER:
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1;
if (mndTransCheckConflict(pMnode, pTrans) != 0) return -1;
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
@ -1968,7 +1968,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (needRsp) {
void *pCont = NULL;
@ -1998,7 +1998,7 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (needRsp) {
void *pCont = NULL;
@ -2242,7 +2242,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
@ -3298,7 +3298,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;

View File

@ -735,7 +735,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
@ -890,7 +890,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
@ -1001,7 +1001,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
@ -1369,7 +1369,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
@ -1477,7 +1477,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;

View File

@ -489,7 +489,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
}
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
return -1;
}

View File

@ -17,6 +17,7 @@
#include "mndSync.h"
#include "mndCluster.h"
#include "mndTrans.h"
#include "mndVgroup.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
@ -73,76 +74,200 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code;
}
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data;
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SSdbRow *pRow = NULL;
int32_t code = -1;
if (pAction->msgType == TDMT_MND_CREATE_VG) {
pRow = mndVgroupActionDecode(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
maxVgId);
goto _OUT;
}
}
code = 0;
_OUT:
taosMemoryFreeClear(pRow);
return code;
}
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = -1;
int32_t action = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
if (numOfActions == 0) {
code = 0;
goto _OUT;
}
mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);
for (action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
if (pAction->actionType != TRANS_ACTION_RAW) {
mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
goto _OUT;
}
code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
if (code != 0) {
mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
goto _OUT;
}
}
code = 0;
_OUT:
return code;
}
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->stage == TRN_STAGE_PREPARE) {
if (mndTransCheckConflict(pMnode, pTrans) < 0) {
mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
return -1;
}
return mndTransValidatePrepareStage(pMnode, pTrans);
}
return 0;
}
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
STrans *pTrans = NULL;
int32_t code = -1;
SSdbRow *pRow = mndTransDecode(pRaw);
if (pRow == NULL) goto _OUT;
pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto _OUT;
code = mndTransValidateImp(pMnode, pTrans);
_OUT:
if (pTrans) mndTransDropData(pTrans);
if (pRow) taosMemoryFreeClear(pRow);
if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT);
return code;
}
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
terrno = TSDB_CODE_SUCCESS;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL;
int32_t code = -1;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
if (transId <= 0) {
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
terrno = TSDB_CODE_INVALID_MSG;
goto _OUT;
}
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p sec:%d seq:%" PRId64,
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMeta->code == 0) {
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
return 0;
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
code = mndTransValidate(pMnode, pRaw);
if (code != 0) {
mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
code = 0;
pMeta->code = terrno;
goto _OUT;
}
taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = pMeta->code;
if (transId <= 0) {
taosThreadMutexUnlock(&pMgmt->lock);
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
} else if (transId == pMgmt->transId) {
if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else {
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
}
pMgmt->transId = 0;
pMgmt->transSec = 0;
pMgmt->transSeq = 0;
tsem_post(&pMgmt->syncSem);
taosThreadMutexUnlock(&pMgmt->lock);
} else {
taosThreadMutexUnlock(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
transId, pTrans->createdTime, pMgmt->transId);
mndTransExecute(pMnode, pTrans, false);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
}
code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
code = 0;
pMeta->code = terrno;
goto _OUT;
}
pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
goto _OUT;
}
if (pTrans->stage == TRN_STAGE_PREPARE) {
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
if (!continueExec) goto _OUT;
}
if (pTrans->id != pMgmt->transId) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
pTrans->id, pTrans->createdTime, pMgmt->transId);
mndTransRefresh(pMnode, pTrans);
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
code = 0;
_OUT:
if (pTrans) mndReleaseTrans(pMnode, pTrans);
return code;
}
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId == 0) {
goto _OUT;
}
pMgmt->transId = 0;
pMgmt->transSec = 0;
pMgmt->transSeq = 0;
pMgmt->errCode = code;
tsem_post(&pMgmt->syncSem);
if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode));
} else {
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq);
}
_OUT:
taosThreadMutexUnlock(&pMgmt->lock);
return 0;
}
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = 0;
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data;
int32_t code = pMsg->code;
if (code != 0) {
goto _OUT;
}
pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term;
pMeta->code = 0;
if (pMsg->code == 0) {
SMnode *pMnode = pFsm->data;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
}
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out;
goto _OUT;
}
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
_out:
code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
_OUT:
mndPostMgmtCode(pMnode, code ? code : pMeta->code);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return code;

View File

@ -753,7 +753,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
mndTransSetDbName(pTrans, pTopic->db, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
return -1;

View File

@ -23,28 +23,25 @@
#include "mndSync.h"
#include "mndUser.h"
#define TRANS_VER_NUMBER 1
#define TRANS_VER1_NUMBER 1
#define TRANS_VER2_NUMBER 2
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 48
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray);
static void mndTransDropData(STrans *pTrans);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
@ -52,7 +49,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans);
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); }
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
@ -67,11 +64,11 @@ int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndTransActionEncode,
.decodeFp = (SdbDecodeFp)mndTransActionDecode,
.encodeFp = (SdbEncodeFp)mndTransEncode,
.decodeFp = (SdbDecodeFp)mndTransDecode,
.insertFp = (SdbInsertFp)mndTransActionInsert,
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete,
.deleteFp = (SdbDeleteFp)mndTransDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransTimer);
@ -103,15 +100,55 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
return rawDataLen;
}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
static int32_t mndTransEncodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionsNum) {
int32_t dataPos = *offset;
int8_t unused = 0;
int32_t ret = -1;
for (int32_t i = 0; i < actionsNum; ++i) {
STransAction *pAction = taosArrayGet(pActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
ret = 0;
_OVER:
*offset = dataPos;
return ret;
}
SSdbRaw *mndTransEncode(STrans *pTrans) {
terrno = TSDB_CODE_INVALID_MSG;
int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
rawDataLen += mndTransGetActionsSize(pTrans->prepareActions);
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, sver, rawDataLen);
if (pRaw == NULL) {
mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
return NULL;
@ -131,91 +168,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions);
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions);
if (sver > TRANS_VER1_NUMBER) {
SDB_SET_INT32(pRaw, dataPos, prepareActionNum, _OVER)
}
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
int8_t unused = 0;
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER;
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER;
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER;
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER;
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER)
@ -242,23 +210,76 @@ _OVER:
return pRaw;
}
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
static int32_t mndTransDecodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionNum) {
STransAction action = {0};
int32_t dataPos = *offset;
int8_t unused = 0;
int8_t stage = 0;
int8_t actionType = 0;
int32_t dataLen = 0;
int32_t ret = -1;
for (int32_t i = 0; i < actionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
tmsgUpdateDnodeEpSet(&action.epSet);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
}
}
ret = 0;
_OVER:
*offset = dataPos;
taosMemoryFreeClear(action.pCont);
return ret;
}
SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_INVALID_MSG;
SSdbRow *pRow = NULL;
STrans *pTrans = NULL;
char *pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t prepareActionNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
int32_t dataPos = 0;
STransAction action = {0};
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TRANS_VER_NUMBER) {
if (sver != TRANS_VER1_NUMBER && sver != TRANS_VER2_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
@ -294,127 +315,28 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
if (sver > TRANS_VER1_NUMBER) {
SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER)
}
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
pTrans->prepareActions = taosArrayInit(prepareActionNum, sizeof(STransAction));
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction));
if (pTrans->prepareActions == NULL) goto _OVER;
if (pTrans->redoActions == NULL) goto _OVER;
if (pTrans->undoActions == NULL) goto _OVER;
if (pTrans->commitActions == NULL) goto _OVER;
int8_t unused = 0;
for (int32_t i = 0; i < redoActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
tmsgUpdateDnodeEpSet(&action.epSet);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER;
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER;
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER;
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER;
SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, _OVER)
@ -434,7 +356,6 @@ _OVER:
mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
mndTransDropData(pTrans);
taosMemoryFreeClear(pRow);
taosMemoryFreeClear(action.pCont);
return NULL;
}
@ -458,7 +379,7 @@ static const char *mndTransStr(ETrnStage stage) {
return "commit";
case TRN_STAGE_COMMIT_ACTION:
return "commitAction";
case TRN_STAGE_FINISHED:
case TRN_STAGE_FINISH:
return "finished";
case TRN_STAGE_PRE_FINISH:
return "pre-finish";
@ -519,7 +440,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
return 0;
}
static void mndTransDropData(STrans *pTrans) {
void mndTransDropData(STrans *pTrans) {
if (pTrans->prepareActions != NULL) {
mndTransDropActions(pTrans->prepareActions);
pTrans->prepareActions = NULL;
}
if (pTrans->redoActions != NULL) {
mndTransDropActions(pTrans->redoActions);
pTrans->redoActions = NULL;
@ -549,7 +474,7 @@ static void mndTransDropData(STrans *pTrans) {
(void)taosThreadMutexDestroy(&pTrans->mutex);
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans,
mndTransStr(pTrans->stage), callFunc, pTrans->stopFunc);
@ -586,10 +511,11 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
// only occured while sync timeout
terrno = TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT;
terrno = TSDB_CODE_MND_TRANS_SYNC_TIMEOUT;
return -1;
}
mndTransUpdateActions(pOld->prepareActions, pNew->prepareActions);
mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
@ -607,7 +533,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
}
if (pOld->stage == TRN_STAGE_PRE_FINISH) {
pOld->stage = TRN_STAGE_FINISHED;
pOld->stage = TRN_STAGE_FINISH;
mTrace("trans:%d, stage from pre-finish to finished since perform update action", pNew->id);
}
@ -646,6 +572,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->conflict = conflict;
pTrans->exec = TRN_EXEC_PARALLEL;
pTrans->createdTime = taosGetTimestampMs();
pTrans->prepareActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
@ -728,6 +655,13 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
return mndTransAppendAction(pTrans->commitActions, &action);
}
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_PREPARE;
pAction->actionType = TRANS_ACTION_RAW;
pAction->mTraceId = pTrans->mTraceId;
return mndTransAppendAction(pTrans->prepareActions, pAction);
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_REDO_ACTION;
pAction->actionType = TRANS_ACTION_MSG;
@ -800,7 +734,7 @@ void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
return -1;
@ -872,7 +806,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
return conflict;
}
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
@ -891,7 +825,7 @@ int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
return -1;
}
@ -922,7 +856,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
mndTransExecute(pMnode, pNew, true);
mndTransExecute(pMnode, pNew);
mndReleaseTrans(pMnode, pNew);
return 0;
}
@ -961,7 +895,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
bool sendRsp = false;
int32_t code = pTrans->code;
if (pTrans->stage == TRN_STAGE_FINISHED) {
if (pTrans->stage == TRN_STAGE_FINISH) {
sendRsp = true;
}
@ -1003,7 +937,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
if (code == TSDB_CODE_SYN_TIMEOUT) {
code = TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT;
code = TSDB_CODE_MND_TRANS_SYNC_TIMEOUT;
}
if (i != 0 && code == 0) {
@ -1104,7 +1038,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code);
}
mndTransExecute(pMnode, pTrans, true);
mndTransExecute(pMnode, pTrans);
_OVER:
mndReleaseTrans(pMnode, pTrans);
@ -1392,8 +1326,25 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
if (numOfActions == 0) goto _OVER;
mInfo("trans:%d, execute %d prepare actions.", pTrans->id, numOfActions);
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code != 0) {
mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions);
return false;
}
}
_OVER:
pTrans->stage = TRN_STAGE_REDO_ACTION;
mInfo("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec;
@ -1476,7 +1427,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_FINISHED; // TRN_STAGE_PRE_FINISH is not necessary
pTrans->stage = TRN_STAGE_FINISH; // TRN_STAGE_PRE_FINISH is not necessary
mInfo("trans:%d, stage from commitAction to finished", pTrans->id);
continueExec = true;
} else {
@ -1528,14 +1479,14 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
return continueExec;
}
static bool mndTransPerfromPreFinishedStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) {
if (mndCannotExecuteTransAction(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransPreFinish(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_FINISHED;
pTrans->stage = TRN_STAGE_FINISH;
mInfo("trans:%d, stage from pre-finish to finish", pTrans->id);
continueExec = true;
} else {
@ -1547,10 +1498,10 @@ static bool mndTransPerfromPreFinishedStage(SMnode *pMnode, STrans *pTrans) {
return continueExec;
}
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = false;
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
return false;
@ -1567,12 +1518,12 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
return continueExec;
}
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true;
while (continueExec) {
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " leader:%d", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, isLeader);
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " topHalf:%d", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, topHalf);
pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) {
case TRN_STAGE_PREPARE:
@ -1582,7 +1533,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT:
if (isLeader) {
if (topHalf) {
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not commit since not leader", pTrans->id);
@ -1593,7 +1544,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK:
if (isLeader) {
if (topHalf) {
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not rollback since not leader", pTrans->id);
@ -1604,15 +1555,15 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_PRE_FINISH:
if (isLeader) {
continueExec = mndTransPerfromPreFinishedStage(pMnode, pTrans);
if (topHalf) {
continueExec = mndTransPerformPreFinishStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not pre-finish since not leader", pTrans->id);
continueExec = false;
}
break;
case TRN_STAGE_FINISHED:
continueExec = mndTransPerfromFinishedStage(pMnode, pTrans);
case TRN_STAGE_FINISH:
continueExec = mndTransPerformFinishStage(pMnode, pTrans);
break;
default:
continueExec = false;
@ -1623,6 +1574,16 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
mndTransSendRpcRsp(pMnode, pTrans);
}
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
bool topHalf = true;
return mndTransExecuteImp(pMnode, pTrans, topHalf);
}
void mndTransRefresh(SMnode *pMnode, STrans *pTrans) {
bool topHalf = false;
return mndTransExecuteImp(pMnode, pTrans, topHalf);
}
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
mTrace("start to process trans timer");
mndTransPullup(pReq->info.node);
@ -1649,7 +1610,7 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
pAction->errCode = 0;
}
mndTransExecute(pMnode, pTrans, true);
mndTransExecute(pMnode, pTrans);
return 0;
}
@ -1707,7 +1668,7 @@ void mndTransPullup(SMnode *pMnode) {
int32_t *pTransId = taosArrayGet(pArray, i);
STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
if (pTrans != NULL) {
mndTransExecute(pMnode, pTrans, true);
mndTransExecute(pMnode, pTrans);
}
mndReleaseTrans(pMnode, pTrans);
}

View File

@ -28,7 +28,6 @@
#define VGROUP_VER_NUMBER 1
#define VGROUP_RESERVE_SIZE 64
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
@ -483,15 +482,15 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v
return pReq;
}
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) {
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeHashRangeReq alterReq = {
.srcVgId = pVgroup->vgId,
.dstVgId = dstVgId,
.srcVgId = srcVgId,
.dstVgId = pVgroup->vgId,
.hashBegin = pVgroup->hashBegin,
.hashEnd = pVgroup->hashEnd,
};
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", pVgroup->vgId, dstVgId,
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
pVgroup->hashBegin, pVgroup->hashEnd);
int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
if (contLen < 0) {
@ -1207,12 +1206,12 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0;
}
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) {
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen);
void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
@ -1247,6 +1246,21 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
return 0;
}
int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG};
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1;
@ -2241,10 +2255,13 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0;
}
static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) {
typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err;
if (appendActionCb(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, vgStatus);
pRaw = NULL;
return 0;
@ -2253,18 +2270,32 @@ _err:
return -1;
}
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
SSdbRaw *pRaw = mndDbActionEncode(pDb);
if (pRaw == NULL) goto _err;
if (appendActionCb(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, dbStatus);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1;
STrans *pTrans = NULL;
SSdbRaw *pRaw = NULL;
SDbObj dbObj = {0};
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
mndTransSetDbName(pTrans, pDb->name, NULL);
SVgObj newVg1 = {0};
memcpy(&newVg1, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
@ -2316,32 +2347,25 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
// alter vgId and hash range
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
int32_t srcVgId = newVg1.vgId;
newVg1.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
maxVgId++;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
srcVgId = newVg2.vgId;
newVg2.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
// adjust vgroup replica
if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
} else {
if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else {
if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER;
}
if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER;
if (mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
// update db status
memcpy(&dbObj, pDb, sizeof(SDbObj));
if (dbObj.cfg.pRetensions != NULL) {
dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
@ -2350,11 +2374,27 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs();
dbObj.cfg.numOfVgroups++;
pRaw = mndDbActionEncode(&dbObj);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
if (mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
// adjust vgroup replica
if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
} else {
if (mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else {
if (mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
}
if (mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
// commit db status
dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs();
if (mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
@ -2362,7 +2402,6 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
_OVER:
taosArrayDestroy(pArray);
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
taosArrayDestroy(dbObj.cfg.pRetensions);
return code;
}

View File

@ -122,6 +122,7 @@ typedef enum {
SDB_STATUS_DROPPING = 2,
SDB_STATUS_DROPPED = 3,
SDB_STATUS_READY = 4,
SDB_STATUS_UPDATE = 5,
} ESdbStatus;
typedef enum {

View File

@ -256,6 +256,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break;
case SDB_STATUS_READY:
case SDB_STATUS_UPDATE:
case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
break;

View File

@ -879,9 +879,13 @@ static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
SDecoder dc = {0};
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &me);
if (me.type != TSDB_SUPER_TABLE) {
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, me.name);
if (TSDB_CODE_VND_HASH_MISMATCH == ret) {
char tbFName[TSDB_TABLE_FNAME_LEN + 1];
snprintf(tbFName, sizeof(tbFName), "%s.%s", pMeta->pVnode->config.dbname, me.name);
tbFName[TSDB_TABLE_FNAME_LEN] = '\0';
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, tbFName);
if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) {
taosArrayPush(uidList, &me.uid);
}
}
@ -910,6 +914,7 @@ int32_t metaTrimTables(SMeta *pMeta) {
goto end;
}
metaInfo("vgId:%d, trim %ld tables", TD_VID(pMeta->pVnode), taosArrayGetSize(tbUids));
metaDropTables(pMeta, tbUids);
end:

View File

@ -325,7 +325,7 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) {
terrno = TSDB_CODE_VND_HASH_MISMATCH;
return TSDB_CODE_VND_HASH_MISMATCH;
return -1;
}
return 0;

View File

@ -431,7 +431,7 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (pMsg->code == 0) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
@ -451,7 +451,7 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFs
return 0;
}
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 1) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
@ -463,7 +463,7 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
return atomic_load_64(&pVnode->state.applied);
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),

View File

@ -618,8 +618,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
return -1;
}
// not restored, vnode enable
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
if (!pSyncNode->restoreFinish) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);

View File

@ -275,7 +275,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
// mnode-mq