Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0
This commit is contained in:
commit
0b8b64feb4
|
@ -19,12 +19,12 @@ int32_t mndGetTableIdx(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
|
||||||
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq);
|
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
int32_t mndSetCreateIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
int32_t mndSetCreateIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
||||||
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
||||||
int32_t mndSetDropIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
int32_t mndSetDropIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
||||||
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
||||||
|
|
||||||
int32_t mndSetAlterIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
int32_t mndSetAlterIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
||||||
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -41,7 +41,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
||||||
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
||||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
||||||
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
|
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
|
||||||
|
|
||||||
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
||||||
|
@ -256,17 +256,29 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
|
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
SDbObj *pNewDb = pObj;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SSdbRow *pRow = NULL;
|
||||||
|
SDbObj *pNewDb = NULL;
|
||||||
|
int code = -1;
|
||||||
|
|
||||||
|
pRow = mndDbActionDecode(pRaw);
|
||||||
|
if (pRow == NULL) goto _OVER;
|
||||||
|
pNewDb = sdbGetRowObj(pRow);
|
||||||
|
if (pNewDb == NULL) goto _OVER;
|
||||||
|
|
||||||
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
|
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
|
||||||
if (pOldDb != NULL) {
|
if (pOldDb != NULL) {
|
||||||
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
|
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
|
||||||
sdbRelease(pMnode->pSdb, pOldDb);
|
sdbRelease(pMnode->pSdb, pOldDb);
|
||||||
return -1;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
code = 0;
|
||||||
|
_OVER:
|
||||||
|
if (pNewDb) mndDbActionDelete(pSdb, pNewDb);
|
||||||
|
taosMemoryFreeClear(pRow);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
||||||
|
@ -884,10 +896,10 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
|
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||||
sdbFreeRaw(pRedoRaw);
|
sdbFreeRaw(pRedoRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -943,7 +955,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
|
||||||
mndTransSetDbName(pTrans, pOld->name, NULL);
|
mndTransSetDbName(pTrans, pOld->name, NULL);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
if (mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
||||||
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
||||||
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
@ -1120,10 +1132,10 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
static int32_t mndSetDropDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1257,7 +1269,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbPrepareLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
|
|
|
@ -331,10 +331,10 @@ SDbObj *mndAcquireDbByIdx(SMnode *pMnode, const char *idxName) {
|
||||||
return mndAcquireDb(pMnode, db);
|
return mndAcquireDb(pMnode, db);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetCreateIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
int32_t mndSetCreateIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||||
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -349,10 +349,10 @@ int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetAlterIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
int32_t mndSetAlterIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||||
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||||
sdbFreeRaw(pRedoRaw);
|
sdbFreeRaw(pRedoRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -482,10 +482,10 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetDropIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
int32_t mndSetDropIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||||
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -652,7 +652,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
|
||||||
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
if (mndSetCreateIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
if (mndSetCreateIdxPrepareLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||||
if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1) != 0) goto _OVER;
|
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1) != 0) goto _OVER;
|
||||||
|
@ -771,7 +771,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
if (mndSetDropIdxPrepareLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||||
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0) != 0) goto _OVER;
|
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0) != 0) goto _OVER;
|
||||||
|
|
|
@ -257,6 +257,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto _OVER;
|
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto _OVER;
|
||||||
|
@ -380,6 +381,7 @@ static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
||||||
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
|
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
|
||||||
|
|
|
@ -257,6 +257,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-snode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-snode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
mInfo("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
mInfo("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
|
||||||
|
@ -383,6 +384,7 @@ static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-snode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-snode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
mInfo("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
||||||
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
|
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
|
||||||
|
|
|
@ -617,10 +617,10 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
static int32_t mndSetCreateStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||||
sdbFreeRaw(pRedoRaw);
|
sdbFreeRaw(pRedoRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -629,18 +629,6 @@ static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
|
||||||
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
|
|
||||||
if (pUndoRaw == NULL) return -1;
|
|
||||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
|
||||||
sdbFreeRaw(pUndoRaw);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
@ -913,8 +901,6 @@ _OVER:
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) return -1;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) return -1;
|
||||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
|
||||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
|
||||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||||
|
@ -1746,10 +1732,10 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
static int32_t mndSetAlterStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||||
sdbFreeRaw(pRedoRaw);
|
sdbFreeRaw(pRedoRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -2155,7 +2141,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
|
||||||
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER;
|
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
@ -2192,11 +2178,11 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
|
||||||
if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
|
if (mndGetIdxsByTagName(pMnode, pStb, pField0->name, &idxObj) == 0) {
|
||||||
exist = true;
|
exist = true;
|
||||||
}
|
}
|
||||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
|
||||||
if (exist == true) {
|
if (exist == true) {
|
||||||
if (mndSetDropIdxRedoLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
if (mndSetDropIdxPrepareLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
||||||
if (mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
if (mndSetDropIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2215,13 +2201,13 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
|
||||||
exist = true;
|
exist = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
|
||||||
if (exist == true) {
|
if (exist == true) {
|
||||||
memcpy(idxObj.colName, nTagName, strlen(nTagName));
|
memcpy(idxObj.colName, nTagName, strlen(nTagName));
|
||||||
idxObj.colName[strlen(nTagName)] = 0;
|
idxObj.colName[strlen(nTagName)] = 0;
|
||||||
if (mndSetAlterIdxRedoLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
if (mndSetAlterIdxPrepareLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
||||||
if (mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
if (mndSetAlterIdxCommitLogs(pMnode, pTrans, &idxObj) != 0) goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2354,10 +2340,10 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
static int32_t mndSetDropStbPrepareLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
||||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||||
sdbFreeRaw(pRedoRaw);
|
sdbFreeRaw(pRedoRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -2427,7 +2413,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
|
||||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
if (mndSetDropStbPrepareLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||||
if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndDropIdxsByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndDropIdxsByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
@ -3547,7 +3533,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
|
||||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbPrepareLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
|
if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
|
@ -77,29 +77,16 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||||
SSdbRaw *pRaw = pAction->pRaw;
|
SSdbRaw *pRaw = pAction->pRaw;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SSdbRow *pRow = NULL;
|
int code = 0;
|
||||||
void *pObj = NULL;
|
|
||||||
int code = -1;
|
|
||||||
|
|
||||||
if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
|
if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
|
||||||
|
|
||||||
pRow = (pSdb->decodeFps[pRaw->type])(pRaw);
|
|
||||||
if (pRow == NULL) goto _OUT;
|
|
||||||
pObj = sdbGetRowObj(pRow);
|
|
||||||
if (pObj == NULL) goto _OUT;
|
|
||||||
|
|
||||||
SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
|
SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
|
||||||
code = 0;
|
|
||||||
if (validateFp) {
|
if (validateFp) {
|
||||||
code = validateFp(pMnode, pTrans, pObj);
|
code = validateFp(pMnode, pTrans, pRaw);
|
||||||
}
|
}
|
||||||
|
|
||||||
_OUT:
|
_OUT:
|
||||||
if (pRow) {
|
|
||||||
SdbDeleteFp deleteFp = pSdb->deleteFps[pRaw->type];
|
|
||||||
if (deleteFp) (*deleteFp)(pSdb, pRow->pObj, false);
|
|
||||||
taosMemoryFreeClear(pRow);
|
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1419,7 +1419,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
|
||||||
pTrans->stage = TRN_STAGE_COMMIT;
|
pTrans->stage = TRN_STAGE_COMMIT;
|
||||||
mInfo("trans:%d, stage from redoAction to commit", pTrans->id);
|
mInfo("trans:%d, stage from redoAction to commit", pTrans->id);
|
||||||
continueExec = true;
|
continueExec = true;
|
||||||
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) {
|
||||||
mInfo("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
|
mInfo("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
|
||||||
continueExec = false;
|
continueExec = false;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -31,10 +31,10 @@
|
||||||
#define VGROUP_VER_NUMBER 1
|
#define VGROUP_VER_NUMBER 1
|
||||||
#define VGROUP_RESERVE_SIZE 64
|
#define VGROUP_RESERVE_SIZE 64
|
||||||
|
|
||||||
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
|
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
|
||||||
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
|
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
|
||||||
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
|
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
|
||||||
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
|
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
|
||||||
|
|
||||||
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
||||||
|
@ -181,15 +181,28 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
|
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
SVgObj *pVgroup = pObj;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SSdbRow *pRow = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
int code = -1;
|
||||||
|
|
||||||
|
pRow = mndVgroupActionDecode(pRaw);
|
||||||
|
if (pRow == NULL) goto _OVER;
|
||||||
|
pVgroup = sdbGetRowObj(pRow);
|
||||||
|
if (pVgroup == NULL) goto _OVER;
|
||||||
|
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
if (maxVgId > pVgroup->vgId) {
|
if (maxVgId > pVgroup->vgId) {
|
||||||
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
|
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
|
||||||
return -1;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
|
code = 0;
|
||||||
|
_OVER:
|
||||||
|
if (pVgroup) mndVgroupActionDelete(pSdb, pVgroup);
|
||||||
|
taosMemoryFreeClear(pRow);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
|
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
|
||||||
|
|
|
@ -106,7 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
|
||||||
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
||||||
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
||||||
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
||||||
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj);
|
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, SSdbRaw *pRaw);
|
||||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||||
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
||||||
|
|
Loading…
Reference in New Issue