Merge pull request #21227 from taosdata/fix/TD-24058

fix:[TD-24058]send poll result to client if no data 5 times to avoid …
This commit is contained in:
Hui Li 2023-05-10 11:12:20 +08:00 committed by GitHub
commit b1b85c3d95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 14 deletions

View File

@ -101,6 +101,7 @@ typedef struct {
STqPushHandle pushHandle; // push STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec STqExecHandle execHandle; // exec
SRpcMsg* msg; SRpcMsg* msg;
int32_t noDataPollCnt;
} STqHandle; } STqHandle;
typedef struct { typedef struct {

View File

@ -16,6 +16,7 @@
#include "tq.h" #include "tq.h"
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
#define NO_POLL_CNT 5
static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp);
@ -185,6 +186,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// till now, all data has been transferred to consumer, new data needs to push client once arrived. // till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data
pHandle->noDataPollCnt = 0;
// lock // lock
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
code = tqRegisterPushHandle(pTq, pHandle, pMsg); code = tqRegisterPushHandle(pTq, pHandle, pMsg);
@ -192,6 +195,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
return code; return code;
} }
else{
pHandle->noDataPollCnt++;
}
}
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);

View File

@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t lastVer = walGetLastVer(pReader->pWal); int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal);
while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
taosMsleep(1); // taosMsleep(10);
appliedVer = walGetAppliedVer(pReader->pWal);
} }
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; // int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
// endVer = TMIN(appliedVer, endVer); int64_t endVer = TMIN(appliedVer, committedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64, ", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
while (fetchVer <= committedVer) { while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) { if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1; return -1;
} }