Merge pull request #21427 from taosdata/mark/tmq
fix:lost data because tsdbreader or task is killed
This commit is contained in:
commit
09a99142cb
|
@ -572,16 +572,16 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
// atomic_add_fetch_32(&pHandle->epoch, 1);
|
// atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
|
||||||
// kill executing task
|
// kill executing task
|
||||||
if(tqIsHandleExec(pHandle)) {
|
// if(tqIsHandleExec(pHandle)) {
|
||||||
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
||||||
if (pTaskInfo != NULL) {
|
// if (pTaskInfo != NULL) {
|
||||||
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
// qStreamCloseTsdbReader(pTaskInfo);
|
// qStreamCloseTsdbReader(pTaskInfo);
|
||||||
// }
|
// }
|
||||||
}
|
// }
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
Loading…
Reference in New Issue