fix:[TD-24058]send poll result to client if no data 5 times to avoid lost data

This commit is contained in:
wangmm0220 2023-05-09 17:42:11 +08:00
parent f4f3b886fc
commit 587f750c94
3 changed files with 21 additions and 14 deletions

View File

@ -101,6 +101,7 @@ typedef struct {
STqPushHandle pushHandle; // push STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec STqExecHandle execHandle; // exec
SRpcMsg* msg; SRpcMsg* msg;
int32_t noDataPollCnt;
} STqHandle; } STqHandle;
typedef struct { typedef struct {

View File

@ -16,6 +16,7 @@
#include "tq.h" #include "tq.h"
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
#define NO_POLL_CNT 5
static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp);
@ -185,12 +186,18 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// till now, all data has been transferred to consumer, new data needs to push client once arrived. // till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// lock if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data
taosWLockLatch(&pTq->lock); pHandle->noDataPollCnt = 0;
code = tqRegisterPushHandle(pTq, pHandle, pMsg); // lock
taosWUnLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp); code = tqRegisterPushHandle(pTq, pHandle, pMsg);
return code; taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
else{
pHandle->noDataPollCnt++;
}
} }

View File

@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t lastVer = walGetLastVer(pReader->pWal); int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal);
while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
taosMsleep(1); // taosMsleep(10);
appliedVer = walGetAppliedVer(pReader->pWal);
} }
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; // int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
// endVer = TMIN(appliedVer, endVer); int64_t endVer = TMIN(appliedVer, committedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64, ", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
while (fetchVer <= committedVer) { while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) { if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1; return -1;
} }