fix:set task status killed when vnode receive subscribe msg from mnode
This commit is contained in:
parent
c7c255e967
commit
33d6df8717
|
@ -367,6 +367,20 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 3. update the epoch value
|
||||||
|
int32_t savedEpoch = pHandle->epoch;
|
||||||
|
if (savedEpoch <= reqEpoch) {
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
||||||
|
reqEpoch);
|
||||||
|
pHandle->epoch = reqEpoch;
|
||||||
|
}else {
|
||||||
|
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ",
|
||||||
|
consumerId, TD_VID(pTq->pVnode), req.subKey, savedEpoch, reqEpoch);
|
||||||
|
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
bool exec = tqIsHandleExec(pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
if(!exec) {
|
if(!exec) {
|
||||||
tqSetHandleExec(pHandle);
|
tqSetHandleExec(pHandle);
|
||||||
|
@ -381,14 +395,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. update the epoch value
|
|
||||||
int32_t savedEpoch = pHandle->epoch;
|
|
||||||
if (savedEpoch < reqEpoch) {
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
|
||||||
reqEpoch);
|
|
||||||
pHandle->epoch = reqEpoch;
|
|
||||||
}
|
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||||
|
@ -396,6 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
||||||
tqSetHandleIdle(pHandle);
|
tqSetHandleIdle(pHandle);
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -561,17 +568,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
goto end;
|
goto end;
|
||||||
} else {
|
} else {
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||||
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId);
|
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId);
|
||||||
// atomic_add_fetch_32(&pHandle->epoch, 1);
|
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
req.newConsumerId);
|
req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
// atomic_store_32(&pHandle->epoch, 0);
|
|
||||||
}
|
}
|
||||||
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
|
||||||
// kill executing task
|
// kill executing task
|
||||||
if(tqIsHandleExec(pHandle)) {
|
if(tqIsHandleExec(pHandle)) {
|
||||||
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
||||||
|
|
Loading…
Reference in New Issue