fix(tmq): avoid return delete msg for table subscription.
This commit is contained in:
parent
e1700d00dc
commit
30e7cb5860
|
@ -135,6 +135,7 @@ typedef struct {
|
||||||
// int8_t scanUncommited;
|
// int8_t scanUncommited;
|
||||||
int8_t scanNotApplied;
|
int8_t scanNotApplied;
|
||||||
int8_t scanMeta;
|
int8_t scanMeta;
|
||||||
|
int8_t deleteMsg;
|
||||||
int8_t enableRef;
|
int8_t enableRef;
|
||||||
} SWalFilterCond;
|
} SWalFilterCond;
|
||||||
|
|
||||||
|
|
|
@ -648,7 +648,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
SWalFilterCond cond = {.deleteMsg = 1};
|
||||||
|
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
|
@ -82,6 +82,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
qStreamSetOpen(task);
|
qStreamSetOpen(task);
|
||||||
|
|
||||||
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
|
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
|
||||||
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
|
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
|
||||||
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
|
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
|
||||||
|
|
|
@ -87,8 +87,9 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || pReader->pHead->head.msgType == TDMT_VND_DELETE ||
|
int32_t type = pReader->pHead->head.msgType;
|
||||||
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
|
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
||||||
|
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||||
if (walFetchBodyNew(pReader) < 0) {
|
if (walFetchBodyNew(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue