fix: conflicts after refactor SRpcMsg
This commit is contained in:
parent
0623514ed1
commit
d7e895448d
|
@ -399,7 +399,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t newTopicNum = taosArrayGetSize(newSub);
|
int32_t newTopicNum = taosArrayGetSize(newSub);
|
||||||
// check topic existance
|
// check topic existance
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, pMsg);
|
||||||
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
||||||
|
|
||||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||||
|
|
|
@ -390,7 +390,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, pMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -464,22 +464,24 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SMDropTopicReq dropReq = {0};
|
SMDropTopicReq dropReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
}
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
||||||
if (pTopic->refConsumerCnt != 0) {
|
if (pTopic->refConsumerCnt != 0) {
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -529,8 +529,8 @@ typedef struct SUdfdRpcSendRecvInfo {
|
||||||
|
|
||||||
|
|
||||||
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
|
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
|
||||||
ASSERT(pMsg->ahandle != NULL);
|
ASSERT(pMsg->info.ahandle != NULL);
|
||||||
|
|
||||||
if (pEpSet) {
|
if (pEpSet) {
|
||||||
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
|
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
|
||||||
|
@ -609,7 +609,7 @@ int32_t udfdConnectToMNode() {
|
||||||
rpcMsg.msgType = TDMT_MND_CONNECT;
|
rpcMsg.msgType = TDMT_MND_CONNECT;
|
||||||
rpcMsg.pCont = pReq;
|
rpcMsg.pCont = pReq;
|
||||||
rpcMsg.contLen = contLen;
|
rpcMsg.contLen = contLen;
|
||||||
rpcMsg.ahandle = msgInfo;
|
rpcMsg.info.ahandle = msgInfo;
|
||||||
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
||||||
|
|
||||||
uv_sem_wait(&msgInfo->resultSem);
|
uv_sem_wait(&msgInfo->resultSem);
|
||||||
|
@ -639,7 +639,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
||||||
rpcMsg.pCont = pReq;
|
rpcMsg.pCont = pReq;
|
||||||
rpcMsg.contLen = contLen;
|
rpcMsg.contLen = contLen;
|
||||||
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
||||||
rpcMsg.ahandle = msgInfo;
|
rpcMsg.info.ahandle = msgInfo;
|
||||||
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
||||||
|
|
||||||
uv_sem_wait(&msgInfo->resultSem);
|
uv_sem_wait(&msgInfo->resultSem);
|
||||||
|
|
|
@ -1,12 +1,8 @@
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
system sh/cfg.sh -n dnode1 -c wallevel -v 2
|
|
||||||
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
|
|
||||||
|
|
||||||
print ========= start dnode1 as LEADER
|
print ========= start dnode1 as LEADER
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 2000
|
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print ======== step1
|
print ======== step1
|
||||||
|
|
Loading…
Reference in New Issue