Merge pull request #22488 from taosdata/FIX/TD-25762-3.0
enh: ensure transactional atomicity of db names in create database
This commit is contained in:
commit
501c86f858
|
@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
|
||||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq,
|
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq,
|
||||||
const char *opername);
|
const char *opername);
|
||||||
void mndTransDrop(STrans *pTrans);
|
void mndTransDrop(STrans *pTrans);
|
||||||
|
|
||||||
|
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendNullLog(STrans *pTrans);
|
int32_t mndTransAppendNullLog(STrans *pTrans);
|
||||||
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction);
|
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
||||||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||||
|
|
|
@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
|
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
|
||||||
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
||||||
int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg);
|
int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg);
|
||||||
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
|
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
|
||||||
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
||||||
|
|
|
@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
||||||
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
||||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
||||||
|
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
|
||||||
|
|
||||||
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
|
||||||
|
@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) {
|
||||||
.insertFp = (SdbInsertFp)mndDbActionInsert,
|
.insertFp = (SdbInsertFp)mndDbActionInsert,
|
||||||
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
|
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
|
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
|
||||||
|
.validateFp = (SdbValidateFp)mndNewDbActionValidate,
|
||||||
};
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
|
||||||
|
@ -247,6 +250,19 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
|
||||||
|
SDbObj *pNewDb = pObj;
|
||||||
|
|
||||||
|
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
|
||||||
|
if (pOldDb != NULL) {
|
||||||
|
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
|
||||||
|
sdbRelease(pMnode->pSdb, pOldDb);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
||||||
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
|
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -448,9 +464,18 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
|
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
|
if (pDbRaw == NULL) return -1;
|
||||||
|
|
||||||
|
if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -459,7 +484,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
||||||
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
if (pDbRaw == NULL) return -1;
|
if (pDbRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
|
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
|
||||||
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1;
|
||||||
|
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
||||||
|
@ -633,8 +658,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
|
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
|
||||||
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
|
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||||
|
|
|
@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
|
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndVgroup.h"
|
|
||||||
|
|
||||||
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
if (pMsg == NULL || pMsg->pCont == NULL) {
|
if (pMsg == NULL || pMsg->pCont == NULL) {
|
||||||
|
@ -75,25 +74,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||||
|
SSdbRaw *pRaw = pAction->pRaw;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SSdbRow *pRow = NULL;
|
SSdbRow *pRow = NULL;
|
||||||
int32_t code = -1;
|
void *pObj = NULL;
|
||||||
|
int code = -1;
|
||||||
|
|
||||||
if (pAction->msgType == TDMT_MND_CREATE_VG) {
|
if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
|
||||||
pRow = mndVgroupActionDecode(pAction->pRaw);
|
|
||||||
if (pRow == NULL) goto _OUT;
|
|
||||||
|
|
||||||
SVgObj *pVgroup = sdbGetRowObj(pRow);
|
pRow = (pSdb->decodeFps[pRaw->type])(pRaw);
|
||||||
if (pVgroup == NULL) goto _OUT;
|
if (pRow == NULL) goto _OUT;
|
||||||
|
pObj = sdbGetRowObj(pRow);
|
||||||
|
if (pObj == NULL) goto _OUT;
|
||||||
|
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
|
||||||
if (maxVgId > pVgroup->vgId) {
|
code = 0;
|
||||||
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
|
if (validateFp) {
|
||||||
maxVgId);
|
code = validateFp(pMnode, pTrans, pObj);
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
|
||||||
_OUT:
|
_OUT:
|
||||||
taosMemoryFreeClear(pRow);
|
taosMemoryFreeClear(pRow);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -655,11 +655,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
return mndTransAppendAction(pTrans->commitActions, &action);
|
return mndTransAppendAction(pTrans->commitActions, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
pAction->stage = TRN_STAGE_PREPARE;
|
STransAction action = {
|
||||||
pAction->actionType = TRANS_ACTION_RAW;
|
.pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId};
|
||||||
pAction->mTraceId = pTrans->mTraceId;
|
return mndTransAppendAction(pTrans->prepareActions, &action);
|
||||||
return mndTransAppendAction(pTrans->prepareActions, pAction);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
|
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
|
||||||
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
|
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
|
||||||
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
|
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
|
||||||
|
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
|
||||||
|
|
||||||
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
||||||
|
@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
|
||||||
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
|
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
|
||||||
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
|
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
|
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
|
||||||
|
.validateFp = (SdbValidateFp)mndNewVgActionValidate,
|
||||||
};
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
|
||||||
|
@ -171,6 +173,17 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
|
||||||
|
SVgObj *pVgroup = pObj;
|
||||||
|
|
||||||
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
|
if (maxVgId > pVgroup->vgId) {
|
||||||
|
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
|
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
|
||||||
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
|
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1259,12 +1272,11 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
|
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
|
||||||
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
|
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
|
||||||
if (pRaw == NULL) goto _err;
|
if (pRaw == NULL) goto _err;
|
||||||
|
|
||||||
STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG};
|
if (mndTransAppendPrepareLog(pTrans, pRaw) != 0) goto _err;
|
||||||
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
|
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
||||||
pRaw = NULL;
|
pRaw = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2380,13 +2392,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
int32_t srcVgId = newVg1.vgId;
|
int32_t srcVgId = newVg1.vgId;
|
||||||
newVg1.vgId = maxVgId;
|
newVg1.vgId = maxVgId;
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
|
||||||
|
|
||||||
maxVgId++;
|
maxVgId++;
|
||||||
srcVgId = newVg2.vgId;
|
srcVgId = newVg2.vgId;
|
||||||
newVg2.vgId = maxVgId;
|
newVg2.vgId = maxVgId;
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
|
|
|
@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
|
||||||
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
||||||
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
||||||
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
||||||
|
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj);
|
||||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||||
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
||||||
|
@ -189,6 +190,7 @@ typedef struct SSdb {
|
||||||
SdbDeployFp deployFps[SDB_MAX];
|
SdbDeployFp deployFps[SDB_MAX];
|
||||||
SdbEncodeFp encodeFps[SDB_MAX];
|
SdbEncodeFp encodeFps[SDB_MAX];
|
||||||
SdbDecodeFp decodeFps[SDB_MAX];
|
SdbDecodeFp decodeFps[SDB_MAX];
|
||||||
|
SdbValidateFp validateFps[SDB_MAX];
|
||||||
TdThreadMutex filelock;
|
TdThreadMutex filelock;
|
||||||
} SSdb;
|
} SSdb;
|
||||||
|
|
||||||
|
@ -207,6 +209,7 @@ typedef struct {
|
||||||
SdbInsertFp insertFp;
|
SdbInsertFp insertFp;
|
||||||
SdbUpdateFp updateFp;
|
SdbUpdateFp updateFp;
|
||||||
SdbDeleteFp deleteFp;
|
SdbDeleteFp deleteFp;
|
||||||
|
SdbValidateFp validateFp;
|
||||||
} SSdbTable;
|
} SSdbTable;
|
||||||
|
|
||||||
typedef struct SSdbOpt {
|
typedef struct SSdbOpt {
|
||||||
|
|
|
@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
pSdb->deployFps[sdbType] = table.deployFp;
|
pSdb->deployFps[sdbType] = table.deployFp;
|
||||||
pSdb->encodeFps[sdbType] = table.encodeFp;
|
pSdb->encodeFps[sdbType] = table.encodeFp;
|
||||||
pSdb->decodeFps[sdbType] = table.decodeFp;
|
pSdb->decodeFps[sdbType] = table.decodeFp;
|
||||||
|
pSdb->validateFps[sdbType] = table.validateFp;
|
||||||
|
|
||||||
int32_t hashType = 0;
|
int32_t hashType = 0;
|
||||||
if (keyType == SDB_KEY_INT32) {
|
if (keyType == SDB_KEY_INT32) {
|
||||||
|
|
|
@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) {
|
||||||
return "dropped";
|
return "dropped";
|
||||||
case SDB_STATUS_INIT:
|
case SDB_STATUS_INIT:
|
||||||
return "init";
|
return "init";
|
||||||
|
case SDB_STATUS_UPDATE:
|
||||||
|
return "update";
|
||||||
default:
|
default:
|
||||||
return "undefine";
|
return "undefine";
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue