fix:[TD-31017]process return value in mnode for tmq
This commit is contained in:
parent
44bfb1dddc
commit
02346072fc
|
@ -362,10 +362,9 @@ static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoc
|
||||||
pHead->walever = 0;
|
pHead->walever = 0;
|
||||||
|
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
code = tEncodeSMqAskEpRsp(&abuf, rsp);
|
if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) {
|
||||||
if (code != 0) {
|
|
||||||
rpcFreeCont(buf);
|
rpcFreeCont(buf);
|
||||||
return code;
|
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send rsp
|
// send rsp
|
||||||
|
|
|
@ -594,7 +594,9 @@ int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub)
|
||||||
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
||||||
(void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
(void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
||||||
pSubNew->qmsg = taosStrdup(pSub->qmsg);
|
pSubNew->qmsg = taosStrdup(pSub->qmsg);
|
||||||
|
if (ppSub) {
|
||||||
|
*ppSub = pSubNew;
|
||||||
|
}
|
||||||
END:
|
END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -397,9 +397,11 @@ END:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
||||||
int32_t code = 0;
|
|
||||||
SMqSubscribeObj *pSub = NULL;
|
SMqSubscribeObj *pSub = NULL;
|
||||||
MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub)); // put all offset rows
|
int32_t code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub); // put all offset rows
|
||||||
|
if( code != 0){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
taosRLockLatch(&pSub->lock);
|
taosRLockLatch(&pSub->lock);
|
||||||
if (pOutput->pSub->offsetRows == NULL) {
|
if (pOutput->pSub->offsetRows == NULL) {
|
||||||
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
||||||
|
@ -858,12 +860,11 @@ END:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
|
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
|
||||||
int32_t code = 0;
|
|
||||||
const char *key = rebInput->pRebInfo->key;
|
const char *key = rebInput->pRebInfo->key;
|
||||||
SMqSubscribeObj *pSub = NULL;
|
SMqSubscribeObj *pSub = NULL;
|
||||||
MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
|
int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
|
||||||
|
|
||||||
if (pSub == NULL) {
|
if (code != 0) {
|
||||||
// split sub key and extract topic
|
// split sub key and extract topic
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
|
||||||
char cgroup[TSDB_CGROUP_LEN] = {0};
|
char cgroup[TSDB_CGROUP_LEN] = {0};
|
||||||
|
@ -893,7 +894,7 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
|
||||||
taosRUnLockLatch(&pSub->lock);
|
taosRUnLockLatch(&pSub->lock);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
code =checkConsumer(pMnode, rebOutput->pSub);
|
code = checkConsumer(pMnode, rebOutput->pSub);
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
taosRUnLockLatch(&pSub->lock);
|
taosRUnLockLatch(&pSub->lock);
|
||||||
goto END;
|
goto END;
|
||||||
|
|
Loading…
Reference in New Issue