diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bcc248d122..1552850e76 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -220,7 +220,8 @@ static const SSysDbTableSchema transSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "db1", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "db2", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_action_info", diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4daeeaa9bf..8963f6be39 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -124,7 +124,8 @@ typedef struct { int32_t lastErrorNo; tmsg_t lastMsgType; SEpSet lastEpset; - char dbname[TSDB_DB_FNAME_LEN]; + char dbname1[TSDB_DB_FNAME_LEN]; + char dbname2[TSDB_DB_FNAME_LEN]; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 0175e29a77..bc2d5c82b1 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -68,7 +68,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); -void mndTransSetDbName(STrans *pTrans, const char *dbname); +void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2); void mndTransSetSerial(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 2eeff9cb33..38d6bb2822 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -477,7 +477,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); - mndTransSetDbName(pTrans, dbObj.name); + mndTransSetDbName(pTrans, dbObj.name, NULL); if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; @@ -668,7 +668,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name); int32_t code = -1; - mndTransSetDbName(pTrans, pOld->name); + mndTransSetDbName(pTrans, pOld->name, NULL); if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; @@ -921,7 +921,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 023a28ce35..b6c387a9c8 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -609,7 +609,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetSerial(pTrans); mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); @@ -852,7 +852,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b8b22cee85..345a5215c2 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -758,7 +758,7 @@ _OVER: } int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; @@ -1396,7 +1396,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (needRsp) { void *pCont = NULL; @@ -1537,7 +1537,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8e82946d68..d432256f15 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -613,9 +613,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - mndTransSetDbName(pTrans, createStreamReq.sourceDB); + mndTransSetDbName(pTrans, createStreamReq.sourceDB, NULL); // TODO - /*mndTransSetDbName(pTrans, streamObj.targetDb);*/ + /*mndTransSetDbName(pTrans, streamObj.targetDb, NULL);*/ mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); // build stream obj from request diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 65a5d22bec..d2b7a61e83 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -403,7 +403,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg); - mndTransSetDbName(pTrans, pOutput->pSub->dbName); + mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (pTrans == NULL) return -1; // make txn: diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8afb7ab354..9632c04f4c 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -566,7 +566,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { #endif STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq); - mndTransSetDbName(pTrans, pTopic->db); + mndTransSetDbName(pTrans, pTopic->db, NULL); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index b3a2888535..61ac732f2a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -122,7 +122,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); @@ -270,7 +271,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->conflict = conflict; pTrans->exec = exec; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) - SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) @@ -649,7 +651,14 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void * pTrans->paramLen = paramLen; } -void mndTransSetDbName(STrans *pTrans, const char *dbname) { memcpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN); } +void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) { + if (dbname1 != NULL) { + memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN); + } + if (dbname2 != NULL) { + memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN); + } +} void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } @@ -688,14 +697,24 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pNew->conflict == TRN_CONFLICT_DB) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { + if (strcmp(pNew->dbname1, pTrans->dbname1) == 0 || strcmp(pNew->dbname1, pTrans->dbname2) == 0 || + strcmp(pNew->dbname2, pTrans->dbname1) == 0 || strcmp(pNew->dbname2, pTrans->dbname2) == 0) { + conflict = true; + } + } } if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_DB) { + if (strcmp(pNew->dbname1, pTrans->dbname1) == 0 || strcmp(pNew->dbname1, pTrans->dbname2) == 0 || + strcmp(pNew->dbname2, pTrans->dbname1) == 0 || strcmp(pNew->dbname2, pTrans->dbname2) == 0) { + conflict = true; + } + } } - mError("trans:%d, can't execute since conflict with trans:%d, db:%s", pNew->id, pTrans->id, pTrans->dbname); + mError("trans:%d, can't execute since conflict with trans:%d, db1:%s db2:%s", pNew->id, pTrans->id, pTrans->dbname1, + pTrans->dbname2); sdbRelease(pMnode->pSdb, pTrans); } @@ -704,7 +723,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { - if (strlen(pTrans->dbname) == 0) { + if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare conflict db not set", pTrans->id); return -1; @@ -1449,10 +1468,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)stage, false); - char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes); + char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); + colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false); + + char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false); diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index 022c82c73d..aee8aa2748 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -128,7 +128,7 @@ class MndTestTrans2 : public ::testing::Test { mndTransSetCb(pTrans, TRANS_START_FUNC_TEST, TRANS_STOP_FUNC_TEST, param, strlen(param) + 1); if (pDb != NULL) { - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); } int32_t code = mndTransPrepare(pMnode, pTrans); @@ -201,7 +201,7 @@ class MndTestTrans2 : public ::testing::Test { } if (pDb != NULL) { - mndTransSetDbName(pTrans, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); } int32_t code = mndTransPrepare(pMnode, pTrans); diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index e5986cf4dd..0f0f7e8d10 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -283,7 +283,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) { tjsonAddIntegerToObject(item, "conflict", pObj->conflict); tjsonAddIntegerToObject(item, "exec", pObj->exec); tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); - tjsonAddStringToObject(item, "dbname", pObj->dbname); + tjsonAddStringToObject(item, "dbname1", pObj->dbname1); + tjsonAddStringToObject(item, "dbname2", pObj->dbname2); tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));