diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 4ad7e2dfc2..1d78702bc2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); */ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); +/** + * Set multiple input data blocks for the stream scan. + * @param tinfo + * @param pBlocks + * @param numOfInputBlock + * @param type + * @return + */ +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type); + /** * Update the table id list, add or remove. * @@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, */ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds); -/** - * Retrieve the produced results information, if current query is not paused or completed, - * this function will be blocked to wait for the query execution completed or paused, - * in which case enough results have been produced already. - * - * @param tinfo - * @return - */ -int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext); - /** * kill the ongoing query and free the query handle and corresponding resources automatically * @param tinfo qhandle @@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t */ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); -//================================================================================================ -// query handle management -/** - * Query handle mgmt object - * @param vgId - * @return - */ -void* qOpenTaskMgmt(int32_t vgId); - -/** - * broadcast the close information and wait for all query stop. - * @param pExecutor - */ -void qTaskMgmtNotifyClosing(void* pExecutor); - -/** - * Re-open the query handle management module when opening the vnode again. - * @param pExecutor - */ -void qQueryMgmtReOpen(void* pExecutor); - -/** - * Close query mgmt and clean up resources. - * @param pExecutor - */ -void qCleanupTaskMgmt(void* pExecutor); - -/** - * Add the query into the query mgmt object - * @param pMgmt - * @param qId - * @param qInfo - * @return - */ -void** qRegisterTask(void* pMgmt, uint64_t qId, void* qInfo); - -/** - * acquire the query handle according to the key from query mgmt object. - * @param pMgmt - * @param key - * @return - */ -void** qAcquireTask(void* pMgmt, uint64_t key); - /** * release the query handle and decrease the reference count in cache * @param pMgmt @@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key); */ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); -/** - * De-register the query handle from the management module and free it immediately. - * @param pMgmt - * @param pQInfo - * @return - */ -void** qDeregisterQInfo(void* pMgmt, void* pQInfo); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 627bcbab6f..2e692f76ac 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -430,9 +430,10 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SStreamBlockScanInfo { + SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock int32_t blockType; // current block type - bool blockValid; // Is current data has returned? + int32_t validBlockIndex; // Is current data has returned? SColumnInfo* pCols; // the output column info uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 26422fa618..3be496bc2b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,11 +14,12 @@ */ #include "executor.h" +#include "tdatablock.h" #include "executorimpl.h" #include "planner.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { SStreamBlockScanInfo* pInfo = pOperator->info; @@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + if (tqReadHandleSetMsg(pInfo->readerHandle, input[0], 0) < 0) { qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } } else { - ASSERT(!pInfo->blockValid); + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = input[i]; - SSDataBlock* pDataBlock = input; - pInfo->pRes->info = pDataBlock->info; - taosArrayClear(pInfo->pRes->pDataBlock); - taosArrayAddAll(pInfo->pRes->pDataBlock, pDataBlock->pDataBlock); + SSDataBlock* p = createOneDataBlock(pDataBlock); + p->info = pDataBlock->info; - // set current block valid. - pInfo->blockValid = true; + taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); + taosArrayPush(pInfo->pBlockLists, &p); + } } return TSDB_CODE_SUCCESS; @@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t } int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { + qSetMultiStreamInput(tinfo, (void**) &input, 1, type); +} + +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } - if (input == NULL) { + if (pBlocks == NULL || numOfBlocks == 0) { return TSDB_CODE_SUCCESS; } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 389e6d7e34..dbd25d3548 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4723,6 +4723,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) { #endif } +static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + + pInfo->validBlockIndex = 0; + for(int32_t i = 0; i < total; ++i) { + SSDataBlock* p = taosArrayGet(pInfo->pBlockLists, i); + blockDataDestroy(p); + } + taosArrayClear(pInfo->pBlockLists); +} + static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4735,43 +4746,45 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) } if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - if (pInfo->blockValid) { - pInfo->blockValid = false; // this block can only be used once. - return pInfo->pRes; - } else { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + if (pInfo->validBlockIndex >= total) { + doClearBufferedBlocks(pInfo); return NULL; } + + int32_t current = pInfo->validBlockIndex++; + return taosArrayGet(pInfo->pBlockLists, current); + } else { + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + blockDataCleanup(pInfo->pRes); + + while (tqNextDataBlock(pInfo->readerHandle)) { + pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + terrno = pTaskInfo->code; + return NULL; + } + + if (pBlockInfo->rows == 0) { + return NULL; + } + + pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); + if (pInfo->pRes->pDataBlock == NULL) { + // TODO add log + pTaskInfo->code = terrno; + return NULL; + } + + break; + } + + // record the scan action. + pInfo->numOfExec++; + pInfo->numOfRows += pBlockInfo->rows; + + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } - - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - blockDataCleanup(pInfo->pRes); - - while (tqNextDataBlock(pInfo->readerHandle)) { - pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - terrno = pTaskInfo->code; - return NULL; - } - - if (pBlockInfo->rows == 0) { - return NULL; - } - - pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); - if (pInfo->pRes->pDataBlock == NULL) { - // TODO add log - pTaskInfo->code = terrno; - return NULL; - } - - break; - } - - // record the scan action. - pInfo->numOfExec++; - pInfo->numOfRows += pBlockInfo->rows; - - return (pBlockInfo->rows == 0)? NULL:pInfo->pRes; } int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { @@ -5408,6 +5421,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* return NULL; } + pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); + if (pInfo->pBlockLists == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; + } + pInfo->readerHandle = streamReadHandle; pInfo->pRes = pResBlock;