Merge pull request #13464 from taosdata/fix/mnode
refactor: add null transaction action typ
This commit is contained in:
commit
a4166b731a
|
@ -152,8 +152,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
|
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
||||||
|
|
|
@ -197,7 +197,6 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
|
@ -29,12 +29,18 @@ typedef enum {
|
||||||
TRANS_STOP_FUNC_MQ_REB = 4,
|
TRANS_STOP_FUNC_MQ_REB = 4,
|
||||||
} ETrnFunc;
|
} ETrnFunc;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TRANS_ACTION_NULL = 0,
|
||||||
|
TRANS_ACTION_MSG = 1,
|
||||||
|
TRANS_ACTION_RAW = 2,
|
||||||
|
} ETrnAct;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
int32_t acceptableCode;
|
int32_t acceptableCode;
|
||||||
int8_t stage;
|
int8_t stage;
|
||||||
int8_t actionType; // 0-msg, 1-raw
|
ETrnAct actionType;
|
||||||
int8_t rawWritten;
|
int8_t rawWritten;
|
||||||
int8_t msgSent;
|
int8_t msgSent;
|
||||||
int8_t msgReceived;
|
int8_t msgReceived;
|
||||||
|
@ -57,6 +63,7 @@ void mndTransDrop(STrans *pTrans);
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
|
int32_t mndTransAppendNullLog(STrans *pTrans);
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
||||||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||||
|
@ -65,7 +72,7 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname);
|
||||||
void mndTransSetSerial(STrans *pTrans);
|
void mndTransSetSerial(STrans *pTrans);
|
||||||
|
|
||||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||||
void mndTransProcessRsp(SRpcMsg *pRsp);
|
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
|
||||||
void mndTransPullup(SMnode *pMnode);
|
void mndTransPullup(SMnode *pMnode);
|
||||||
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
||||||
|
|
||||||
|
|
|
@ -34,9 +34,6 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNe
|
||||||
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp);
|
|
||||||
static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp);
|
|
||||||
static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp);
|
|
||||||
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -53,11 +50,11 @@ int32_t mndInitMnode(SMnode *pMnode) {
|
||||||
};
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp);
|
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
||||||
|
@ -367,7 +364,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
||||||
|
if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -549,6 +546,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
|
||||||
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||||
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||||
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
|
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
|
||||||
|
if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -616,21 +614,6 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp) {
|
|
||||||
mndTransProcessRsp(pRsp);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp) {
|
|
||||||
mndTransProcessRsp(pRsp);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp) {
|
|
||||||
mndTransProcessRsp(pRsp);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -88,12 +88,14 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < actionNum; ++i) {
|
for (int32_t i = 0; i < actionNum; ++i) {
|
||||||
STransAction *pAction = taosArrayGet(pArray, i);
|
STransAction *pAction = taosArrayGet(pArray, i);
|
||||||
if (pAction->actionType) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
|
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
|
||||||
} else {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
rawDataLen += (sizeof(STransAction) + pAction->contLen);
|
rawDataLen += (sizeof(STransAction) + pAction->contLen);
|
||||||
|
} else {
|
||||||
|
// empty
|
||||||
}
|
}
|
||||||
rawDataLen += sizeof(pAction->actionType);
|
rawDataLen += sizeof(int8_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
return rawDataLen;
|
return rawDataLen;
|
||||||
|
@ -137,18 +139,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||||
if (pAction->actionType) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
|
||||||
} else {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
||||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
||||||
|
} else {
|
||||||
|
// nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,18 +163,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||||
if (pAction->actionType) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
|
||||||
} else {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
||||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
||||||
|
} else {
|
||||||
|
// nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,18 +187,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||||
if (pAction->actionType) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
|
||||||
} else {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
||||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
||||||
|
} else {
|
||||||
|
// nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,6 +260,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
int16_t policy = 0;
|
int16_t policy = 0;
|
||||||
int16_t conflict = 0;
|
int16_t conflict = 0;
|
||||||
int16_t exec = 0;
|
int16_t exec = 0;
|
||||||
|
int8_t actionType = 0;
|
||||||
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
|
||||||
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
|
||||||
SDB_GET_INT16(pRaw, dataPos, &conflict, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &conflict, _OVER)
|
||||||
|
@ -279,9 +288,10 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
||||||
|
action.actionType = actionType;
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
||||||
if (action.actionType) {
|
if (action.actionType == TRANS_ACTION_RAW) {
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||||
action.pRaw = taosMemoryMalloc(dataLen);
|
action.pRaw = taosMemoryMalloc(dataLen);
|
||||||
|
@ -290,7 +300,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
} else {
|
} else if (action.actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
||||||
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
|
||||||
|
@ -301,6 +311,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||||
action.pCont = NULL;
|
action.pCont = NULL;
|
||||||
|
} else {
|
||||||
|
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,9 +320,10 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
||||||
|
action.actionType = actionType;
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
||||||
if (action.actionType) {
|
if (action.actionType == TRANS_ACTION_RAW) {
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||||
action.pRaw = taosMemoryMalloc(dataLen);
|
action.pRaw = taosMemoryMalloc(dataLen);
|
||||||
|
@ -319,7 +332,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
} else {
|
} else if (action.actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
||||||
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
|
||||||
|
@ -330,6 +343,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
|
||||||
action.pCont = NULL;
|
action.pCont = NULL;
|
||||||
|
} else {
|
||||||
|
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,7 +352,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
||||||
|
action.actionType = actionType;
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
||||||
if (action.actionType) {
|
if (action.actionType) {
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||||
|
@ -348,7 +364,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
} else {
|
} else if (action.actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
||||||
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
|
||||||
|
@ -359,6 +375,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
|
||||||
action.pCont = NULL;
|
action.pCont = NULL;
|
||||||
|
} else {
|
||||||
|
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -552,10 +570,12 @@ static void mndTransDropActions(SArray *pArray) {
|
||||||
int32_t size = taosArrayGetSize(pArray);
|
int32_t size = taosArrayGetSize(pArray);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STransAction *pAction = taosArrayGet(pArray, i);
|
STransAction *pAction = taosArrayGet(pArray, i);
|
||||||
if (pAction->actionType) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
taosMemoryFreeClear(pAction->pRaw);
|
taosMemoryFreeClear(pAction->pRaw);
|
||||||
} else {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
taosMemoryFreeClear(pAction->pCont);
|
taosMemoryFreeClear(pAction->pCont);
|
||||||
|
} else {
|
||||||
|
// nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -583,27 +603,34 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = true, .pRaw = pRaw};
|
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
|
||||||
|
return mndTransAppendAction(pTrans->redoActions, &action);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndTransAppendNullLog(STrans *pTrans) {
|
||||||
|
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_NULL};
|
||||||
return mndTransAppendAction(pTrans->redoActions, &action);
|
return mndTransAppendAction(pTrans->redoActions, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
|
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = true, .pRaw = pRaw};
|
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
|
||||||
return mndTransAppendAction(pTrans->undoActions, &action);
|
return mndTransAppendAction(pTrans->undoActions, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = true, .pRaw = pRaw};
|
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
|
||||||
return mndTransAppendAction(pTrans->commitActions, &action);
|
return mndTransAppendAction(pTrans->commitActions, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
pAction->stage = TRN_STAGE_REDO_ACTION;
|
pAction->stage = TRN_STAGE_REDO_ACTION;
|
||||||
|
pAction->actionType = TRANS_ACTION_MSG;
|
||||||
return mndTransAppendAction(pTrans->redoActions, pAction);
|
return mndTransAppendAction(pTrans->redoActions, pAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
pAction->stage = TRN_STAGE_UNDO_ACTION;
|
pAction->stage = TRN_STAGE_UNDO_ACTION;
|
||||||
|
pAction->actionType = TRANS_ACTION_MSG;
|
||||||
return mndTransAppendAction(pTrans->undoActions, pAction);
|
return mndTransAppendAction(pTrans->undoActions, pAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -782,7 +809,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndTransProcessRsp(SRpcMsg *pRsp) {
|
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
||||||
SMnode *pMnode = pRsp->info.node;
|
SMnode *pMnode = pRsp->info.node;
|
||||||
int64_t signature = (int64_t)(pRsp->info.ahandle);
|
int64_t signature = (int64_t)(pRsp->info.ahandle);
|
||||||
int32_t transId = (int32_t)(signature >> 32);
|
int32_t transId = (int32_t)(signature >> 32);
|
||||||
|
@ -827,6 +854,7 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
|
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
|
||||||
|
@ -899,10 +927,15 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||||
if (pAction->actionType) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
|
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
|
||||||
} else {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
|
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
|
||||||
|
} else {
|
||||||
|
pAction->rawWritten = 0;
|
||||||
|
pAction->errCode = 0;
|
||||||
|
mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue