enh: create SDB_DB entry in prepareAction

This commit is contained in:
Benguang Zhao 2023-08-18 17:08:24 +08:00
parent f01a26f68a
commit e7411183d9
3 changed files with 61 additions and 17 deletions

View File

@ -448,6 +448,16 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
}
static int32_t mndSetCreateDbPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1;
STransAction action = {.pRaw = pDbRaw, .msgType = TDMT_MND_CREATE_DB};
if (mndTransAppendPrepareAction(pTrans, &action) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
return 0;
}
static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
@ -459,7 +469,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1;
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
@ -633,6 +643,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetCreateDbPrepareActions(pMnode, pTrans, &dbObj) != 0) goto _OVER;
if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "mndCluster.h"
#include "mndDb.h"
#include "mndTrans.h"
#include "mndVgroup.h"
@ -74,23 +75,17 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code;
}
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SSdbRow *pRow = NULL;
int32_t code = -1;
static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int code = -1;
SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
if (pAction->msgType == TDMT_MND_CREATE_VG) {
pRow = mndVgroupActionDecode(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
maxVgId);
goto _OUT;
}
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
goto _OUT;
}
code = 0;
@ -99,6 +94,42 @@ _OUT:
return code;
}
static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int code = -1;
SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SDbObj *pNewDb = sdbGetRowObj(pRow);
if (pNewDb == NULL) goto _OUT;
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
if (pOldDb != NULL) {
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
sdbRelease(pMnode->pSdb, pOldDb);
goto _OUT;
}
code = 0;
_OUT:
taosMemoryFreeClear(pRow);
return code;
}
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int32_t code = 0;
switch (pAction->pRaw->type) {
case SDB_VGROUP:
code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction);
break;
case SDB_DB:
code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction);
break;
default:
}
return code;
}
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = -1;
int32_t action = 0;

View File

@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) {
return "dropped";
case SDB_STATUS_INIT:
return "init";
case SDB_STATUS_UPDATE:
return "update";
default:
return "undefine";
}