stream supports filter
This commit is contained in:
parent
ef7d7eb64b
commit
b2dd293568
|
@ -375,6 +375,7 @@ typedef struct SStreamBlockScanInfo {
|
||||||
uint64_t numOfExec; // execution times
|
uint64_t numOfExec; // execution times
|
||||||
void* readerHandle; // stream block reader handle
|
void* readerHandle; // stream block reader handle
|
||||||
SArray* pColMatchInfo; //
|
SArray* pColMatchInfo; //
|
||||||
|
SNode* pCondition;
|
||||||
} SStreamBlockScanInfo;
|
} SStreamBlockScanInfo;
|
||||||
|
|
||||||
typedef struct SSysTableScanInfo {
|
typedef struct SSysTableScanInfo {
|
||||||
|
@ -672,7 +673,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
const STableGroupInfo* pTableGroupInfo);
|
const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
|
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
|
||||||
SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
|
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||||
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
|
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
|
||||||
|
|
|
@ -6538,7 +6538,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
SOperatorInfo* pOperator =
|
SOperatorInfo* pOperator =
|
||||||
createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo);
|
createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo, pScanPhyNode->node.pConditions);
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
|
|
|
@ -514,6 +514,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
// NOTE: this operator does never check if current status is done or not
|
// NOTE: this operator does never check if current status is done or not
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
|
int32_t rows = 0;
|
||||||
|
|
||||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -580,6 +581,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
pTaskInfo->code = terrno;
|
pTaskInfo->code = terrno;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
rows = pBlockInfo->rows;
|
||||||
|
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -588,16 +591,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
pInfo->numOfExec++;
|
pInfo->numOfExec++;
|
||||||
pInfo->numOfRows += pBlockInfo->rows;
|
pInfo->numOfRows += pBlockInfo->rows;
|
||||||
|
|
||||||
if (pBlockInfo->rows == 0) {
|
if (rows == 0) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
return (rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
|
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
|
||||||
SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
|
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition) {
|
||||||
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -635,6 +638,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
||||||
|
|
||||||
pInfo->readerHandle = streamReadHandle;
|
pInfo->readerHandle = streamReadHandle;
|
||||||
pInfo->pRes = pResBlock;
|
pInfo->pRes = pResBlock;
|
||||||
|
pInfo->pCondition = pCondition;
|
||||||
|
|
||||||
pOperator->name = "StreamBlockScanOperator";
|
pOperator->name = "StreamBlockScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||||
|
|
Loading…
Reference in New Issue