Merge pull request #12584 from taosdata/feature/3.0_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2022-05-17 16:48:13 +08:00 committed by GitHub
commit 7ab15e8f02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 56 deletions

View File

@ -388,13 +388,15 @@ typedef struct SStreamBlockScanInfo {
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* readerHandle; // stream block reader handle void* streamBlockReader;// stream block reader handle
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
SNode* pCondition; SNode* pCondition;
SArray* tsArray; SArray* tsArray;
SUpdateInfo* pUpdateInfo; SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader; void* pDataReader;
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode; EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy; SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
@ -706,9 +708,10 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo); const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo, uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SNode* pConditions, SOperatorInfo* pOperatorDumy); SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,

View File

@ -14,6 +14,7 @@
*/ */
#include "executor.h" #include "executor.h"
#include <vnode.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -46,7 +47,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -128,7 +129,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
// traverse to the streamscan node to add this table id // traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot; SOperatorInfo* pInfo = pTaskInfo->pRoot;
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
pInfo = pInfo->pDownstream[0]; pInfo = pInfo->pDownstream[0];
@ -136,7 +137,31 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
SStreamBlockScanInfo* pScanInfo = pInfo->info; SStreamBlockScanInfo* pScanInfo = pInfo->info;
if (isAdd) { if (isAdd) {
int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno));
continue;
}
ASSERT(mr.me.type == TSDB_CHILD_TABLE);
if (mr.me.ctbEntry.suid != pScanInfo->tableUid) {
continue;
}
taosArrayPush(qa, id);
}
metaReaderClear(&mr);
qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa));
int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void operatorDummyCloseFn(void* param, int32_t numOfCols) {} void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
@ -2136,23 +2136,13 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
* @param result * @param result
*/ */
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t start = pGroupResInfo->index;
int32_t start = 0;
int32_t step = 1;
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv)); // qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC);
if (orderType == TSDB_ORDER_ASC) { for (int32_t i = start; i < numOfRows; i += 1) {
start = pGroupResInfo->index;
} else { // desc order copy all data
start = numOfRows - pGroupResInfo->index - 1;
}
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
@ -2162,9 +2152,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
continue; continue;
} }
// TODO copy multiple rows? if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
int32_t numOfRowsToCopy = pRow->numOfRows;
if (numOfResult + numOfRowsToCopy >= pBlock->info.capacity) {
break; break;
} }
@ -2195,7 +2183,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
} }
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
break; break;
@ -2223,8 +2210,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
return; return;
} }
int32_t orderType = TSDB_ORDER_ASC; doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs);
doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx, numOfExprs);
// add condition (pBlock->info.rows >= 1) just to runtime happy // add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow(pBlock); blockDataUpdateTsWindow(pBlock);
@ -4717,10 +4703,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo, SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy); pScanPhyNode->node.pConditions, pOperatorDumy);
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
return pOperator; return pOperator;

View File

@ -710,14 +710,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->readerHandle)) { while (tqNextDataBlock(pInfo->streamBlockReader)) {
SArray* pCols = NULL; SArray* pCols = NULL;
uint64_t groupId = 0; uint64_t groupId = 0;
uint64_t uid = 0; uint64_t uid = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int16_t outputCol = 0; int16_t outputCol = 0;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); int32_t code = tqRetrieveDataBlock(&pCols, pInfo->streamBlockReader, &groupId, &uid, &numOfRows, &outputCol);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code; pTaskInfo->code = code;
@ -791,9 +791,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy ) { SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -829,37 +830,32 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY)); pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
if (pInfo->tsArray == NULL) { if (pInfo->tsArray == NULL) {
taosMemoryFreeClear(pInfo); goto _error;
taosMemoryFreeClear(pOperator);
return NULL;
} }
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
if (pInfo->pUpdateInfo == NULL) { if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo); goto _error;
taosMemoryFreeClear(pOperator);
return NULL;
} }
pInfo->readerHandle = streamReadHandle; pInfo->readHandle = *pHandle;
pInfo->pRes = pResBlock; pInfo->tableUid = uid;
pInfo->pCondition = pCondition; pInfo->streamBlockReader = streamReadHandle;
pInfo->pDataReader = pDataReader; pInfo->pRes = pResBlock;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->pCondition = pCondition;
pInfo->pOperatorDumy = pOperatorDumy; pInfo->pDataReader = pDataReader;
pInfo->interval = pSTInfo->interval; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = pSTInfo->interval;
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet.getNextFn = doStreamBlockScan;
pOperator->fpSet.closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);