fix(tmq): add filter for taosX poll rsp.
This commit is contained in:
parent
041ffa91bd
commit
a4d9210b07
|
@ -1430,6 +1430,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
clientVg.offsetInfo.committedOffset = offsetNew;
|
||||
clientVg.offsetInfo.walVerBegin = -1;
|
||||
clientVg.offsetInfo.walVerEnd = -1;
|
||||
clientVg.seekUpdated = false;
|
||||
clientVg.receivedInfoFromVnode = false;
|
||||
|
||||
taosArrayPush(pTopic->vgs, &clientVg);
|
||||
}
|
||||
}
|
||||
|
@ -1858,9 +1861,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
|
||||
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate
|
||||
if (!pVg->seekUpdated) {
|
||||
pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
|
||||
}
|
||||
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// build rsp
|
||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
|
@ -1878,9 +1882,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
|
||||
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
|
||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate
|
||||
if (!pVg->seekUpdated) { // if offset is validate
|
||||
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
|
||||
}
|
||||
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
|
||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||
|
|
|
@ -110,12 +110,6 @@ typedef struct {
|
|||
tq_handle_status status;
|
||||
} STqHandle;
|
||||
|
||||
//typedef struct {
|
||||
// SMqDataRsp* pDataRsp;
|
||||
// char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
// SRpcHandleInfo info;
|
||||
//} STqPushEntry;
|
||||
|
||||
struct STQ {
|
||||
SVnode* pVnode;
|
||||
char* path;
|
||||
|
@ -195,7 +189,6 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
|||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||
int32_t type, int64_t sver, int64_t ever);
|
||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
|
||||
bool tqIsHandleExecuting(STqHandle* pHandle);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -307,13 +307,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pOffset->val.type == TMQ_OFFSET__LOG) {
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
|
||||
if (pHandle && (walRefVer(pHandle->pRef, pOffset->val.version) < 0)) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -381,16 +374,6 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
|
|||
pOffset->val.version = ever;
|
||||
}
|
||||
|
||||
ASSERT(0);
|
||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||
taosWLockLatch(&pTq->lock);
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||
if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
return -1;
|
||||
}
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
|
||||
// save the new offset value
|
||||
if (pSavedOffset != NULL) {
|
||||
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
|
||||
|
@ -404,7 +387,6 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
|
|||
return -1;
|
||||
}
|
||||
|
||||
walReaderSeekVer(pHandle->execHandle.pTqReader->pWalReader, vgOffset.offset.val.version);
|
||||
tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
|
||||
vgOffset.consumerId, vgOffset.offset.val.version);
|
||||
|
||||
|
@ -575,6 +557,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
|
||||
int32_t code = 0;
|
||||
|
@ -586,15 +569,11 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||
taosMsleep(5);
|
||||
}
|
||||
|
||||
if (pHandle->pRef) {
|
||||
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||
}
|
||||
|
||||
while (tqIsHandleExecuting(pHandle)) {
|
||||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||
taosMsleep(5);
|
||||
}
|
||||
|
||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||
if (code != 0) {
|
||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||
|
|
Loading…
Reference in New Issue