Merge pull request #21177 from taosdata/fix/TD-23972
fix:[TD-23972] push subscribe msg to vnode even though consumer not c…
This commit is contained in:
commit
1f973123c4
|
@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
|||
|
||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
||||
const SMqRebOutputVg *pRebVg) {
|
||||
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
void *buf;
|
||||
int32_t tlen;
|
||||
|
@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
|
|||
}
|
||||
}
|
||||
|
||||
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
|
||||
for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
|
||||
SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
|
||||
SMqRebOutputVg outputVg = {
|
||||
.oldConsumerId = pConsumerEp->consumerId,
|
||||
.newConsumerId = pConsumerEp->consumerId,
|
||||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosArrayPush(pOutput->rebVgs, &outputVg);
|
||||
}
|
||||
}
|
||||
|
||||
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
|
||||
int32_t imbConsumerNum) {
|
||||
const char *pSubKey = pOutput->pSub->key;
|
||||
|
@ -290,10 +302,6 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
|||
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
|
||||
if (consumerVgNum > minVgCnt) {
|
||||
if (imbCnt < imbConsumerNum) {
|
||||
if (consumerVgNum == minVgCnt + 1) {
|
||||
imbCnt++;
|
||||
continue;
|
||||
} else {
|
||||
// pop until equal minVg + 1
|
||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
||||
|
@ -307,7 +315,6 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
|||
pConsumerEp->consumerId);
|
||||
}
|
||||
imbCnt++;
|
||||
}
|
||||
} else {
|
||||
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
|
||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
|
||||
|
@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
|||
}
|
||||
}
|
||||
}
|
||||
putNoTransferToOutput(pOutput, pConsumerEp);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
|||
}
|
||||
|
||||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
int ret = 0;
|
||||
SMqRebVgReq req = {0};
|
||||
tDecodeSMqRebVgReq(msg, &req);
|
||||
|
||||
|
@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
|
||||
if (req.newConsumerId == -1) {
|
||||
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||
taosMemoryFree(req.qmsg);
|
||||
return 0;
|
||||
goto end;
|
||||
}
|
||||
|
||||
STqHandle tqHandle = {0};
|
||||
|
@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
// TODO version should be assigned and refed during preprocess
|
||||
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
|
||||
if (pRef == NULL) {
|
||||
taosMemoryFree(req.qmsg);
|
||||
return -1;
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
int64_t ver = pRef->refVer;
|
||||
|
@ -534,21 +534,19 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
||||
pHandle->consumerId, oldConsumerId);
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
taosMemoryFree(req.qmsg);
|
||||
return -1;
|
||||
}
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
goto end;
|
||||
} else {
|
||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
||||
atomic_store_32(&pHandle->epoch, -1);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
taosMemoryFree(req.qmsg);
|
||||
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
|
||||
} else {
|
||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||
req.newConsumerId);
|
||||
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_store_32(&pHandle->epoch, 0);
|
||||
}
|
||||
// kill executing task
|
||||
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
||||
if (pTaskInfo != NULL) {
|
||||
|
@ -556,27 +554,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
}
|
||||
|
||||
taosWLockLatch(&pTq->lock);
|
||||
atomic_store_32(&pHandle->epoch, 0);
|
||||
|
||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||
tqUnregisterPushHandle(pTq, pHandle);
|
||||
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
qStreamCloseTsdbReader(pTaskInfo);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
taosMemoryFree(req.qmsg);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
goto end;
|
||||
}
|
||||
|
||||
end:
|
||||
taosMemoryFree(req.qmsg);
|
||||
return 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||
|
|
|
@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
int code = 0;
|
||||
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||
qTaskInfo_t task = pHandle->execHandle.task;
|
||||
if(qTaskIsExecuting(task)){
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
if(code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue