fix:[TD-25651] reset epoch if consumer changed to avoid consumeing no data

This commit is contained in:
wangmm0220 2023-08-24 16:38:53 +08:00
parent 5445e836de
commit b83cc11043
1 changed files with 1 additions and 13 deletions

View File

@ -879,20 +879,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
// atomic_add_fetch_32(&pHandle->epoch, 1);
atomic_store_32(&pHandle->epoch, 0);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}