diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d0c1a3f0b5..ee841e3ce9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -334,9 +334,10 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_open_fn_t openFn; __optr_fn_t getNextFn; + __optr_fn_t cleanupFn; __optr_close_fn_t closeFn; + __optr_open_fn_t _openFn; // DO NOT invoke this function directly } SOperatorInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ec220098ac..420d70233c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -224,9 +224,12 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } +#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) +#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) + static int32_t operatorDummyOpenFn(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - pOperator->status = OP_OPENED; + OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -4734,6 +4737,11 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->pTsdbReadHandle == NULL) { return NULL; @@ -4851,6 +4859,11 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; pBlockInfo->rows = 0; @@ -5159,7 +5172,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { static int32_t prepareLoadRemoteData(void* param) { SOperatorInfo *pOperator = (SOperatorInfo*) param; - if ((pOperator->status & OP_OPENED) == OP_OPENED) { + if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; } @@ -5173,7 +5186,7 @@ static int32_t prepareLoadRemoteData(void* param) { } } - pOperator->status = OP_OPENED; + OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -5183,12 +5196,13 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - if ((pOperator->status & OP_OPENED) != OP_OPENED) { - pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; // TODO add a new error code + int32_t code = pOperator->_openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; return NULL; } + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); @@ -5286,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->info = pInfo; pOperator->numOfOutput = size; pOperator->pTaskInfo = pTaskInfo; - pOperator->openFn = prepareLoadRemoteData; // assign a dummy function. + pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function. pOperator->getNextFn = doLoadRemoteData; pOperator->closeFn = destroyExchangeOperatorInfo; @@ -5377,7 +5391,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->openFn = operatorDummyOpenFn; + pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = doTableScan; pOperator->closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; @@ -5461,7 +5475,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->openFn = operatorDummyOpenFn; + pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = doStreamBlockScan; pOperator->closeFn = operatorDummyCloseFn;