diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 36cc0f2665..457245e9a3 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,23 +29,23 @@ struct SSubplan; /** * Create the exec task for streaming mode * @param pMsg - * @param pStreamBlockReadHandle + * @param streamReadHandle * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle); -void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input); +int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); /** * Create the exec task object according to task json - * @param tsdb + * @param readHandle * @param vgId * @param pTaskInfoMsg * @param pTaskInfo * @param qId * @return */ -int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); +int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, @@ -62,63 +62,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds); * this function will be blocked to wait for the query execution completed or paused, * in which case enough results have been produced already. * - * @param qinfo + * @param tinfo * @return */ -int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext); +int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext); /** * * Retrieve the actual results to fill the response message payload. * Note that this function must be executed after qRetrieveQueryResultInfo is invoked. * - * @param qinfo qinfo object + * @param tinfo tinfo object * @param pRsp response message * @param contLen payload length * @return */ -//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); +//int32_t qDumpRetrieveResult(qTaskInfo_t tinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); /** * return the transporter context (RPC) - * @param qinfo + * @param tinfo * @return */ -void* qGetResultRetrieveMsg(qTaskInfo_t qinfo); +void* qGetResultRetrieveMsg(qTaskInfo_t tinfo); /** * kill the ongoing query and free the query handle and corresponding resources automatically - * @param qinfo qhandle + * @param tinfo qhandle * @return */ -int32_t qKillTask(qTaskInfo_t qinfo); +int32_t qKillTask(qTaskInfo_t tinfo); /** * kill the ongoing query asynchronously - * @param qinfo qhandle + * @param tinfo qhandle * @return */ -int32_t qAsyncKillTask(qTaskInfo_t qinfo); +int32_t qAsyncKillTask(qTaskInfo_t tinfo); /** * return whether query is completed or not - * @param qinfo + * @param tinfo * @return */ -int32_t qIsTaskCompleted(qTaskInfo_t qinfo); +int32_t qIsTaskCompleted(qTaskInfo_t tinfo); /** * destroy query info structure * @param qHandle */ -void qDestroyTask(qTaskInfo_t qHandle); +void qDestroyTask(qTaskInfo_t tinfo); /** * Get the queried table uid * @param qHandle * @return */ -int64_t qGetQueriedTableUid(qTaskInfo_t qHandle); +int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); /** * Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks. @@ -145,7 +145,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t * @param type operation type: ADD|DROP * @return */ -int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type); +int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); //================================================================================================ // query handle management diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 49bf42f383..ccc1620264 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,12 +14,50 @@ */ #include "executor.h" +#include "tq.h" +#include "executorimpl.h" #include "planner.h" -void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {} +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) { + ASSERT(pOperator != NULL); + if (pOperator->operatorType != OP_StreamScan) { + if (pOperator->numOfDownstream > 0) { -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) { - if (pMsg == NULL || pStreamBlockReadHandle == NULL) { + if (pOperator->numOfDownstream > 1) { // not handle this in join query + return TSDB_CODE_QRY_APP_ERROR; + } + + return doSetStreamBlock(pOperator->pDownstream[0], input); + } + } else { + SStreamBlockScanInfo* pInfo = pOperator->info; + tqReadHandleSetMsg(pInfo->readerHandle, input, 0); + return TSDB_CODE_SUCCESS; + } +} + +int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { + if (tinfo == NULL) { + return TSDB_CODE_QRY_APP_ERROR; + } + + if (input == NULL) { + return TSDB_CODE_SUCCESS; + } + + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); + } else { + qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); + } + + return code; +} + +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) { + if (pMsg == NULL || streamReadHandle == NULL) { return NULL; } @@ -37,7 +75,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockRead } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL); + code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0ed480ed15..3b01c319e4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5407,7 +5407,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } -SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5417,10 +5417,21 @@ SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32 return NULL; } - pInfo->readerHandle = pStreamBlockHandle; + int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo); + SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + + taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); + } + + // TODO set the extract column id to streamHandle + // pColList + + pInfo->readerHandle = streamReadHandle; pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = OP_StreamBlockScan; + pOperator->operatorType = OP_StreamScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -7704,6 +7715,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask } else if (pPhyNode->info.type == OP_Exchange) { SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); + } else if (pPhyNode->info.type == OP_StreamScan) { + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo); } }