fix/TD-30989
This commit is contained in:
parent
651077866e
commit
0b71f18e3c
|
@ -93,6 +93,7 @@ static int32_t mndFindSuperTableTagId(const SStbObj *pStb, const char *tagName,
|
|||
}
|
||||
|
||||
int mndSetCreateIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
@ -110,18 +111,20 @@ int mndSetCreateIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStb
|
|||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_INDEX;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
@ -164,6 +167,7 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
@ -182,6 +186,8 @@ int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbOb
|
|||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
return -1;
|
||||
}
|
||||
STransAction action = {0};
|
||||
|
@ -189,7 +195,7 @@ int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbOb
|
|||
action.pCont = pReq;
|
||||
action.contLen = len;
|
||||
action.msgType = TDMT_VND_DROP_INDEX;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
|
@ -198,7 +204,7 @@ int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbOb
|
|||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
void mndCleanupIdx(SMnode *pMnode) {
|
||||
|
@ -332,61 +338,91 @@ SDbObj *mndAcquireDbByIdx(SMnode *pMnode, const char *idxName) {
|
|||
}
|
||||
|
||||
int32_t mndSetCreateIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndIdxActionEncode(pIdx);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndSetAlterIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||
if (pRedoRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
return -1;
|
||||
}
|
||||
if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
|
||||
sdbFreeRaw(pRedoRaw);
|
||||
return -1;
|
||||
}
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndIdxActionEncode(pIdx);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateIdxVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
return 0;
|
||||
if (pVgRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pVgRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateIdxVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
return 0;
|
||||
if (pVgRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pVgRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_READY));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
// static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
||||
|
@ -427,10 +463,7 @@ static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq) {
|
|||
SDbObj *pDb = NULL;
|
||||
SCreateTagIndexReq createReq = {0};
|
||||
|
||||
if (tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
}
|
||||
TAOS_CHECK_GOTO(tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
|
||||
|
||||
mInfo("idx:%s start to create", createReq.idxName);
|
||||
// if (mndCheckCreateIdxReq(&createReq) != 0) {
|
||||
|
@ -439,29 +472,30 @@ static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq) {
|
|||
|
||||
pDb = mndAcquireDbByStb(pMnode, createReq.stbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pStb = mndAcquireStb(pMnode, createReq.stbName);
|
||||
if (pStb == NULL) {
|
||||
mError("idx:%s, failed to create since stb:%s not exist", createReq.idxName, createReq.stbName);
|
||||
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
SSIdx idx = {0};
|
||||
if (mndAcquireGlobalIdx(pMnode, createReq.idxName, SDB_IDX, &idx) == 0) {
|
||||
if ((code = mndAcquireGlobalIdx(pMnode, createReq.idxName, SDB_IDX, &idx)) == 0) {
|
||||
pIdx = idx.pIdx;
|
||||
} else {
|
||||
goto _OVER;
|
||||
}
|
||||
if (pIdx != NULL) {
|
||||
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
|
||||
|
||||
code = mndAddIndex(pMnode, pReq, &createReq, pDb, pStb);
|
||||
if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || terrno == TSDB_CODE_MND_TAG_NOT_EXIST) {
|
||||
|
@ -479,25 +513,35 @@ _OVER:
|
|||
mndReleaseIdx(pMnode, pIdx);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndSetDropIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendPrepareLog(pTrans, pRedoRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndIdxActionEncode(pIdx);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq) {
|
||||
|
@ -592,6 +636,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
|
|||
|
||||
static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pOld, SStbObj *pNew, char *tagName,
|
||||
int on) {
|
||||
int32_t code = 0;
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(pNew, pOld, sizeof(SStbObj));
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
|
@ -604,28 +649,24 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
|||
int8_t hasIdx = 0;
|
||||
int32_t tag = mndFindSuperTableTagId(pOld, tagName, &hasIdx);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId));
|
||||
TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
|
||||
SSchema *pTag = pNew->pTags + tag;
|
||||
|
||||
if (on == 1) {
|
||||
if (hasIdx && tag != 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
} else {
|
||||
SSCHMEA_SET_IDX_ON(pTag);
|
||||
}
|
||||
} else {
|
||||
if (hasIdx == 0) {
|
||||
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
|
||||
code = TSDB_CODE_MND_SMA_NOT_EXIST;
|
||||
} else {
|
||||
SSCHMEA_SET_IDX_OFF(pTag);
|
||||
pTag->flags = 0;
|
||||
|
@ -634,11 +675,15 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
|||
pNew->tagVer++;
|
||||
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pNew);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
|
||||
// impl later
|
||||
|
@ -649,17 +694,17 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
|
|||
|
||||
// mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
mndTransSetSerial(pTrans);
|
||||
|
||||
if (mndSetCreateIdxPrepareLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||
if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndSetCreateIdxPrepareLogs(pMnode, pTrans, pIdx), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx), NULL, _OVER);
|
||||
|
||||
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1) != 0) goto _OVER;
|
||||
if (mndSetCreateIdxRedoActions(pMnode, pTrans, pDb, &newStb, pIdx) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetCreateIdxRedoActions(pMnode, pTrans, pDb, &newStb, pIdx), NULL, _OVER);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -671,7 +716,7 @@ _OVER:
|
|||
taosMemoryFree(newStb.pCmpr);
|
||||
}
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int8_t mndCheckIndexNameByTagName(SMnode *pMnode, SIdxObj *pIdxObj) {
|
||||
// build index on first tag, and no index name;
|
||||
|
@ -725,25 +770,23 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re
|
|||
int8_t hasIdx = 0;
|
||||
int32_t tag = mndFindSuperTableTagId(pStb, req->colName, &hasIdx);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int8_t exist = 0;
|
||||
if (tag == 0 && hasIdx == 1) {
|
||||
exist = mndCheckIndexNameByTagName(pMnode, &idxObj);
|
||||
if (exist) {
|
||||
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
} else if (hasIdx == 1) {
|
||||
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
col_id_t colId = pStb->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pStb->name, pStb->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pStb->name, pStb->uid, colId));
|
||||
|
||||
// SSchema *pTag = pStb->pTags + tag;
|
||||
// if (IS_IDX_ON(pTag)) {
|
||||
|
@ -752,7 +795,7 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re
|
|||
// }
|
||||
code = mndAddIndexImpl(pMnode, pReq, pDb, pStb, &idxObj);
|
||||
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *pIdx) {
|
||||
|
@ -763,22 +806,30 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
|
|||
SStbObj newObj = {0};
|
||||
|
||||
pStb = mndAcquireStb(pMnode, pIdx->stb);
|
||||
if (pStb == NULL) goto _OVER;
|
||||
if (pStb == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-index");
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to drop idx:%s", pTrans->id, pIdx->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
mndTransSetSerial(pTrans);
|
||||
if (mndSetDropIdxPrepareLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndSetDropIdxPrepareLogs(pMnode, pTrans, pIdx), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx), NULL, _OVER);
|
||||
|
||||
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0) != 0) goto _OVER;
|
||||
if (mndSetDropIdxRedoActions(pMnode, pTrans, pDb, &newObj, pIdx) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
TAOS_CHECK_GOTO(mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetDropIdxRedoActions(pMnode, pTrans, pDb, &newObj, pIdx), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -789,7 +840,7 @@ _OVER:
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
@ -798,13 +849,10 @@ int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq) {
|
|||
SIdxObj *pIdx = NULL;
|
||||
|
||||
SDropTagIndexReq req = {0};
|
||||
if (tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
}
|
||||
TAOS_CHECK_GOTO(tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &req), NULL, _OVER);
|
||||
mInfo("idx:%s, start to drop", req.name);
|
||||
SSIdx idx = {0};
|
||||
if (mndAcquireGlobalIdx(pMnode, req.name, SDB_IDX, &idx) == 0) {
|
||||
if ((code = mndAcquireGlobalIdx(pMnode, req.name, SDB_IDX, &idx)) == 0) {
|
||||
pIdx = idx.pIdx;
|
||||
} else {
|
||||
goto _OVER;
|
||||
|
@ -815,7 +863,7 @@ int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq) {
|
|||
code = 0;
|
||||
goto _OVER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_TAG_INDEX_NOT_EXIST;
|
||||
code = TSDB_CODE_MND_TAG_INDEX_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
@ -823,12 +871,11 @@ int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq) {
|
|||
pDb = mndAcquireDbByIdx(pMnode, req.name);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);
|
||||
|
||||
code = mndDropIdx(pMnode, pReq, pDb, pIdx);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
@ -839,7 +886,7 @@ _OVER:
|
|||
}
|
||||
mndReleaseIdx(pMnode, pIdx);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
static int32_t mndProcessGetIdxReq(SRpcMsg *pReq) {
|
||||
// do nothing
|
||||
|
@ -847,6 +894,7 @@ static int32_t mndProcessGetIdxReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
int32_t mndDropIdxsByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
||||
|
@ -856,17 +904,17 @@ int32_t mndDropIdxsByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pIdx->stbUid == pStb->uid) {
|
||||
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) {
|
||||
if ((code = mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx)) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pIdx);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pIdx);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxObj *idx) {
|
||||
|
@ -891,6 +939,7 @@ int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxOb
|
|||
return -1;
|
||||
}
|
||||
int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
||||
|
@ -900,15 +949,15 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pIdx->dbUid == pDb->uid) {
|
||||
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) {
|
||||
if ((code = mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx)) != 0) {
|
||||
sdbRelease(pSdb, pIdx);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pIdx);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ int mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx) {
|
|||
SSmaObj *pSma = mndGetIdx(pMnode, name, SDB_SMA);
|
||||
SIdxObj *pIdx = mndGetIdx(pMnode, name, SDB_IDX);
|
||||
|
||||
terrno = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pSma == NULL && pIdx == NULL) return 0;
|
||||
|
||||
|
@ -40,8 +40,8 @@ int mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx) {
|
|||
idx->pIdx = pSma;
|
||||
} else {
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_SMA_ALREADY_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
} else {
|
||||
if (type == SDB_IDX) {
|
||||
|
@ -49,9 +49,9 @@ int mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx) {
|
|||
idx->pIdx = pIdx;
|
||||
} else {
|
||||
mndReleaseIdx(pMnode, pIdx);
|
||||
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
|
|
@ -18,10 +18,11 @@
|
|||
#include "systable.h"
|
||||
|
||||
static int32_t mndInitInfosTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, SSchema **pDst) {
|
||||
int32_t code = 0;
|
||||
SSchema *schema = taosMemoryCalloc(colNum, sizeof(SSchema));
|
||||
if (NULL == schema) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
|
@ -35,10 +36,11 @@ static int32_t mndInitInfosTableSchema(const SSysDbTableSchema *pSrc, int32_t co
|
|||
}
|
||||
|
||||
*pDst = schema;
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndInsInitMeta(SHashObj *hash) {
|
||||
int32_t code = 0;
|
||||
STableMetaRsp meta = {0};
|
||||
|
||||
tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName));
|
||||
|
@ -55,24 +57,23 @@ static int32_t mndInsInitMeta(SHashObj *hash) {
|
|||
meta.numOfColumns = pInfosTableMeta[i].colNum;
|
||||
meta.sysInfo = pInfosTableMeta[i].sysInfo;
|
||||
|
||||
if (mndInitInfosTableSchema(pInfosTableMeta[i].schema, pInfosTableMeta[i].colNum, &meta.pSchemas)) {
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndInitInfosTableSchema(pInfosTableMeta[i].schema, pInfosTableMeta[i].colNum, &meta.pSchemas));
|
||||
|
||||
if (taosHashPut(hash, meta.tbName, strlen(meta.tbName), &meta, sizeof(meta))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, bool sysinfo,
|
||||
STableMetaRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
if (NULL == pMnode->infosMeta) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
return -1;
|
||||
code = TSDB_CODE_APP_ERROR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
STableMetaRsp *pMeta = NULL;
|
||||
|
@ -84,40 +85,41 @@ int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *
|
|||
|
||||
if (NULL == pMeta) {
|
||||
mError("invalid information schema table name:%s", tbName);
|
||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (!sysinfo && pMeta->sysInfo) {
|
||||
mError("no permission to get schema of table name:%s", tbName);
|
||||
terrno = TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
return -1;
|
||||
code = TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*pRsp = *pMeta;
|
||||
|
||||
pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema));
|
||||
if (pRsp->pSchemas == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pRsp->pSchemas = NULL;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema));
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndBuildInsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
if (NULL == pMnode->infosMeta) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
return -1;
|
||||
code = TSDB_CODE_APP_ERROR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, tbName, strlen(tbName));
|
||||
if (NULL == pMeta) {
|
||||
mError("invalid information schema table name:%s", tbName);
|
||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
return -1;
|
||||
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
strcpy(pRsp->tbName, pMeta->tbName);
|
||||
|
@ -129,22 +131,21 @@ int32_t mndBuildInsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbN
|
|||
|
||||
pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema));
|
||||
if (pRsp->pSchemas == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pRsp->pSchemas = NULL;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema));
|
||||
|
||||
pRsp->pSchemaExt = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchemaExt));
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndInitInfos(SMnode *pMnode) {
|
||||
pMnode->infosMeta = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
||||
if (pMnode->infosMeta == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return mndInsInitMeta(pMnode->infosMeta);
|
||||
|
|
|
@ -50,8 +50,7 @@ static inline int32_t mndAcquireRpc(SMnode *pMnode) {
|
|||
int32_t code = 0;
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
if (pMnode->stopped) {
|
||||
terrno = TSDB_CODE_APP_IS_STOPPING;
|
||||
code = -1;
|
||||
code = TSDB_CODE_APP_IS_STOPPING;
|
||||
} else if (!mndIsLeader(pMnode)) {
|
||||
code = -1;
|
||||
} else {
|
||||
|
@ -63,7 +62,7 @@ static inline int32_t mndAcquireRpc(SMnode *pMnode) {
|
|||
#endif
|
||||
}
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static inline void mndReleaseRpc(SMnode *pMnode) {
|
||||
|
@ -78,6 +77,7 @@ static inline void mndReleaseRpc(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static void *mndBuildTimerMsg(int32_t *pContLen) {
|
||||
terrno = 0;
|
||||
SMTimerReq timerReq = {0};
|
||||
|
||||
int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
|
||||
|
@ -426,17 +426,18 @@ static void *mndThreadFp(void *param) {
|
|||
}
|
||||
|
||||
static int32_t mndInitTimer(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
|
||||
if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) {
|
||||
mError("failed to create timer thread since %s", strerror(errno));
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
tmsgReportStartup("mnode-timer", "initialized");
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static void mndCleanupTimer(SMnode *pMnode) {
|
||||
|
@ -447,21 +448,23 @@ static void mndCleanupTimer(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
|
||||
int32_t code = 0;
|
||||
pMnode->path = taosStrdup(path);
|
||||
if (pMnode->path == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (taosMkDir(pMnode->path) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndInitWal(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
char path[PATH_MAX + 20] = {0};
|
||||
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||
SWalCfg cfg = {
|
||||
|
@ -480,8 +483,8 @@ static int32_t mndInitWal(SMnode *pMnode) {
|
|||
if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_MNODE_WAL) == DND_CS_MNODE_WAL){
|
||||
cfg.encryptAlgorithm = (tsiEncryptScope & DND_CS_MNODE_WAL)? tsiEncryptAlgorithm : 0;
|
||||
if(tsEncryptKey[0] == '\0'){
|
||||
terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
return -1;
|
||||
code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
else{
|
||||
strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
|
@ -492,10 +495,12 @@ static int32_t mndInitWal(SMnode *pMnode) {
|
|||
pMnode->pWal = walOpen(path, &cfg);
|
||||
if (pMnode->pWal == NULL) {
|
||||
mError("failed to open wal since %s. wal:%s", terrstr(), path);
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static void mndCloseWal(SMnode *pMnode) {
|
||||
|
@ -506,6 +511,7 @@ static void mndCloseWal(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static int32_t mndInitSdb(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
SSdbOpt opt = {0};
|
||||
opt.path = pMnode->path;
|
||||
opt.pMnode = pMnode;
|
||||
|
@ -513,10 +519,12 @@ static int32_t mndInitSdb(SMnode *pMnode) {
|
|||
|
||||
pMnode->pSdb = sdbInit(&opt);
|
||||
if (pMnode->pSdb == NULL) {
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndOpenSdb(SMnode *pMnode) {
|
||||
|
@ -542,48 +550,47 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
|
|||
step.initFp = initFp;
|
||||
step.cleanupFp = cleanupFp;
|
||||
if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return 0;
|
||||
TAOS_RETURN(0);
|
||||
}
|
||||
|
||||
static int32_t mndInitSteps(SMnode *pMnode) {
|
||||
if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-idx", mndInitIdx, mndCleanupIdx) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-view", mndInitView, mndCleanupView) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-compact", mndInitCompact, mndCleanupCompact) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-compact-detail", mndInitCompactDetail, mndCleanupCompactDetail) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
|
||||
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-arbgroup", mndInitArbGroup, mndCleanupArbGroup));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-idx", mndInitIdx, mndCleanupIdx));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-view", mndInitView, mndCleanupView));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-compact", mndInitCompact, mndCleanupCompact));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-compact-detail", mndInitCompactDetail, mndCleanupCompactDetail));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync));
|
||||
TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -618,8 +625,7 @@ static int32_t mndExecSteps(SMnode *pMnode) {
|
|||
int32_t code = terrno;
|
||||
mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
|
||||
mndCleanupSteps(pMnode, pos);
|
||||
terrno = code;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
} else {
|
||||
mInfo("%s is initialized", pStep->name);
|
||||
tmsgReportStartup(pStep->name, "initialized");
|
||||
|
@ -627,7 +633,7 @@ static int32_t mndExecSteps(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
pMnode->clusterId = mndGetClusterId(pMnode);
|
||||
return 0;
|
||||
TAOS_RETURN(0);
|
||||
}
|
||||
|
||||
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
|
||||
|
@ -642,6 +648,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
|
|||
}
|
||||
|
||||
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||
terrno = 0;
|
||||
mInfo("start to open mnode in %s", path);
|
||||
|
||||
SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
|
||||
|
@ -769,38 +776,40 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||
if (!IsReq(pMsg)) return 0;
|
||||
int32_t code = 0;
|
||||
if (!IsReq(pMsg)) TAOS_RETURN(code);
|
||||
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
|
||||
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
|
||||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK ||
|
||||
pMsg->msgType == TDMT_SCH_TASK_NOTIFY) {
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
if (pMnode->stopped) {
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
terrno = TSDB_CODE_APP_IS_STOPPING;
|
||||
return -1;
|
||||
code = TSDB_CODE_APP_IS_STOPPING;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = 0;
|
||||
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
|
||||
if (terrno != 0) {
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (state.state != TAOS_SYNC_STATE_LEADER) {
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
code = TSDB_CODE_SYN_NOT_LEADER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (!state.restored || !pMnode->restored) {
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
terrno = TSDB_CODE_SYN_RESTORING;
|
||||
code = TSDB_CODE_SYN_RESTORING;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -812,7 +821,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
|||
#endif
|
||||
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
|
||||
_OVER:
|
||||
if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
|
||||
|
@ -824,19 +833,17 @@ _OVER:
|
|||
pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) {
|
||||
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
|
||||
pMnode->stopped, state.restored, syncStr(state.state));
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
SEpSet epSet = {0};
|
||||
int32_t tmpCode = terrno;
|
||||
mndGetMnodeEpSet(pMnode, &epSet);
|
||||
terrno = tmpCode;
|
||||
|
||||
mGDebug(
|
||||
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
|
||||
"role:%s, redirect numOfEps:%d inUse:%d, type:%s",
|
||||
pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
|
||||
pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code), pMnode->restored, pMnode->stopped, state.restored,
|
||||
syncStr(state.state), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
if (epSet.numOfEps <= 0) return -1;
|
||||
|
@ -853,7 +860,7 @@ _OVER:
|
|||
pMsg->info.rspLen = contLen;
|
||||
}
|
||||
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) {
|
||||
|
@ -867,12 +874,12 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) {
|
|||
fpExt = pMnode->msgFpExt[TMSG_INDEX(pMsg->msgType)];
|
||||
if (fpExt == NULL) {
|
||||
mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
return -1;
|
||||
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckMnodeState(pMsg) != 0) return -1;
|
||||
TAOS_CHECK_RETURN(mndCheckMnodeState(pMsg));
|
||||
|
||||
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
if (fp)
|
||||
|
@ -886,14 +893,12 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) {
|
|||
} else if (code == 0) {
|
||||
mGTrace("msg:%p, successfully processed", pMsg);
|
||||
} else {
|
||||
if (code == -1) {
|
||||
code = terrno;
|
||||
}
|
||||
if (terrno != 0) code = terrno;
|
||||
mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, tstrerror(code), pMsg->info.ahandle,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
}
|
||||
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
|
||||
|
@ -925,7 +930,8 @@ int64_t mndGenerateUid(const char *name, int32_t len) {
|
|||
|
||||
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
|
||||
if (mndAcquireRpc(pMnode) != 0) return -1;
|
||||
int32_t code = 0;
|
||||
TAOS_CHECK_RETURN(mndAcquireRpc(pMnode));
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t ms = taosGetTimestampMs();
|
||||
|
@ -937,7 +943,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
|
||||
pStbInfo->stbs == NULL) {
|
||||
mndReleaseRpc(pMnode);
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
// cluster info
|
||||
|
@ -1063,7 +1071,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
}
|
||||
|
||||
mndReleaseRpc(pMnode);
|
||||
return 0;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
||||
|
|
Loading…
Reference in New Issue