parallel exec trans
This commit is contained in:
parent
ebd862ded4
commit
536627edbf
|
@ -247,6 +247,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
||||
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
|
||||
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
|
||||
#define TSDB_CODE_MND_TRANS_CANT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
|
||||
|
||||
// mnode-mq
|
||||
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
||||
|
|
|
@ -132,12 +132,14 @@ typedef enum {
|
|||
TRN_TYPE_CREATE_DB = 3001,
|
||||
TRN_TYPE_ALTER_DB = 3002,
|
||||
TRN_TYPE_DROP_DB = 3003,
|
||||
TRN_TYPE_CREATE_STB = 3004,
|
||||
TRN_TYPE_ALTER_STB = 3005,
|
||||
TRN_TYPE_DROP_STB = 3006,
|
||||
TRN_TYPE_SPLIT_VGROUP = 3007,
|
||||
TRN_TYPE_MERGE_VGROUP = 3018,
|
||||
TRN_TYPE_SPLIT_VGROUP = 3004,
|
||||
TRN_TYPE_MERGE_VGROUP = 3015,
|
||||
TRN_TYPE_DB_SCOPE_END,
|
||||
TRN_TYPE_STB_SCOPE = 4000,
|
||||
TRN_TYPE_CREATE_STB = 4001,
|
||||
TRN_TYPE_ALTER_STB = 4002,
|
||||
TRN_TYPE_DROP_STB = 4003,
|
||||
TRN_TYPE_STB_SCOPE_END,
|
||||
} ETrnType;
|
||||
|
||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
||||
|
|
|
@ -546,7 +546,89 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool mndIsBasicTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_BASIC_SCOPE && pTrans->stage < TRN_TYPE_BASIC_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsGlobalTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_GLOBAL_SCOPE && pTrans->stage < TRN_TYPE_GLOBAL_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsDbTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_DB_SCOPE && pTrans->stage < TRN_TYPE_DB_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsStbTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_STB_SCOPE && pTrans->stage < TRN_TYPE_STB_SCOPE_END;
|
||||
}
|
||||
|
||||
static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewTrans) {
|
||||
if (mndIsBasicTrans(pNewTrans)) return 0;
|
||||
|
||||
STrans *pTrans = NULL;
|
||||
void *pIter = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (mndIsGlobalTrans(pNewTrans)) {
|
||||
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndIsDbTrans(pNewTrans)) {
|
||||
if (mndIsBasicTrans(pTrans)) continue;
|
||||
if (mndIsGlobalTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
|
||||
if (pNewTrans->dbUid == pTrans->dbUid) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (mndIsStbTrans(pNewTrans)) {
|
||||
if (mndIsBasicTrans(pTrans)) continue;
|
||||
if (mndIsGlobalTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
if (mndIsDbTrans(pTrans)) {
|
||||
if (pNewTrans->dbUid == pTrans->dbUid) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (mndIsStbTrans(pTrans)) continue;
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
}
|
||||
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
if (mndCheckTransCanBeStartedInParallel(pMnode, pTrans) != 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CANT_PARALLEL;
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, prepare transaction", pTrans->id);
|
||||
if (mndTransSync(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
|
|
@ -257,6 +257,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
|
||||
|
||||
// mnode-topic
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
|
||||
|
|
Loading…
Reference in New Issue