|
|
|
@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
|
|
|
|
SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
|
|
|
|
|
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
|
|
|
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
|
|
|
|
|
|
|
|
|
|
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
|
|
|
|
@ -289,8 +289,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|
|
|
|
pTrans->oper = oper;
|
|
|
|
|
SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
|
|
|
|
|
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
|
|
|
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
|
|
|
|
@ -706,7 +706,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
|
|
|
|
|
if (pIter == NULL) break;
|
|
|
|
|
|
|
|
|
|
if (pTrans->oper == oper) {
|
|
|
|
|
if (strcasecmp(dbname, pTrans->dbname1) == 0) {
|
|
|
|
|
if (strcasecmp(dbname, pTrans->dbname) == 0) {
|
|
|
|
|
mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
|
|
|
|
|
if (pTrans->pRpcArray == NULL) {
|
|
|
|
|
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
|
|
|
@ -725,12 +725,12 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
|
|
|
|
|
if (dbname1 != NULL) {
|
|
|
|
|
tstrncpy(pTrans->dbname1, dbname1, TSDB_TABLE_FNAME_LEN);
|
|
|
|
|
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) {
|
|
|
|
|
if (dbname != NULL) {
|
|
|
|
|
tstrncpy(pTrans->dbname, dbname, TSDB_TABLE_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
if (dbname2 != NULL) {
|
|
|
|
|
tstrncpy(pTrans->dbname2, dbname2, TSDB_TABLE_FNAME_LEN);
|
|
|
|
|
if (stbname != NULL) {
|
|
|
|
|
tstrncpy(pTrans->stbname, stbname, TSDB_TABLE_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -759,9 +759,9 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool mndCheckDbConflict(const char *db, STrans *pTrans) {
|
|
|
|
|
if (db[0] == 0) return false;
|
|
|
|
|
if (strcasecmp(db, pTrans->dbname1) == 0 || strcasecmp(db, pTrans->dbname2) == 0) return true;
|
|
|
|
|
static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
|
|
|
|
|
if (conflict[0] == 0) return false;
|
|
|
|
|
if (strcasecmp(conflict, pTrans->dbname) == 0 || strcasecmp(conflict, pTrans->stbname) == 0) return true;
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -780,28 +780,28 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|
|
|
|
if (pNew->conflict == TRN_CONFLICT_DB) {
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true;
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true;
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
|
|
|
|
|
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_DB) {
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true;
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true;
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
|
|
|
|
|
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
|
|
|
|
|
}
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
|
|
|
|
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; // for stb
|
|
|
|
|
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (conflict) {
|
|
|
|
|
mError("trans:%d, db1:%s db2:%s type:%d, can't execute since conflict with trans:%d db1:%s db2:%s type:%d",
|
|
|
|
|
pNew->id, pNew->dbname1, pNew->dbname2, pNew->conflict, pTrans->id, pTrans->dbname1, pTrans->dbname2,
|
|
|
|
|
mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
|
|
|
|
|
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
|
|
|
|
|
pTrans->conflict);
|
|
|
|
|
} else {
|
|
|
|
|
mDebug("trans:%d, db1:%s db2:%s type:%d, not conflict with trans:%d db1:%s db2:%s type:%d", pNew->id,
|
|
|
|
|
pNew->dbname1, pNew->dbname2, pNew->conflict, pTrans->id, pTrans->dbname1, pTrans->dbname2,
|
|
|
|
|
mDebug("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id,
|
|
|
|
|
pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
|
|
|
|
|
pTrans->conflict);
|
|
|
|
|
}
|
|
|
|
|
sdbRelease(pMnode->pSdb, pTrans);
|
|
|
|
@ -812,7 +812,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|
|
|
|
|
|
|
|
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
|
|
|
|
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) {
|
|
|
|
|
if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
|
|
|
|
|
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
|
|
|
|
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
|
|
|
|
|
return -1;
|
|
|
|
@ -913,12 +913,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
|
|
|
|
|
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
|
|
|
|
|
mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
|
|
|
|
|
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
|
|
|
|
|
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname);
|
|
|
|
|
if (pDb != NULL) {
|
|
|
|
|
for (int32_t j = 0; j < 12; j++) {
|
|
|
|
|
bool ready = mndIsDbReady(pMnode, pDb);
|
|
|
|
|
if (!ready) {
|
|
|
|
|
mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j);
|
|
|
|
|
mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname, j);
|
|
|
|
|
taosMsleep(1000);
|
|
|
|
|
} else {
|
|
|
|
|
break;
|
|
|
|
@ -929,7 +929,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
|
|
|
|
|
void *pCont = NULL;
|
|
|
|
|
int32_t contLen = 0;
|
|
|
|
|
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) {
|
|
|
|
|
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
|
|
|
|
|
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1599,15 +1599,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
|
|
|
colDataAppend(pColInfo, numOfRows, (const char *)stage, false);
|
|
|
|
|
|
|
|
|
|
char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
|
|
|
STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes);
|
|
|
|
|
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
|
|
|
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes);
|
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
|
|
|
colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false);
|
|
|
|
|
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
|
|
|
|
|
|
|
|
|
|
char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
|
|
|
STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes);
|
|
|
|
|
char stbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
|
|
|
STR_WITH_MAXSIZE_TO_VARSTR(stbname, mndGetDbStr(pTrans->stbname), pShow->pMeta->pSchemas[cols].bytes);
|
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
|
|
|
colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false);
|
|
|
|
|
colDataAppend(pColInfo, numOfRows, (const char *)stbname, false);
|
|
|
|
|
|
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
|
|
|
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
|
|
|
|
|