[td-11818] refactor.
This commit is contained in:
parent
e1d9fa73b0
commit
ecdd6784f6
|
@ -635,7 +635,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
||||||
void* task = pHandle->buffer.output[pos].task;
|
void* task = pHandle->buffer.output[pos].task;
|
||||||
|
|
||||||
qStreamExecTaskSetInput(task, pCont);
|
qSetStreamInput(task, pCont);
|
||||||
SSDataBlock* pDataBlock;
|
SSDataBlock* pDataBlock;
|
||||||
uint64_t ts;
|
uint64_t ts;
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
|
|
|
@ -18,17 +18,20 @@
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, uint64_t reqId) {
|
||||||
ASSERT(pOperator != NULL);
|
ASSERT(pOperator != NULL);
|
||||||
if (pOperator->operatorType != OP_StreamScan) {
|
if (pOperator->operatorType != OP_StreamScan) {
|
||||||
if (pOperator->numOfDownstream > 0) {
|
if (pOperator->numOfDownstream == 0) {
|
||||||
|
qError("failed to find stream scan operator to set the input data block, reqId:0x%" PRIx64, reqId);
|
||||||
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
return doSetStreamBlock(pOperator->pDownstream[0], input);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||||
|
qError("join not supported for stream block scan, reqId:0x%" PRIx64, reqId);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return doSetStreamBlock(pOperator->pDownstream[0], input, reqId);
|
||||||
} else {
|
} else {
|
||||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
|
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
|
||||||
|
@ -46,7 +49,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input);
|
|
||||||
|
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
|
qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue