optimize projection return
This commit is contained in:
parent
a2a8fe0211
commit
bb1165f1e9
|
@ -214,7 +214,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -551,7 +551,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.rspOffset.type == TMQ_OFFSET__LOG &&
|
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
|
||||||
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
||||||
if (pPushEntry != NULL) {
|
if (pPushEntry != NULL) {
|
||||||
|
|
|
@ -210,9 +210,12 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("enter project");
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
|
qDebug("projection in queue model, set status open and return null");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,10 +244,13 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pOperator->status == OP_EXEC_RECV &&
|
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) {
|
||||||
pFinalRes->info.rows == 0) {
|
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
continue;
|
if (pOperator->status == OP_EXEC_RECV) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
|
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
|
||||||
pFinalRes->info.rows);
|
pFinalRes->info.rows);
|
||||||
|
|
Loading…
Reference in New Issue