Merge pull request #15398 from taosdata/fix/mnode
fix: can't drop db since transaction conflict
This commit is contained in:
commit
f8761bfc16
|
@ -164,8 +164,8 @@ typedef struct {
|
||||||
int32_t lastErrorNo;
|
int32_t lastErrorNo;
|
||||||
tmsg_t lastMsgType;
|
tmsg_t lastMsgType;
|
||||||
SEpSet lastEpset;
|
SEpSet lastEpset;
|
||||||
char dbname1[TSDB_DB_FNAME_LEN];
|
char dbname1[TSDB_TABLE_FNAME_LEN];
|
||||||
char dbname2[TSDB_DB_FNAME_LEN];
|
char dbname2[TSDB_TABLE_FNAME_LEN];
|
||||||
int32_t startFunc;
|
int32_t startFunc;
|
||||||
int32_t stopFunc;
|
int32_t stopFunc;
|
||||||
int32_t paramLen;
|
int32_t paramLen;
|
||||||
|
|
|
@ -281,7 +281,7 @@ static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffse
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -297,14 +297,14 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
|
||||||
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
||||||
sdbRelease(pSdb, pOffset);
|
sdbRelease(pSdb, pOffset);
|
||||||
goto END;
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
code = -1;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pOffset);
|
sdbRelease(pSdb, pOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
|
||||||
END:
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -641,6 +641,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_VND_CREATE_STB;
|
action.msgType = TDMT_VND_CREATE_STB;
|
||||||
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
||||||
|
action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
@ -805,7 +806,7 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
|
@ -1612,7 +1613,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
|
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
|
|
||||||
if (needRsp) {
|
if (needRsp) {
|
||||||
void *pCont = NULL;
|
void *pCont = NULL;
|
||||||
|
@ -1811,7 +1812,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
|
|
||||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||||
|
|
|
@ -824,7 +824,7 @@ int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -840,12 +840,14 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
|
||||||
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
goto END;
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
code = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
|
||||||
END:
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -833,7 +833,7 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -848,11 +848,14 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
|
if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
|
||||||
goto END;
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
code = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
|
||||||
END:
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
||||||
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
|
||||||
|
|
||||||
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
|
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
|
||||||
|
@ -290,8 +290,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
pTrans->exec = exec;
|
pTrans->exec = exec;
|
||||||
pTrans->oper = oper;
|
pTrans->oper = oper;
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
|
||||||
|
@ -727,10 +727,10 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
|
||||||
|
|
||||||
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
|
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
|
||||||
if (dbname1 != NULL) {
|
if (dbname1 != NULL) {
|
||||||
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN);
|
tstrncpy(pTrans->dbname1, dbname1, TSDB_TABLE_FNAME_LEN);
|
||||||
}
|
}
|
||||||
if (dbname2 != NULL) {
|
if (dbname2 != NULL) {
|
||||||
memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN);
|
tstrncpy(pTrans->dbname2, dbname2, TSDB_TABLE_FNAME_LEN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1289,6 +1289,19 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
} else {
|
} else {
|
||||||
pTrans->code = terrno;
|
pTrans->code = terrno;
|
||||||
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
|
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
|
||||||
|
if (pTrans->lastAction != 0) {
|
||||||
|
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
|
||||||
|
if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) {
|
||||||
|
if (pTrans->failedTimes < 6) {
|
||||||
|
mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
|
||||||
|
pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);
|
||||||
|
taosMsleep(1000);
|
||||||
|
continueExec = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||||
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
|
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
|
||||||
continueExec = true;
|
continueExec = true;
|
||||||
|
|
|
@ -178,7 +178,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
||||||
// TODO: just for pass case
|
// TODO: just for pass case
|
||||||
#if 0
|
#if 0
|
||||||
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return -1;
|
return -1;
|
||||||
#else
|
#else
|
||||||
|
@ -223,7 +223,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
|
||||||
// check if super table exists
|
// check if super table exists
|
||||||
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
|
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
|
||||||
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
|
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
|
||||||
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
|
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue