fix:error in optimize consume logic
This commit is contained in:
parent
f7b7320f5f
commit
c07e563ecd
|
@ -76,13 +76,13 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
|
|
||||||
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, pTq->pVnode->config.vgId);
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, pTq->pVnode->config.vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
|
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task end execute, get block:%p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
|
||||||
|
|
||||||
// current scan should be stopped asap, since the rebalance occurs.
|
// current scan should be stopped asap, since the rebalance occurs.
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
|
@ -91,14 +91,11 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
|
|
||||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
|
|
||||||
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
|
||||||
rowCnt += pDataBlock->info.rows;
|
rowCnt += pDataBlock->info.rows;
|
||||||
if (rowCnt >= 4096) {
|
if (rowCnt >= 4096) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
qStreamExtractOffset(task, &pRsp->rspOffset);
|
qStreamExtractOffset(task, &pRsp->rspOffset);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -309,7 +309,7 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
ret->offset.version = pReader->pWalReader->curVersion;
|
ret->offset.version = pReader->pWalReader->curVersion;
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
tqInfo("return offset %" PRId64 ", no more valid msg in wal", ret->offset.version);
|
tqInfo("wal return none, offset %" PRId64 ", no more valid msg in wal", ret->offset.version);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,7 +327,7 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ret->fetchType = FETCH_TYPE__DATA;
|
ret->fetchType = FETCH_TYPE__DATA;
|
||||||
tqDebug("return data rows %d", ret->data.info.rows);
|
tqDebug("wal return data rows %d", ret->data.info.rows);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -256,13 +256,9 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) {
|
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) {
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
if (pOperator->status == OP_EXEC_RECV) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
qDebug("set op close, exec mode:%d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
|
||||||
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
|
|
||||||
pFinalRes->info.rows);
|
pFinalRes->info.rows);
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -1627,13 +1627,15 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
setBlockIntoRes(pInfo, &ret.data, true);
|
setBlockIntoRes(pInfo, &ret.data, true);
|
||||||
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
|
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}else{
|
|
||||||
pTaskInfo->streamInfo.currentOffset = ret.offset;
|
|
||||||
}
|
}
|
||||||
|
}else if(ret.fetchType == FETCH_TYPE__NONE){
|
||||||
|
pTaskInfo->streamInfo.currentOffset = ret.offset;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
|
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue