Merge pull request #26741 from taosdata/fix/TD-30989-mnode10

fix/TD-30989
This commit is contained in:
Hongze Cheng 2024-07-23 15:29:14 +08:00 committed by GitHub
commit 9f0a2ac3ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 94 additions and 70 deletions

View File

@ -44,11 +44,16 @@ static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash); static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) { static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) {
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; if (terrno != 0) code = terrno;
return 0; TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
TAOS_RETURN(code);
} }
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
@ -75,6 +80,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
} }
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) { static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
int32_t code = 0;
SMqSubscribeObj *pSub = tNewSubscribeObj(subKey); SMqSubscribeObj *pSub = tNewSubscribeObj(subKey);
if (pSub == NULL) { if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -86,7 +92,7 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
pSub->subType = pTopic->subType; pSub->subType = pTopic->subType;
pSub->withMeta = pTopic->withMeta; pSub->withMeta = pTopic->withMeta;
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { if ((terrno = mndSchedInitSubEp(pMnode, pTopic, pSub)) < 0) {
tDeleteSubscribeObj(pSub); tDeleteSubscribeObj(pSub);
taosMemoryFree(pSub); taosMemoryFree(pSub);
return NULL; return NULL;
@ -97,6 +103,7 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
SSubplan *pPlan) { SSubplan *pPlan) {
int32_t code = 0;
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
req.oldConsumerId = pRebVg->oldConsumerId; req.oldConsumerId = pRebVg->oldConsumerId;
req.newConsumerId = pRebVg->newConsumerId; req.newConsumerId = pRebVg->newConsumerId;
@ -106,8 +113,8 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
pPlan->execNode.nodeId = pRebVg->pVgEp->vgId; pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
int32_t msgLen; int32_t msgLen;
if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) { if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; code = TSDB_CODE_QRY_INVALID_INPUT;
return -1; TAOS_RETURN(code);
} }
} else { } else {
req.qmsg = taosStrdup(""); req.qmsg = taosStrdup("");
@ -122,7 +129,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
if (ret < 0) { if (ret < 0) {
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return -1; TAOS_RETURN(ret);
} }
tlen += sizeof(SMsgHead); tlen += sizeof(SMsgHead);
@ -130,7 +137,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return -1; TAOS_RETURN(ret);
} }
SMsgHead *pMsgHead = (SMsgHead *)buf; SMsgHead *pMsgHead = (SMsgHead *)buf;
@ -140,40 +147,41 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen); tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { if ((code = tEncodeSMqRebVgReq(&encoder, &req)) < 0) {
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return -1; TAOS_RETURN(code);
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
*pBuf = buf; *pBuf = buf;
*pLen = tlen; *pLen = tlen;
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return 0; TAOS_RETURN(code);
} }
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg, SSubplan *pPlan) { const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
int32_t code = 0;
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
if (pRebVg->oldConsumerId == -1) return 0; // drop stream, no consumer, while split vnode,all consumerId is -1 if (pRebVg->oldConsumerId == -1) return 0; // drop stream, no consumer, while split vnode,all consumerId is -1
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; code = TSDB_CODE_MND_INVALID_SUB_OPTION;
return -1; TAOS_RETURN(code);
} }
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) { if ((code = mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan)) < 0) {
return -1; TAOS_RETURN(code);
} }
int32_t vgId = pRebVg->pVgEp->vgId; int32_t vgId = pRebVg->pVgEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) { if (pVgObj == NULL) {
taosMemoryFree(buf); taosMemoryFree(buf);
terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
return -1; TAOS_RETURN(code);
} }
STransAction action = {0}; STransAction action = {0};
@ -183,11 +191,11 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc
action.msgType = TDMT_VND_TMQ_SUBSCRIBE; action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
return -1; TAOS_RETURN(code);
} }
return 0; TAOS_RETURN(code);
} }
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) { static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
@ -209,6 +217,7 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup,
} }
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
terrno = 0;
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1); SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebInfo == NULL) { if (pRebInfo == NULL) {
pRebInfo = tNewSMqRebSubscribe(key); pRebInfo = tNewSMqRebSubscribe(key);
@ -612,7 +621,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (strcmp(pOutput->pSub->qmsg, "") != 0) { if (strcmp(pOutput->pSub->qmsg, "") != 0) {
code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan);
if (code != 0) { if (code != 0) {
terrno = code;
goto END; goto END;
} }
} }
@ -623,7 +631,8 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) { if (pTrans == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto END; goto END;
} }
@ -665,7 +674,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
END: END:
nodesDestroyNode((SNode *)pPlan); nodesDestroyNode((SNode *)pPlan);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; TAOS_RETURN(code);
} }
static void freeRebalanceItem(void *param) { static void freeRebalanceItem(void *param) {
@ -827,6 +836,7 @@ static void checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
} }
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) { static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
int32_t code = 0;
const char *key = rebInput->pRebInfo->key; const char *key = rebInput->pRebInfo->key;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key); SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key);
@ -838,8 +848,10 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) { if (pTopic == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("[rebalance] mq rebalance %s ignored since topic %s doesn't exist", key, topic); mError("[rebalance] mq rebalance %s ignored since topic %s doesn't exist", key, topic);
return -1; TAOS_RETURN(code);
} }
taosRLockLatch(&pTopic->lock); taosRLockLatch(&pTopic->lock);
@ -848,10 +860,12 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key); rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key);
if (rebOutput->pSub == NULL) { if (rebOutput->pSub == NULL) {
mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, terrstr()); code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
taosRUnLockLatch(&pTopic->lock); taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
return -1; TAOS_RETURN(code);
} }
memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
@ -869,7 +883,7 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
return 0; TAOS_RETURN(code);
} }
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
@ -885,6 +899,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
if (rebSubHash == NULL) { if (rebSubHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
if (terrno != 0) code = terrno;
goto END; goto END;
} }
taosHashSetFreeFp(rebSubHash, freeRebalanceItem); taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
@ -915,8 +930,8 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndDoRebalance(pMnode, &rebInput, &rebOutput); mndDoRebalance(pMnode, &rebInput, &rebOutput);
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) { if ((code = mndPersistRebResult(pMnode, pMsg, &rebOutput)) != 0) {
mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()) mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
} }
clearRebOutput(&rebOutput); clearRebOutput(&rebOutput);
@ -931,7 +946,7 @@ END:
taosHashCleanup(rebSubHash); taosHashCleanup(rebSubHash);
mndRebCntDec(); mndRebCntDec();
return code; TAOS_RETURN(code);
} }
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) { static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
@ -950,8 +965,7 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
} }
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; ret = TSDB_CODE_OUT_OF_MEMORY;
ret = -1;
goto END; goto END;
} }
pReq->head.vgId = htonl(pVgObj->vgId); pReq->head.vgId = htonl(pVgObj->vgId);
@ -968,15 +982,14 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
sdbRelease(pMnode->pSdb, pVgObj); sdbRelease(pMnode->pSdb, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if ((ret = mndTransAppendRedoAction(pTrans, &action)) != 0) {
ret = -1;
goto END; goto END;
} }
} }
END: END:
sdbRelease(pMnode->pSdb, pVgObj); sdbRelease(pMnode->pSdb, pVgObj);
sdbCancelFetch(pMnode->pSdb, pIter); sdbCancelFetch(pMnode->pSdb, pIter);
return ret; TAOS_RETURN(ret);
} }
static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
@ -1012,7 +1025,7 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro
END: END:
sdbRelease(pMnode->pSdb, pConsumer); sdbRelease(pMnode->pSdb, pConsumer);
sdbCancelFetch(pMnode->pSdb, pIter); sdbCancelFetch(pMnode->pSdb, pIter);
return ret; TAOS_RETURN(ret);
} }
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
@ -1022,8 +1035,8 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
return -1; TAOS_RETURN(code);
} }
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
@ -1032,24 +1045,24 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic); mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
return 0; return 0;
} else { } else {
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr()); mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
return -1; TAOS_RETURN(code);
} }
} }
taosWLockLatch(&pSub->lock); taosWLockLatch(&pSub->lock);
if (taosHashGetSize(pSub->consumerHash) != 0) { if (taosHashGetSize(pSub->consumerHash) != 0) {
terrno = TSDB_CODE_MND_CGROUP_USED; code = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
code = -1;
goto end; goto end;
} }
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); code = TSDB_CODE_MND_RETURN_VALUE_NULL;
code = -1; if (terrno != 0) code = terrno;
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
goto end; goto end;
} }
@ -1088,9 +1101,9 @@ end:
if (code != 0) { if (code != 0) {
mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic); mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
return code; TAOS_RETURN(code);
} }
return TSDB_CODE_ACTION_IN_PROGRESS; TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
} }
void mndCleanupSubscribe(SMnode *pMnode) {} void mndCleanupSubscribe(SMnode *pMnode) {}
@ -1340,7 +1353,7 @@ END:
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return code; TAOS_RETURN(code);
} }
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic, static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic,

View File

@ -123,15 +123,16 @@ _OUT:
} }
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans->stage == TRN_STAGE_PREPARE) { if (pTrans->stage == TRN_STAGE_PREPARE) {
if (mndTransCheckConflict(pMnode, pTrans) < 0) { if ((code = mndTransCheckConflict(pMnode, pTrans)) < 0) {
mError("trans:%d, failed to validate trans conflicts.", pTrans->id); mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
return -1; TAOS_RETURN(code);
} }
return mndTransValidatePrepareStage(pMnode, pTrans); return mndTransValidatePrepareStage(pMnode, pTrans);
} }
return 0; TAOS_RETURN(code);
} }
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) { static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
@ -139,10 +140,18 @@ static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
int32_t code = -1; int32_t code = -1;
SSdbRow *pRow = mndTransDecode(pRaw); SSdbRow *pRow = mndTransDecode(pRaw);
if (pRow == NULL) goto _OUT; if (pRow == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OUT;
}
pTrans = sdbGetRowObj(pRow); pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto _OUT; if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OUT;
}
code = mndTransValidateImp(pMnode, pTrans); code = mndTransValidateImp(pMnode, pTrans);
@ -150,11 +159,10 @@ _OUT:
if (pTrans) mndTransDropData(pTrans); if (pTrans) mndTransDropData(pTrans);
if (pRow) taosMemoryFreeClear(pRow); if (pRow) taosMemoryFreeClear(pRow);
if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT); if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT);
return code; TAOS_RETURN(code);
} }
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
terrno = TSDB_CODE_SUCCESS;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL; STrans *pTrans = NULL;
@ -163,7 +171,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (transId <= 0) { if (transId <= 0) {
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
terrno = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _OUT; goto _OUT;
} }
@ -176,7 +184,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to validate requested trans since %s", transId, terrstr()); mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
code = 0; code = 0;
pMeta->code = terrno; pMeta->code = code;
goto _OUT; goto _OUT;
} }
@ -184,13 +192,15 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
code = 0; code = 0;
pMeta->code = terrno; pMeta->code = code;
goto _OUT; goto _OUT;
} }
pTrans = mndAcquireTrans(pMnode, transId); pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, not found while execute in mnode since %s", transId, tstrerror(code));
goto _OUT; goto _OUT;
} }
@ -207,7 +217,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
_OUT: _OUT:
if (pTrans) mndReleaseTrans(pMnode, pTrans); if (pTrans) mndReleaseTrans(pMnode, pTrans);
return code; TAOS_RETURN(code);
} }
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
@ -258,7 +268,7 @@ _OUT:
mndPostMgmtCode(pMnode, code ? code : pMeta->code); mndPostMgmtCode(pMnode, code ? code : pMeta->code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return code; TAOS_RETURN(code);
} }
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) { SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
@ -466,16 +476,18 @@ int32_t mndInitSync(SMnode *pMnode) {
pNode->clusterId); pNode->clusterId);
} }
int32_t code = 0;
tsem_init(&pMgmt->syncSem, 0, 0); tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->sync = syncOpen(&syncInfo, true); pMgmt->sync = syncOpen(&syncInfo, true);
if (pMgmt->sync <= 0) { if (pMgmt->sync <= 0) {
mError("failed to open sync since %s", terrstr()); if (terrno != 0) code = terrno;
return -1; mError("failed to open sync since %s", tstrerror(code));
TAOS_RETURN(code);
} }
pMnode->pSdb->sync = pMgmt->sync; pMnode->pSdb->sync = pMgmt->sync;
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0; TAOS_RETURN(code);
} }
void mndCleanupSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) {
@ -518,10 +530,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
if (req.contLen <= 0) return -1; if (req.contLen <= 0) return TSDB_CODE_OUT_OF_MEMORY;
req.pCont = rpcMallocCont(req.contLen); req.pCont = rpcMallocCont(req.contLen);
if (req.pCont == NULL) return -1; if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY;
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
@ -531,8 +543,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
rpcFreeCont(req.pCont); rpcFreeCont(req.pCont);
terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED; TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
return terrno;
} }
mInfo("trans:%d, will be proposed", transId); mInfo("trans:%d, will be proposed", transId);