Merge pull request #26701 from taosdata/fix/TD-30989-mnode6
fix/TD-30989
This commit is contained in:
commit
dd22e80059
|
@ -69,6 +69,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
|
||||||
void mndCleanupMnode(SMnode *pMnode) {}
|
void mndCleanupMnode(SMnode *pMnode) {}
|
||||||
|
|
||||||
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
|
SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
|
||||||
|
terrno = 0;
|
||||||
SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
|
SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
|
||||||
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||||
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
|
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
|
||||||
|
@ -82,13 +83,18 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
|
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
|
||||||
|
int32_t code = 0;
|
||||||
SMnodeObj mnodeObj = {0};
|
SMnodeObj mnodeObj = {0};
|
||||||
mnodeObj.id = 1;
|
mnodeObj.id = 1;
|
||||||
mnodeObj.createdTime = taosGetTimestampMs();
|
mnodeObj.createdTime = taosGetTimestampMs();
|
||||||
mnodeObj.updateTime = mnodeObj.createdTime;
|
mnodeObj.updateTime = mnodeObj.createdTime;
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
|
SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
|
||||||
if (pRaw == NULL) return -1;
|
if (pRaw == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
|
mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
|
||||||
|
@ -97,25 +103,27 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
|
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);
|
mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);
|
||||||
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
|
||||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
|
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
|
||||||
|
@ -188,16 +196,19 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
|
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
|
mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
|
||||||
pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
|
pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
|
||||||
if (pObj->pDnode == NULL) {
|
if (pObj->pDnode == NULL) {
|
||||||
mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
|
mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
|
||||||
return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
int32_t code = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
|
pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
|
||||||
mndReloadSyncConfig(pSdb->pMnode);
|
mndReloadSyncConfig(pSdb->pMnode);
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
|
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
|
||||||
|
@ -271,38 +282,59 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) {
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) {
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
|
SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
|
||||||
if (pUndoRaw == NULL) return -1;
|
if (pUndoRaw == NULL) {
|
||||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) {
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
|
static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
|
tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
|
||||||
|
@ -315,15 +347,16 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
|
||||||
.acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
|
.acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
|
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
|
||||||
SEpSet *pAlterMnodeTypeEpSet) {
|
SEpSet *pAlterMnodeTypeEpSet) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
|
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
|
||||||
|
@ -337,14 +370,15 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeType
|
||||||
.acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
|
.acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
|
static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
|
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
|
||||||
|
@ -357,15 +391,16 @@ static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pA
|
||||||
.acceptableCode = 0,
|
.acceptableCode = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
|
static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
|
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
|
tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
|
||||||
|
@ -378,11 +413,11 @@ static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDrop
|
||||||
.acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
|
.acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
|
@ -426,9 +461,9 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
||||||
createEpset.eps[0].port = pDnode->port;
|
createEpset.eps[0].port = pDnode->port;
|
||||||
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1;
|
TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
|
@ -474,9 +509,9 @@ int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
||||||
createEpset.eps[0].port = pDnode->port;
|
createEpset.eps[0].port = pDnode->port;
|
||||||
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1;
|
TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset));
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
|
@ -517,9 +552,9 @@ static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S
|
||||||
createEpset.eps[0].port = pDnode->port;
|
createEpset.eps[0].port = pDnode->port;
|
||||||
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
if (mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset) != 0) return -1;
|
TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset));
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
|
@ -565,19 +600,23 @@ int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S
|
||||||
createEpset.eps[0].port = pDnode->port;
|
createEpset.eps[0].port = pDnode->port;
|
||||||
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
if (mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset) != 0) return -1;
|
TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset));
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
|
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
SMnodeObj mnodeObj = {0};
|
SMnodeObj mnodeObj = {0};
|
||||||
mnodeObj.id = pDnode->id;
|
mnodeObj.id = pDnode->id;
|
||||||
|
@ -586,8 +625,8 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
mnodeObj.role = TAOS_SYNC_ROLE_LEARNER;
|
mnodeObj.role = TAOS_SYNC_ROLE_LEARNER;
|
||||||
mnodeObj.lastIndex = pMnode->applied;
|
mnodeObj.lastIndex = pMnode->applied;
|
||||||
|
|
||||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj), NULL, _OVER);
|
||||||
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj), NULL, _OVER);
|
||||||
|
|
||||||
SMnodeObj mnodeLeaderObj = {0};
|
SMnodeObj mnodeLeaderObj = {0};
|
||||||
mnodeLeaderObj.id = pDnode->id;
|
mnodeLeaderObj.id = pDnode->id;
|
||||||
|
@ -596,15 +635,15 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
mnodeLeaderObj.role = TAOS_SYNC_ROLE_VOTER;
|
mnodeLeaderObj.role = TAOS_SYNC_ROLE_VOTER;
|
||||||
mnodeLeaderObj.lastIndex = pMnode->applied + 1;
|
mnodeLeaderObj.lastIndex = pMnode->applied + 1;
|
||||||
|
|
||||||
if (mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &mnodeLeaderObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &mnodeLeaderObj), NULL, _OVER);
|
||||||
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeLeaderObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeLeaderObj), NULL, _OVER);
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
||||||
|
@ -614,19 +653,14 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
SMCreateMnodeReq createReq = {0};
|
SMCreateMnodeReq createReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("mnode:%d, start to create", createReq.dnodeId);
|
mInfo("mnode:%d, start to create", createReq.dnodeId);
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
|
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE), NULL, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
|
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
|
||||||
if (pObj != NULL) {
|
if (pObj != NULL) {
|
||||||
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
|
code = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
|
} else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -634,17 +668,17 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
|
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
|
||||||
terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
|
code = TSDB_CODE_MND_TOO_MANY_MNODES;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
|
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
|
||||||
terrno = TSDB_CODE_DNODE_OFFLINE;
|
code = TSDB_CODE_DNODE_OFFLINE;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -665,27 +699,38 @@ _OVER:
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
tFreeSMCreateQnodeReq(&createReq);
|
tFreeSMCreateQnodeReq(&createReq);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) {
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) {
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
|
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
|
||||||
bool force) {
|
bool force) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SDDropMnodeReq dropReq = {0};
|
SDDropMnodeReq dropReq = {0};
|
||||||
|
@ -700,32 +745,32 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
||||||
if (totalMnodes == 2) {
|
if (totalMnodes == 2) {
|
||||||
if (force) {
|
if (force) {
|
||||||
mError("cant't force drop dnode, since a mnode on it and replica is 2");
|
mError("cant't force drop dnode, since a mnode on it and replica is 2");
|
||||||
terrno = TSDB_CODE_MNODE_ONLY_TWO_MNODE;
|
code = TSDB_CODE_MNODE_ONLY_TWO_MNODE;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
|
mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
|
||||||
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
|
||||||
if (!force) {
|
if (!force) {
|
||||||
if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
|
TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
|
||||||
}
|
}
|
||||||
} else if (totalMnodes == 3) {
|
} else if (totalMnodes == 3) {
|
||||||
mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
|
mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
|
||||||
if (!force) {
|
if (!force) {
|
||||||
if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
|
TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet));
|
||||||
}
|
}
|
||||||
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj));
|
||||||
} else {
|
} else {
|
||||||
return -1;
|
TAOS_RETURN(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
|
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
|
||||||
if (pObj == NULL) return 0;
|
if (pObj == NULL) return 0;
|
||||||
pObj->lastIndex = pMnode->applied;
|
pObj->lastIndex = pMnode->applied;
|
||||||
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force));
|
||||||
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -734,19 +779,23 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
||||||
|
@ -755,38 +804,35 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
||||||
SMnodeObj *pObj = NULL;
|
SMnodeObj *pObj = NULL;
|
||||||
SMDropMnodeReq dropReq = {0};
|
SMDropMnodeReq dropReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("mnode:%d, start to drop", dropReq.dnodeId);
|
mInfo("mnode:%d, start to drop", dropReq.dnodeId);
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
|
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dropReq.dnodeId <= 0) {
|
if (dropReq.dnodeId <= 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
|
pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
|
||||||
if (pObj == NULL) {
|
if (pObj == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMnode->selfDnodeId == dropReq.dnodeId) {
|
if (pMnode->selfDnodeId == dropReq.dnodeId) {
|
||||||
terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
|
code = TSDB_CODE_MND_CANT_DROP_LEADER;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
|
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
|
||||||
terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
|
code = TSDB_CODE_MND_TOO_FEW_MNODES;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
|
if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
|
||||||
terrno = TSDB_CODE_DNODE_OFFLINE;
|
code = TSDB_CODE_DNODE_OFFLINE;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -805,7 +851,7 @@ _OVER:
|
||||||
|
|
||||||
mndReleaseMnode(pMnode, pObj);
|
mndReleaseMnode(pMnode, pObj);
|
||||||
tFreeSMCreateQnodeReq(&dropReq);
|
tFreeSMCreateQnodeReq(&dropReq);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
|
@ -892,13 +938,11 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
|
||||||
#if 1
|
#if 1
|
||||||
return 0;
|
return 0;
|
||||||
#else
|
#else
|
||||||
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SDAlterMnodeReq alterReq = {0};
|
SDAlterMnodeReq alterReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
|
TAOS_CHECK_RETURN(tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq));
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
|
SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1};
|
||||||
memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
|
memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas));
|
||||||
|
@ -913,9 +957,9 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndWriteFile(pMnode->path, &option) != 0) {
|
if ((code = mndWriteFile(pMnode->path, &option)) != 0) {
|
||||||
mError("failed to write mnode file since %s", terrstr());
|
mError("failed to write mnode file since %s", terrstr());
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
|
SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
|
||||||
|
@ -939,14 +983,14 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
|
code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to sync reconfig since %s", terrstr());
|
mError("failed to sync reconfig since %s", terrstr());
|
||||||
} else {
|
} else {
|
||||||
mInfo("alter mnode sync success");
|
mInfo("alter mnode sync success");
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,10 +19,11 @@
|
||||||
|
|
||||||
// connection/application/
|
// connection/application/
|
||||||
int32_t mndInitPerfsTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, SSchema **pDst) {
|
int32_t mndInitPerfsTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, SSchema **pDst) {
|
||||||
|
int32_t code = 0;
|
||||||
SSchema *schema = taosMemoryCalloc(colNum, sizeof(SSchema));
|
SSchema *schema = taosMemoryCalloc(colNum, sizeof(SSchema));
|
||||||
if (NULL == schema) {
|
if (NULL == schema) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < colNum; ++i) {
|
for (int32_t i = 0; i < colNum; ++i) {
|
||||||
|
@ -34,10 +35,11 @@ int32_t mndInitPerfsTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, S
|
||||||
}
|
}
|
||||||
|
|
||||||
*pDst = schema;
|
*pDst = schema;
|
||||||
return TSDB_CODE_SUCCESS;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndPerfsInitMeta(SHashObj *hash) {
|
int32_t mndPerfsInitMeta(SHashObj *hash) {
|
||||||
|
int32_t code = 0;
|
||||||
STableMetaRsp meta = {0};
|
STableMetaRsp meta = {0};
|
||||||
|
|
||||||
tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName));
|
tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName));
|
||||||
|
@ -53,56 +55,56 @@ int32_t mndPerfsInitMeta(SHashObj *hash) {
|
||||||
tstrncpy(meta.tbName, pSysDbTableMeta[i].name, sizeof(meta.tbName));
|
tstrncpy(meta.tbName, pSysDbTableMeta[i].name, sizeof(meta.tbName));
|
||||||
meta.numOfColumns = pSysDbTableMeta[i].colNum;
|
meta.numOfColumns = pSysDbTableMeta[i].colNum;
|
||||||
|
|
||||||
if (mndInitPerfsTableSchema(pSysDbTableMeta[i].schema, pSysDbTableMeta[i].colNum, &meta.pSchemas)) {
|
TAOS_CHECK_RETURN(mndInitPerfsTableSchema(pSysDbTableMeta[i].schema, pSysDbTableMeta[i].colNum, &meta.pSchemas));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosHashPut(hash, meta.tbName, strlen(meta.tbName), &meta, sizeof(meta))) {
|
if (taosHashPut(hash, meta.tbName, strlen(meta.tbName), &meta, sizeof(meta))) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
||||||
|
int32_t code = 0;
|
||||||
if (NULL == pMnode->perfsMeta) {
|
if (NULL == pMnode->perfsMeta) {
|
||||||
terrno = TSDB_CODE_APP_ERROR;
|
code = TSDB_CODE_APP_ERROR;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName));
|
STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName));
|
||||||
if (NULL == meta) {
|
if (NULL == meta) {
|
||||||
mError("invalid performance schema table name:%s", tbName);
|
mError("invalid performance schema table name:%s", tbName);
|
||||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pRsp = *meta;
|
*pRsp = *meta;
|
||||||
|
|
||||||
pRsp->pSchemas = taosMemoryCalloc(meta->numOfColumns, sizeof(SSchema));
|
pRsp->pSchemas = taosMemoryCalloc(meta->numOfColumns, sizeof(SSchema));
|
||||||
if (pRsp->pSchemas == NULL) {
|
if (pRsp->pSchemas == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
pRsp->pSchemas = NULL;
|
pRsp->pSchemas = NULL;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pRsp->pSchemas, meta->pSchemas, meta->numOfColumns * sizeof(SSchema));
|
memcpy(pRsp->pSchemas, meta->pSchemas, meta->numOfColumns * sizeof(SSchema));
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
|
int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
|
||||||
|
int32_t code = 0;
|
||||||
if (NULL == pMnode->perfsMeta) {
|
if (NULL == pMnode->perfsMeta) {
|
||||||
terrno = TSDB_CODE_APP_ERROR;
|
code = TSDB_CODE_APP_ERROR;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaRsp *pMeta = taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName));
|
STableMetaRsp *pMeta = taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName));
|
||||||
if (NULL == pMeta) {
|
if (NULL == pMeta) {
|
||||||
mError("invalid performance schema table name:%s", tbName);
|
mError("invalid performance schema table name:%s", tbName);
|
||||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(pRsp->tbName, pMeta->tbName);
|
strcpy(pRsp->tbName, pMeta->tbName);
|
||||||
|
@ -114,20 +116,21 @@ int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *t
|
||||||
|
|
||||||
pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema));
|
pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema));
|
||||||
if (pRsp->pSchemas == NULL) {
|
if (pRsp->pSchemas == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
pRsp->pSchemas = NULL;
|
pRsp->pSchemas = NULL;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema));
|
memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema));
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndInitPerfs(SMnode *pMnode) {
|
int32_t mndInitPerfs(SMnode *pMnode) {
|
||||||
|
int32_t code = 0;
|
||||||
pMnode->perfsMeta = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
pMnode->perfsMeta = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
if (pMnode->perfsMeta == NULL) {
|
if (pMnode->perfsMeta == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return mndPerfsInitMeta(pMnode->perfsMeta);
|
return mndPerfsInitMeta(pMnode->perfsMeta);
|
||||||
|
|
|
@ -85,22 +85,23 @@ static void mndCancelGetNextApp(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq);
|
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitProfile(SMnode *pMnode) {
|
int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
|
int32_t code = 0;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
// in ms
|
// in ms
|
||||||
int32_t checkTime = tsShellActivityTimer * 2 * 1000;
|
int32_t checkTime = tsShellActivityTimer * 2 * 1000;
|
||||||
pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, false, (__cache_free_fn_t)mndFreeConn, "conn");
|
pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, false, (__cache_free_fn_t)mndFreeConn, "conn");
|
||||||
if (pMgmt->connCache == NULL) {
|
if (pMgmt->connCache == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed to alloc profile cache since %s", terrstr());
|
mError("failed to alloc profile cache since %s", terrstr());
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
|
pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app");
|
||||||
if (pMgmt->appCache == NULL) {
|
if (pMgmt->appCache == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed to alloc profile cache since %s", terrstr());
|
mError("failed to alloc profile cache since %s", terrstr());
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
|
||||||
|
@ -116,7 +117,7 @@ int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndCleanupProfile(SMnode *pMnode) {
|
void mndCleanupProfile(SMnode *pMnode) {
|
||||||
|
@ -384,6 +385,7 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq)
|
||||||
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
|
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
|
||||||
|
|
||||||
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
|
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
|
||||||
|
terrno = 0;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
|
SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId));
|
||||||
|
@ -431,13 +433,16 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
|
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
SAppHbReq *pReq = &pHbReq->app;
|
SAppHbReq *pReq = &pHbReq->app;
|
||||||
SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId);
|
SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId);
|
||||||
if (pApp == NULL) {
|
if (pApp == NULL) {
|
||||||
pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
|
pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
|
||||||
if (pApp == NULL) {
|
if (pApp == NULL) {
|
||||||
mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
|
mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr());
|
||||||
return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
} else {
|
} else {
|
||||||
mDebug("a new app %" PRIx64 " is created", pReq->appId);
|
mDebug("a new app %" PRIx64 " is created", pReq->appId);
|
||||||
mndReleaseApp(pMnode, pApp);
|
mndReleaseApp(pMnode, pApp);
|
||||||
|
@ -475,6 +480,7 @@ static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
|
||||||
|
|
||||||
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
|
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
|
||||||
SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) {
|
SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
|
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
|
||||||
SRpcConnInfo connInfo = pMsg->info.conn;
|
SRpcConnInfo connInfo = pMsg->info.conn;
|
||||||
|
@ -492,7 +498,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
pHbReq->app.pid, pHbReq->app.name, 0);
|
pHbReq->app.pid, pHbReq->app.name, 0);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
|
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
|
||||||
return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
} else {
|
} else {
|
||||||
mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
|
mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
|
||||||
}
|
}
|
||||||
|
@ -501,9 +509,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
|
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
|
||||||
if (rspBasic == NULL) {
|
if (rspBasic == NULL) {
|
||||||
mndReleaseConn(pMnode, pConn, true);
|
mndReleaseConn(pMnode, pConn, true);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
|
mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndSaveQueryList(pConn, pBasic);
|
mndSaveQueryList(pConn, pBasic);
|
||||||
|
@ -539,9 +547,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
|
hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
|
||||||
if (NULL == hbRsp.info) {
|
if (NULL == hbRsp.info) {
|
||||||
mError("taosArrayInit %d rsp kv failed", kvNum);
|
mError("taosArrayInit %d rsp kv failed", kvNum);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tFreeClientHbRsp(&hbRsp);
|
tFreeClientHbRsp(&hbRsp);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
|
@ -554,8 +562,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer);
|
mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer);
|
||||||
|
|
||||||
SDynViewVersion *pRspVer = NULL;
|
SDynViewVersion *pRspVer = NULL;
|
||||||
if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) {
|
if (0 != (code = mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer))) {
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needCheck) {
|
if (needCheck) {
|
||||||
|
@ -647,13 +655,14 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
|
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
|
||||||
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
|
||||||
SClientHbBatchReq batchReq = {0};
|
SClientHbBatchReq batchReq = {0};
|
||||||
if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
|
if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
|
||||||
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
|
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SConnPreparedObj obj = {0};
|
SConnPreparedObj obj = {0};
|
||||||
|
@ -699,31 +708,27 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
taosArrayDestroy(obj.pQnodeList);
|
taosArrayDestroy(obj.pQnodeList);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
||||||
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SKillQueryReq killReq = {0};
|
SKillQueryReq killReq = {0};
|
||||||
if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
|
TAOS_CHECK_RETURN(tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq));
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
|
mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY) != 0) {
|
TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t connId = 0;
|
int32_t connId = 0;
|
||||||
uint64_t queryId = 0;
|
uint64_t queryId = 0;
|
||||||
char *p = strchr(killReq.queryStrId, ':');
|
char *p = strchr(killReq.queryStrId, ':');
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
mError("invalid query id %s", killReq.queryStrId);
|
mError("invalid query id %s", killReq.queryStrId);
|
||||||
terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
|
code = TSDB_CODE_MND_INVALID_QUERY_ID;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
*p = 0;
|
*p = 0;
|
||||||
connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
|
connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
|
||||||
|
@ -732,40 +737,36 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
||||||
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
|
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t));
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
|
mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
|
||||||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
code = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
} else {
|
} else {
|
||||||
mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
|
mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
|
||||||
pConn->killId = queryId;
|
pConn->killId = queryId;
|
||||||
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
|
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
||||||
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SKillConnReq killReq = {0};
|
SKillConnReq killReq = {0};
|
||||||
if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
|
TAOS_CHECK_RETURN(tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq));
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN) != 0) {
|
TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
|
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t));
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
|
mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
|
||||||
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
|
code = TSDB_CODE_MND_INVALID_CONN_ID;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
} else {
|
} else {
|
||||||
mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
|
mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
|
||||||
pConn->killed = 1;
|
pConn->killed = 1;
|
||||||
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
|
taosCacheRelease(pMgmt->connCache, (void **)&pConn, false);
|
||||||
return TSDB_CODE_SUCCESS;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -138,15 +138,16 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) {
|
static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj);
|
mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj);
|
||||||
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
|
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
|
||||||
if (pObj->pDnode == NULL) {
|
if (pObj->pDnode == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
mError("qnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
|
mError("qnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
|
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
|
||||||
|
@ -166,27 +167,42 @@ static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj);
|
SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) {
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj);
|
SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj);
|
||||||
if (pUndoRaw == NULL) return -1;
|
if (pUndoRaw == NULL) {
|
||||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
|
int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
|
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) {
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) {
|
bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) {
|
||||||
|
@ -194,6 +210,7 @@ bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SDCreateQnodeReq createReq = {0};
|
SDCreateQnodeReq createReq = {0};
|
||||||
createReq.dnodeId = pDnode->id;
|
createReq.dnodeId = pDnode->id;
|
||||||
|
|
||||||
|
@ -212,23 +229,24 @@ int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeOb
|
||||||
action.msgType = TDMT_DND_CREATE_QNODE;
|
action.msgType = TDMT_DND_CREATE_QNODE;
|
||||||
action.acceptableCode = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
|
action.acceptableCode = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SDDropQnodeReq dropReq = {0};
|
SDDropQnodeReq dropReq = {0};
|
||||||
dropReq.dnodeId = pDnode->id;
|
dropReq.dnodeId = pDnode->id;
|
||||||
|
|
||||||
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
|
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
|
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
|
||||||
|
|
||||||
|
@ -239,12 +257,12 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
||||||
action.msgType = TDMT_DND_DROP_QNODE;
|
action.msgType = TDMT_DND_DROP_QNODE;
|
||||||
action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
|
action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) {
|
static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) {
|
||||||
|
@ -256,22 +274,26 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
qnodeObj.updateTime = qnodeObj.createdTime;
|
qnodeObj.updateTime = qnodeObj.createdTime;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj), NULL, _OVER);
|
||||||
if (mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj), NULL, _OVER);
|
||||||
if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj), NULL, _OVER);
|
||||||
if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj), NULL, _OVER);
|
||||||
if (mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj), NULL, _OVER);
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
||||||
|
@ -281,19 +303,14 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
SMCreateQnodeReq createReq = {0};
|
SMCreateQnodeReq createReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("qnode:%d, start to create", createReq.dnodeId);
|
mInfo("qnode:%d, start to create", createReq.dnodeId);
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) {
|
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE), NULL, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
pObj = mndAcquireQnode(pMnode, createReq.dnodeId);
|
pObj = mndAcquireQnode(pMnode, createReq.dnodeId);
|
||||||
if (pObj != NULL) {
|
if (pObj != NULL) {
|
||||||
terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST;
|
code = TSDB_CODE_MND_QNODE_ALREADY_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) {
|
} else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -301,7 +318,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,34 +337,47 @@ _OVER:
|
||||||
mndReleaseQnode(pMnode, pObj);
|
mndReleaseQnode(pMnode, pObj);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
tFreeSMCreateQnodeReq(&createReq);
|
tFreeSMCreateQnodeReq(&createReq);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
static int32_t mndSetDropQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj);
|
SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) {
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
|
||||||
|
TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
|
static int32_t mndSetDropQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
|
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) {
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
if (terrno != 0) code = terrno;
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw))
|
||||||
|
;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED))
|
||||||
|
;
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
||||||
|
int32_t code = 0;
|
||||||
SDDropQnodeReq dropReq = {0};
|
SDDropQnodeReq dropReq = {0};
|
||||||
dropReq.dnodeId = pDnode->id;
|
dropReq.dnodeId = pDnode->id;
|
||||||
|
|
||||||
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
|
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
|
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
|
||||||
|
|
||||||
|
@ -358,20 +388,20 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
|
||||||
action.msgType = TDMT_DND_DROP_QNODE;
|
action.msgType = TDMT_DND_DROP_QNODE;
|
||||||
action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
|
action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force) {
|
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force) {
|
||||||
if (pObj == NULL) return 0;
|
if (pObj == NULL) return 0;
|
||||||
if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropQnodeRedoLogs(pTrans, pObj));
|
||||||
if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropQnodeCommitLogs(pTrans, pObj));
|
||||||
if (!force) {
|
if (!force) {
|
||||||
if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) return -1;
|
TAOS_CHECK_RETURN(mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj));
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -380,18 +410,22 @@ static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode");
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
||||||
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER);
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
||||||
|
@ -400,23 +434,20 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
||||||
SQnodeObj *pObj = NULL;
|
SQnodeObj *pObj = NULL;
|
||||||
SMDropQnodeReq dropReq = {0};
|
SMDropQnodeReq dropReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("qnode:%d, start to drop", dropReq.dnodeId);
|
mInfo("qnode:%d, start to drop", dropReq.dnodeId);
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) {
|
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE), NULL, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dropReq.dnodeId <= 0) {
|
if (dropReq.dnodeId <= 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
|
pObj = mndAcquireQnode(pMnode, dropReq.dnodeId);
|
||||||
if (pObj == NULL) {
|
if (pObj == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -435,10 +466,11 @@ _OVER:
|
||||||
|
|
||||||
mndReleaseQnode(pMnode, pObj);
|
mndReleaseQnode(pMnode, pObj);
|
||||||
tFreeSDDropQnodeReq(&dropReq);
|
tFreeSDDropQnodeReq(&dropReq);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) {
|
int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SQnodeObj *pObj = NULL;
|
SQnodeObj *pObj = NULL;
|
||||||
|
@ -447,8 +479,8 @@ int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) {
|
||||||
SArray *qnodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
|
SArray *qnodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
|
||||||
if (NULL == qnodeList) {
|
if (NULL == qnodeList) {
|
||||||
mError("failed to alloc epSet while process qnode list req");
|
mError("failed to alloc epSet while process qnode list req");
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -484,20 +516,14 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
|
||||||
SQnodeListReq qlistReq = {0};
|
SQnodeListReq qlistReq = {0};
|
||||||
SQnodeListRsp qlistRsp = {0};
|
SQnodeListRsp qlistRsp = {0};
|
||||||
|
|
||||||
if (tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq) != 0) {
|
TAOS_CHECK_GOTO(tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq), NULL, _OVER);
|
||||||
mError("failed to parse qnode list req");
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum) != 0) {
|
TAOS_CHECK_GOTO(mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum), NULL, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
|
int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
|
||||||
void *pRsp = rpcMallocCont(rspLen);
|
void *pRsp = rpcMallocCont(rspLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,7 +535,7 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSQnodeListRsp(&qlistRsp);
|
tFreeSQnodeListRsp(&qlistRsp);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
|
|
|
@ -88,7 +88,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
||||||
void *pRsp = NULL;
|
void *pRsp = NULL;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
|
||||||
if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
|
if ((code = tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) != 0) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("tDeserializeSBatchReq failed");
|
mError("tDeserializeSBatchReq failed");
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -119,7 +119,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
||||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req->msgType)];
|
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req->msgType)];
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
taosArrayDestroy(batchRsp.pRsps);
|
taosArrayDestroy(batchRsp.pRsps);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -164,7 +164,7 @@ _exit:
|
||||||
taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
|
taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
|
||||||
taosArrayDestroyEx(batchRsp.pRsps, mnodeFreeSBatchRspMsg);
|
taosArrayDestroyEx(batchRsp.pRsps, mnodeFreeSBatchRspMsg);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndInitQuery(SMnode *pMnode) {
|
int32_t mndInitQuery(SMnode *pMnode) {
|
||||||
|
|
|
@ -50,17 +50,17 @@ static bool isCountWindowStreamTask(SSubplan* pPlan) {
|
||||||
|
|
||||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||||
int64_t watermark, int64_t deleteMark) {
|
int64_t watermark, int64_t deleteMark) {
|
||||||
|
int32_t code = 0;
|
||||||
SNode* pAst = NULL;
|
SNode* pAst = NULL;
|
||||||
SQueryPlan* pPlan = NULL;
|
SQueryPlan* pPlan = NULL;
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
if (nodesStringToNode(ast, &pAst) < 0) {
|
if (nodesStringToNode(ast, &pAst) < 0) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (qSetSTableIdForRsma(pAst, uid) < 0) {
|
if (qSetSTableIdForRsma(pAst, uid) < 0) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,33 +75,33 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
|
||||||
};
|
};
|
||||||
|
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||||
if (levelNum != 1) {
|
if (levelNum != 1) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||||
|
|
||||||
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
||||||
if (opNum != 1) {
|
if (opNum != 1) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||||
if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
|
if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (pAst) nodesDestroyNode(pAst);
|
if (pAst) nodesDestroyNode(pAst);
|
||||||
if (pPlan) nodesDestroyNode((SNode*)pPlan);
|
if (pPlan) nodesDestroyNode((SNode*)pPlan);
|
||||||
return terrno;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
|
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
|
||||||
|
@ -127,6 +127,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
|
||||||
|
|
||||||
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
|
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
|
||||||
SStreamTask* pTask) {
|
SStreamTask* pTask) {
|
||||||
|
int32_t code = 0;
|
||||||
bool isShuffle = false;
|
bool isShuffle = false;
|
||||||
|
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
|
@ -135,9 +136,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
|
||||||
isShuffle = true;
|
isShuffle = true;
|
||||||
pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
||||||
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL) < 0) {
|
TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pDb);
|
sdbRelease(pMnode->pSdb, pDb);
|
||||||
|
@ -166,7 +165,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
|
||||||
streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
|
streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
|
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
|
||||||
|
@ -639,14 +638,15 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
|
||||||
|
|
||||||
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
|
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
|
||||||
SArray* pVerList) {
|
SArray* pVerList) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||||
bool hasExtraSink = false;
|
bool hasExtraSink = false;
|
||||||
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
||||||
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
if (pDbObj == NULL) {
|
if (pDbObj == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
|
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
|
||||||
|
@ -670,9 +670,11 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
|
|
||||||
SSubplan* plan = getScanSubPlan(pPlan); // source plan
|
SSubplan* plan = getScanSubPlan(pPlan); // source plan
|
||||||
if (plan == NULL) {
|
if (plan == NULL) {
|
||||||
return terrno;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
|
code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -688,7 +690,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
if (numOfPlanLevel == 3) {
|
if (numOfPlanLevel == 3) {
|
||||||
plan = getAggSubPlan(pPlan, 1); // middle agg plan
|
plan = getAggSubPlan(pPlan, 1); // middle agg plan
|
||||||
if (plan == NULL) {
|
if (plan == NULL) {
|
||||||
return terrno;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
do {
|
do {
|
||||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||||
|
@ -715,7 +719,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
|
|
||||||
plan = getAggSubPlan(pPlan, 0);
|
plan = getAggSubPlan(pPlan, 0);
|
||||||
if (plan == NULL) {
|
if (plan == NULL) {
|
||||||
return terrno;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("doScheduleStream add final agg");
|
mDebug("doScheduleStream add final agg");
|
||||||
|
@ -724,7 +730,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
bindTwoLevel(pStream->tasks, 0, size);
|
bindTwoLevel(pStream->tasks, 0, size);
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
|
@ -735,26 +741,28 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
bindAggSink(pStream, pMnode, pStream->pHTasksList);
|
bindAggSink(pStream, pMnode, pStream->pHTasksList);
|
||||||
}
|
}
|
||||||
return TDB_CODE_SUCCESS;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
|
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
|
||||||
|
int32_t code = 0;
|
||||||
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
|
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SEpSet mnodeEpset = {0};
|
SEpSet mnodeEpset = {0};
|
||||||
mndGetMnodeEpSet(pMnode, &mnodeEpset);
|
mndGetMnodeEpSet(pMnode, &mnodeEpset);
|
||||||
|
|
||||||
int32_t code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList);
|
code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
||||||
|
int32_t code = 0;
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SQueryPlan* pPlan = NULL;
|
SQueryPlan* pPlan = NULL;
|
||||||
|
@ -763,21 +771,21 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
} else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
|
} else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
|
||||||
SNode* pAst = NULL;
|
SNode* pAst = NULL;
|
||||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
if ((code = nodesStringToNode(pTopic->ast, &pAst)) != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
|
mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) != 0) {
|
||||||
mError("failed to create topic:%s since %s", pTopic->name, terrstr());
|
mError("failed to create topic:%s since %s", pTopic->name, terrstr());
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
}
|
}
|
||||||
|
@ -786,8 +794,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||||
if (levelNum != 1) {
|
if (levelNum != 1) {
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||||
|
@ -795,8 +803,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
|
int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
|
||||||
if (opNum != 1) {
|
if (opNum != 1) {
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
||||||
|
@ -831,13 +839,13 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
|
|
||||||
if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
|
if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pSub->qmsg = taosStrdup("");
|
pSub->qmsg = taosStrdup("");
|
||||||
}
|
}
|
||||||
|
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue