|
|
|
@ -15,11 +15,11 @@
|
|
|
|
|
|
|
|
|
|
#define _DEFAULT_SOURCE
|
|
|
|
|
#include "mndTrans.h"
|
|
|
|
|
#include "mndSubscribe.h"
|
|
|
|
|
#include "mndDb.h"
|
|
|
|
|
#include "mndPrivilege.h"
|
|
|
|
|
#include "mndShow.h"
|
|
|
|
|
#include "mndStb.h"
|
|
|
|
|
#include "mndSubscribe.h"
|
|
|
|
|
#include "mndSync.h"
|
|
|
|
|
#include "mndUser.h"
|
|
|
|
|
|
|
|
|
@ -801,16 +801,17 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|
|
|
|
if (pNew->conflict == TRN_CONFLICT_TOPIC) {
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
|
|
|
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
|
|
|
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC ) {
|
|
|
|
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
|
|
|
|
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
|
|
|
|
}
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
|
|
|
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true;
|
|
|
|
|
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
|
|
|
|
|
conflict = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -847,7 +848,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
if(pTrans == NULL) return -1;
|
|
|
|
|
if (pTrans == NULL) return -1;
|
|
|
|
|
|
|
|
|
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
|
|
|
|
return -1;
|
|
|
|
@ -1142,6 +1143,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
rpcMsg.info.traceId.rootId = pTrans->mTraceId;
|
|
|
|
|
rpcMsg.info.notFreeAhandle = 1;
|
|
|
|
|
|
|
|
|
|
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
|
|
|
|
|
|
|
|
@ -1156,7 +1158,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
|
|
|
|
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
|
|
|
|
|
if (code == 0) {
|
|
|
|
|
pAction->msgSent = 1;
|
|
|
|
|
//pAction->msgReceived = 0;
|
|
|
|
|
// pAction->msgReceived = 0;
|
|
|
|
|
pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS;
|
|
|
|
|
mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
|
|
|
|
|
|
|
|
|
@ -1253,16 +1255,16 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
|
|
|
|
|
|
|
|
|
for (int32_t action = 0; action < numOfActions; ++action) {
|
|
|
|
|
STransAction *pAction = taosArrayGet(pArray, action);
|
|
|
|
|
mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x",
|
|
|
|
|
pTrans->id, mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived,
|
|
|
|
|
pAction->errCode, pAction->acceptableCode, pAction->retryCode);
|
|
|
|
|
mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x", pTrans->id,
|
|
|
|
|
mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived, pAction->errCode,
|
|
|
|
|
pAction->acceptableCode, pAction->retryCode);
|
|
|
|
|
if (pAction->msgSent) {
|
|
|
|
|
if (pAction->msgReceived) {
|
|
|
|
|
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
|
|
|
|
mndTransResetAction(pMnode, pTrans, pAction);
|
|
|
|
|
mInfo("trans:%d, %s:%d reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
|
|
|
|