minor changes
This commit is contained in:
parent
8565cb4f61
commit
8cd8f52993
|
@ -589,8 +589,8 @@ typedef struct {
|
||||||
int32_t daysToKeep0;
|
int32_t daysToKeep0;
|
||||||
int32_t daysToKeep1;
|
int32_t daysToKeep1;
|
||||||
int32_t daysToKeep2;
|
int32_t daysToKeep2;
|
||||||
int32_t minRowsPerFileBlock;
|
int32_t minRows;
|
||||||
int32_t maxRowsPerFileBlock;
|
int32_t maxRows;
|
||||||
int32_t commitTime;
|
int32_t commitTime;
|
||||||
int32_t fsyncPeriod;
|
int32_t fsyncPeriod;
|
||||||
int8_t walLevel;
|
int8_t walLevel;
|
||||||
|
|
|
@ -197,8 +197,8 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
|
||||||
// int32_t daysToKeep0;
|
// int32_t daysToKeep0;
|
||||||
// int32_t daysToKeep1;
|
// int32_t daysToKeep1;
|
||||||
// int32_t daysToKeep2;
|
// int32_t daysToKeep2;
|
||||||
// int32_t minRowsPerFileBlock;
|
// int32_t minRows;
|
||||||
// int32_t maxRowsPerFileBlock;
|
// int32_t maxRows;
|
||||||
// int8_t precision; // time resolution
|
// int8_t precision; // time resolution
|
||||||
// int8_t compression;
|
// int8_t compression;
|
||||||
// int8_t cacheLastRow;
|
// int8_t cacheLastRow;
|
||||||
|
|
|
@ -683,7 +683,7 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
|
dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
|
||||||
return terrno;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->dropped = 1;
|
pVnode->dropped = 1;
|
||||||
|
|
|
@ -210,8 +210,8 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
||||||
pReq->daysToKeep0 = htonl(3650);
|
pReq->daysToKeep0 = htonl(3650);
|
||||||
pReq->daysToKeep1 = htonl(3650);
|
pReq->daysToKeep1 = htonl(3650);
|
||||||
pReq->daysToKeep2 = htonl(3650);
|
pReq->daysToKeep2 = htonl(3650);
|
||||||
pReq->minRowsPerFileBlock = htonl(100);
|
pReq->minRows = htonl(100);
|
||||||
pReq->maxRowsPerFileBlock = htonl(4096);
|
pReq->maxRows = htonl(4096);
|
||||||
pReq->commitTime = htonl(3600);
|
pReq->commitTime = htonl(3600);
|
||||||
pReq->fsyncPeriod = htonl(3000);
|
pReq->fsyncPeriod = htonl(3000);
|
||||||
pReq->walLevel = 1;
|
pReq->walLevel = 1;
|
||||||
|
@ -375,8 +375,8 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
|
||||||
pReq->daysToKeep0 = htonl(3650);
|
pReq->daysToKeep0 = htonl(3650);
|
||||||
pReq->daysToKeep1 = htonl(3650);
|
pReq->daysToKeep1 = htonl(3650);
|
||||||
pReq->daysToKeep2 = htonl(3650);
|
pReq->daysToKeep2 = htonl(3650);
|
||||||
pReq->minRowsPerFileBlock = htonl(100);
|
pReq->minRows = htonl(100);
|
||||||
pReq->maxRowsPerFileBlock = htonl(4096);
|
pReq->maxRows = htonl(4096);
|
||||||
pReq->commitTime = htonl(3600);
|
pReq->commitTime = htonl(3600);
|
||||||
pReq->fsyncPeriod = htonl(3000);
|
pReq->fsyncPeriod = htonl(3000);
|
||||||
pReq->walLevel = 1;
|
pReq->walLevel = 1;
|
||||||
|
|
|
@ -187,8 +187,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
pReq->daysToKeep0 = htonl(3650);
|
pReq->daysToKeep0 = htonl(3650);
|
||||||
pReq->daysToKeep1 = htonl(3650);
|
pReq->daysToKeep1 = htonl(3650);
|
||||||
pReq->daysToKeep2 = htonl(3650);
|
pReq->daysToKeep2 = htonl(3650);
|
||||||
pReq->minRowsPerFileBlock = htonl(100);
|
pReq->minRows = htonl(100);
|
||||||
pReq->maxRowsPerFileBlock = htonl(4096);
|
pReq->maxRows = htonl(4096);
|
||||||
pReq->commitTime = htonl(3600);
|
pReq->commitTime = htonl(3600);
|
||||||
pReq->fsyncPeriod = htonl(3000);
|
pReq->fsyncPeriod = htonl(3000);
|
||||||
pReq->walLevel = 1;
|
pReq->walLevel = 1;
|
||||||
|
|
|
@ -275,7 +275,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.pCont = pReq;
|
rpcMsg.pCont = pReq;
|
||||||
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
|
rpcMsg.contLen = sizeof(SDropVnodeMsg);
|
||||||
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
|
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
|
||||||
|
|
||||||
sendMsg(pClient, &rpcMsg);
|
sendMsg(pClient, &rpcMsg);
|
||||||
|
|
|
@ -22,6 +22,13 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SEpSet epSet;
|
||||||
|
int8_t msgType;
|
||||||
|
int32_t contLen;
|
||||||
|
void *pCont;
|
||||||
|
} STransAction;
|
||||||
|
|
||||||
int32_t mndInitTrans(SMnode *pMnode);
|
int32_t mndInitTrans(SMnode *pMnode);
|
||||||
void mndCleanupTrans(SMnode *pMnode);
|
void mndCleanupTrans(SMnode *pMnode);
|
||||||
|
|
||||||
|
@ -30,8 +37,8 @@ void mndTransDrop(STrans *pTrans);
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
||||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||||
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
|
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
|
||||||
char *mndTransStageStr(ETrnStage stage);
|
char *mndTransStageStr(ETrnStage stage);
|
||||||
|
|
|
@ -244,13 +244,15 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
|
|
||||||
static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
if (pDbRaw == NULL || mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
|
if (pDbRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING);
|
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||||
|
|
||||||
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
||||||
if (pVgRaw == NULL || mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
|
if (pVgRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING);
|
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -258,13 +260,15 @@ static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
|
||||||
|
|
||||||
static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
if (pDbRaw == NULL || mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
|
if (pDbRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED);
|
if (mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
|
||||||
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
||||||
if (pVgRaw == NULL || mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
|
if (pVgRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED);
|
if (mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -272,38 +276,40 @@ static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
|
||||||
|
|
||||||
static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
if (pDbRaw == NULL || mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
|
if (pDbRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pDbRaw, SDB_STATUS_READY);
|
if (mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1;
|
||||||
|
|
||||||
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
||||||
if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
|
if (pVgRaw == NULL) return -1;
|
||||||
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
|
||||||
SVgObj *pVgroup = pVgroups + v;
|
SVgObj *pVgroup = pVgroups + vg;
|
||||||
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
STransAction action = {0};
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||||
if (pDnode == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEpSet epset = mndGetDnodeEpset(pDnode);
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
if (pDnode == NULL) return -1;
|
||||||
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) return -1;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &epset, TSDB_MSG_TYPE_ALTER_VNODE_IN, sizeof(SCreateVnodeMsg), pMsg) != 0) {
|
action.pCont = pMsg;
|
||||||
|
action.contLen = sizeof(SCreateVnodeMsg);
|
||||||
|
action.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -314,25 +320,25 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
|
||||||
SVgObj *pVgroup = pVgroups + v;
|
SVgObj *pVgroup = pVgroups + vg;
|
||||||
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
STransAction action = {0};
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||||
if (pDnode == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEpSet epset = mndGetDnodeEpset(pDnode);
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
if (pDnode == NULL) return -1;
|
||||||
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) return -1;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndTransAppendUndoAction(pTrans, &epset, TSDB_MSG_TYPE_DROP_VNODE_IN, sizeof(SDropVnodeMsg), pMsg) != 0) {
|
action.pCont = pMsg;
|
||||||
|
action.contLen = sizeof(SDropVnodeMsg);
|
||||||
|
action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
|
||||||
|
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -344,14 +350,14 @@ static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV
|
||||||
|
|
||||||
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
|
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
|
||||||
SDbObj dbObj = {0};
|
SDbObj dbObj = {0};
|
||||||
tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
|
memcpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
|
||||||
tstrncpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
|
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
|
||||||
dbObj.createdTime = taosGetTimestampMs();
|
dbObj.createdTime = taosGetTimestampMs();
|
||||||
dbObj.updateTime = dbObj.createdTime;
|
dbObj.updateTime = dbObj.createdTime;
|
||||||
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
|
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
|
||||||
dbObj.hashMethod = 1;
|
|
||||||
dbObj.cfgVersion = 1;
|
dbObj.cfgVersion = 1;
|
||||||
dbObj.vgVersion = 1;
|
dbObj.vgVersion = 1;
|
||||||
|
dbObj.hashMethod = 1;
|
||||||
dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups,
|
dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups,
|
||||||
.cacheBlockSize = pCreate->cacheBlockSize,
|
.cacheBlockSize = pCreate->cacheBlockSize,
|
||||||
.totalBlocks = pCreate->totalBlocks,
|
.totalBlocks = pCreate->totalBlocks,
|
||||||
|
@ -359,8 +365,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
|
||||||
.daysToKeep0 = pCreate->daysToKeep0,
|
.daysToKeep0 = pCreate->daysToKeep0,
|
||||||
.daysToKeep1 = pCreate->daysToKeep1,
|
.daysToKeep1 = pCreate->daysToKeep1,
|
||||||
.daysToKeep2 = pCreate->daysToKeep2,
|
.daysToKeep2 = pCreate->daysToKeep2,
|
||||||
.minRows = pCreate->minRowsPerFileBlock,
|
.minRows = pCreate->minRows,
|
||||||
.maxRows = pCreate->maxRowsPerFileBlock,
|
.maxRows = pCreate->maxRows,
|
||||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||||
.commitTime = pCreate->commitTime,
|
.commitTime = pCreate->commitTime,
|
||||||
.precision = pCreate->precision,
|
.precision = pCreate->precision,
|
||||||
|
@ -447,8 +453,8 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
|
||||||
pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
|
pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
|
||||||
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
|
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
|
||||||
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
||||||
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
|
pCreate->minRows = htonl(pCreate->minRows);
|
||||||
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
|
pCreate->maxRows = htonl(pCreate->maxRows);
|
||||||
pCreate->commitTime = htonl(pCreate->commitTime);
|
pCreate->commitTime = htonl(pCreate->commitTime);
|
||||||
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
||||||
|
|
||||||
|
|
|
@ -21,13 +21,6 @@
|
||||||
#define TSDB_TRN_ARRAY_SIZE 8
|
#define TSDB_TRN_ARRAY_SIZE 8
|
||||||
#define TSDB_TRN_RESERVE_SIZE 64
|
#define TSDB_TRN_RESERVE_SIZE 64
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SEpSet epSet;
|
|
||||||
int8_t msgType;
|
|
||||||
int32_t contLen;
|
|
||||||
void *pCont;
|
|
||||||
} STransAction;
|
|
||||||
|
|
||||||
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
|
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
|
||||||
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
|
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
|
||||||
|
@ -37,7 +30,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
|
||||||
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
|
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
|
||||||
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
|
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
|
||||||
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
|
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
|
||||||
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
|
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
|
||||||
static void mndTransDropLogs(SArray *pArray);
|
static void mndTransDropLogs(SArray *pArray);
|
||||||
static void mndTransDropActions(SArray *pArray);
|
static void mndTransDropActions(SArray *pArray);
|
||||||
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
|
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
|
||||||
|
@ -459,10 +452,8 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
|
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
|
||||||
STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont};
|
void *ptr = taosArrayPush(pArray, &pAction);
|
||||||
|
|
||||||
void *ptr = taosArrayPush(pArray, &action);
|
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -471,16 +462,12 @@ static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgTy
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont);
|
return mndTransAppendAction(pTrans->redoActions, pAction);
|
||||||
mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont);
|
return mndTransAppendAction(pTrans->undoActions, pAction);
|
||||||
mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
|
|
@ -61,8 +61,8 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
|
||||||
pReq->daysToKeep0 = htonl(3650);
|
pReq->daysToKeep0 = htonl(3650);
|
||||||
pReq->daysToKeep1 = htonl(3650);
|
pReq->daysToKeep1 = htonl(3650);
|
||||||
pReq->daysToKeep2 = htonl(3650);
|
pReq->daysToKeep2 = htonl(3650);
|
||||||
pReq->minRowsPerFileBlock = htonl(100);
|
pReq->minRows = htonl(100);
|
||||||
pReq->maxRowsPerFileBlock = htonl(4096);
|
pReq->maxRows = htonl(4096);
|
||||||
pReq->commitTime = htonl(3600);
|
pReq->commitTime = htonl(3600);
|
||||||
pReq->fsyncPeriod = htonl(3000);
|
pReq->fsyncPeriod = htonl(3000);
|
||||||
pReq->walLevel = 1;
|
pReq->walLevel = 1;
|
||||||
|
|
|
@ -116,8 +116,8 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
|
||||||
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
|
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
|
||||||
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
|
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
|
||||||
pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime);
|
pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime);
|
||||||
pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock);
|
pMsg->minRows = htonl(pCreateDb->minRowsPerBlock);
|
||||||
pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock);
|
pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock);
|
||||||
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
|
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
|
||||||
pMsg->compression = pCreateDb->compressionLevel;
|
pMsg->compression = pCreateDb->compressionLevel;
|
||||||
pMsg->walLevel = (char)pCreateDb->walLevel;
|
pMsg->walLevel = (char)pCreateDb->walLevel;
|
||||||
|
|
Loading…
Reference in New Issue