fix:add task status check in doQueueScan & checkout consumer in tqProcessPollReq
This commit is contained in:
parent
bca1af3f5d
commit
238254e49e
|
@ -358,9 +358,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// 2. check re-balance status
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool exec = tqIsHandleExec(pHandle);
|
||||
if(!exec) {
|
||||
tqSetHandleExec(pHandle);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
break;
|
||||
}
|
||||
|
@ -370,17 +380,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
taosMsleep(10);
|
||||
}
|
||||
|
||||
// 2. check re-balance status
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
||||
|
||||
// 3. update the epoch value
|
||||
int32_t savedEpoch = pHandle->epoch;
|
||||
if (savedEpoch < reqEpoch) {
|
||||
|
@ -396,7 +395,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
||||
|
||||
end:
|
||||
tqSetHandleIdle(pHandle);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
|
||||
return code;
|
||||
|
|
|
@ -1665,37 +1665,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
|
||||
qDebug("start to exec queue scan, %s", id);
|
||||
|
||||
#if 0
|
||||
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
|
||||
if (pInfo->tqReader->msg.msgStr == NULL) {
|
||||
SPackedData submit = pTaskInfo->streamInfo.submit;
|
||||
if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
|
||||
qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
|
||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, true);
|
||||
|
||||
if (pBlockInfo->rows > 0) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->tqReader->msg = (SPackedData){0};
|
||||
pTaskInfo->streamInfo.submit = (SPackedData){0};
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
|
|
Loading…
Reference in New Issue