fix:offset encode assert error
This commit is contained in:
parent
1ee1b0422c
commit
cce868d140
|
@ -1790,8 +1790,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
pVg->epSet = *pollRspWrapper->pEpset;
|
pVg->epSet = *pollRspWrapper->pEpset;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the local offset value only for the returned values.
|
if(pDataRsp->rspOffset.type != 0){ // if offset is validate
|
||||||
pVg->currentOffset = pDataRsp->rspOffset;
|
pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values.
|
||||||
|
}
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
|
|
|
@ -6839,10 +6839,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal)
|
||||||
if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1;
|
if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1;
|
||||||
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
|
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
|
||||||
if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1;
|
if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1;
|
||||||
} else if (pOffsetVal->type < 0) {
|
|
||||||
// do nothing
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
// do nothing
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -6854,10 +6852,8 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
||||||
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
|
||||||
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
|
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
|
||||||
if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1;
|
||||||
} else if (pOffsetVal->type < 0) {
|
|
||||||
// do nothing
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
// do nothing
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,7 +114,7 @@ struct STQ {
|
||||||
char* path;
|
char* path;
|
||||||
int64_t walLogLastVer;
|
int64_t walLogLastVer;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
SHashObj* pPushMgr; // consumerId -> STqHandle
|
SHashObj* pPushMgr; // subKey -> STqHandle
|
||||||
SHashObj* pHandle; // subKey -> STqHandle
|
SHashObj* pHandle; // subKey -> STqHandle
|
||||||
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
|
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
|
||||||
STqOffsetStore* pOffsetStore;
|
STqOffsetStore* pOffsetStore;
|
||||||
|
|
|
@ -551,13 +551,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
atomic_store_32(&pHandle->epoch, -1);
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
|
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
qStreamCloseTsdbReader(pTaskInfo);
|
qStreamCloseTsdbReader(pTaskInfo);
|
||||||
|
|
Loading…
Reference in New Issue