enh(query): filter the child table that are not belongs to current super table when adding into candidate list.

This commit is contained in:
Haojun Liao 2022-05-17 16:02:03 +08:00
parent c2a918a85e
commit c508c89a92
4 changed files with 59 additions and 36 deletions

View File

@ -388,13 +388,15 @@ typedef struct SStreamBlockScanInfo {
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* readerHandle; // stream block reader handle void* streamBlockReader;// stream block reader handle
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
SNode* pCondition; SNode* pCondition;
SArray* tsArray; SArray* tsArray;
SUpdateInfo* pUpdateInfo; SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader; void* pDataReader;
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode; EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy; SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
@ -706,9 +708,10 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo); const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo, uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SNode* pConditions, SOperatorInfo* pOperatorDumy); SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,

View File

@ -14,6 +14,7 @@
*/ */
#include "executor.h" #include "executor.h"
#include <vnode.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -46,7 +47,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -128,7 +129,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
// traverse to the streamscan node to add this table id // traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot; SOperatorInfo* pInfo = pTaskInfo->pRoot;
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
pInfo = pInfo->pDownstream[0]; pInfo = pInfo->pDownstream[0];
@ -136,7 +137,29 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
SStreamBlockScanInfo* pScanInfo = pInfo->info; SStreamBlockScanInfo* pScanInfo = pInfo->info;
if (isAdd) { if (isAdd) {
int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno));
continue;
}
ASSERT(mr.me.type == TSDB_CHILD_TABLE);
if (mr.me.ctbEntry.suid != pScanInfo->tableUid) {
continue;
}
taosArrayPush(qa, id);
}
qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa));
int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -4703,10 +4703,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo,
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy); pScanPhyNode->node.pConditions, pOperatorDumy);
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
return pOperator; return pOperator;

View File

@ -710,14 +710,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->readerHandle)) { while (tqNextDataBlock(pInfo->streamBlockReader)) {
SArray* pCols = NULL; SArray* pCols = NULL;
uint64_t groupId = 0; uint64_t groupId = 0;
uint64_t uid = 0; uint64_t uid = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int16_t outputCol = 0; int16_t outputCol = 0;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); int32_t code = tqRetrieveDataBlock(&pCols, pInfo->streamBlockReader, &groupId, &uid, &numOfRows, &outputCol);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code; pTaskInfo->code = code;
@ -791,9 +791,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy ) { SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -829,20 +830,18 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY)); pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
if (pInfo->tsArray == NULL) { if (pInfo->tsArray == NULL) {
taosMemoryFreeClear(pInfo); goto _error;
taosMemoryFreeClear(pOperator);
return NULL;
} }
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
if (pInfo->pUpdateInfo == NULL) { if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo); goto _error;
taosMemoryFreeClear(pOperator);
return NULL;
} }
pInfo->readerHandle = streamReadHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = uid;
pInfo->streamBlockReader = streamReadHandle;
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pInfo->pDataReader = pDataReader; pInfo->pDataReader = pDataReader;
@ -856,9 +855,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet._openFn = operatorDummyOpenFn;
pOperator->fpSet.getNextFn = doStreamBlockScan;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =