fix(stream): adjust stream trans conflict check.

This commit is contained in:
Haojun Liao 2024-01-16 16:11:04 +08:00
parent 8ede6d87ce
commit 882b916940
2 changed files with 5 additions and 11 deletions

View File

@ -2841,6 +2841,8 @@ void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName); mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans); mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
} }
} }
@ -2866,15 +2868,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
killTransImpl(pMnode, transId, "");
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire checkpoint trans:%d", transId);
}
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
if (pStream == NULL) { if (pStream == NULL) {
@ -3100,7 +3094,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
if (p->checkpointFailed) { if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " failed, transId:%d, kill it", p->id.taskId, mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId); p->checkpointId, p->chkpointTransId);
checkpointFailed = p->checkpointFailed; checkpointFailed = p->checkpointFailed;

View File

@ -89,7 +89,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
} }
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name); tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;