Merge pull request #14974 from taosdata/feature/TD-17357
feat(stream): optimize interval plan
This commit is contained in:
commit
f41b7a6810
|
@ -1752,7 +1752,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld\n", flag,
|
||||
len += snprintf(dumpBuf + len, size - len, "%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|======\n", "dumpBlockData",
|
||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
||||
pDataBlock->info.uid);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
|
|
@ -371,6 +371,13 @@ typedef struct SessionWindowSupporter {
|
|||
uint8_t parentType;
|
||||
} SessionWindowSupporter;
|
||||
|
||||
typedef struct STimeWindowSupp {
|
||||
int8_t calTrigger;
|
||||
int64_t waterMark;
|
||||
TSKEY maxTs;
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
} STimeWindowAggSupp;
|
||||
|
||||
typedef struct SStreamScanInfo {
|
||||
uint64_t tableUid; // queried super table uid
|
||||
SExprInfo* pPseudoExpr;
|
||||
|
@ -407,6 +414,7 @@ typedef struct SStreamScanInfo {
|
|||
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
|
||||
int32_t deleteDataIndex;
|
||||
STimeWindow updateWin;
|
||||
STimeWindowAggSupp twAggSup;
|
||||
|
||||
// status for tmq
|
||||
// SSchemaWrapper schema;
|
||||
|
@ -452,13 +460,6 @@ typedef struct SAggSupporter {
|
|||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
} SAggSupporter;
|
||||
|
||||
typedef struct STimeWindowSupp {
|
||||
int8_t calTrigger;
|
||||
int64_t waterMark;
|
||||
TSKEY maxTs;
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
} STimeWindowAggSupp;
|
||||
|
||||
typedef struct SIntervalAggOperatorInfo {
|
||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
|
@ -952,6 +953,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
|||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
|
||||
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||
|
||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||
|
|
|
@ -802,7 +802,12 @@ static bool isStateWindow(SStreamScanInfo* pInfo) {
|
|||
|
||||
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
}
|
||||
|
||||
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
|
||||
}
|
||||
|
||||
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
|
||||
|
@ -1130,9 +1135,14 @@ static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlo
|
|||
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
|
||||
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
|
||||
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
||||
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId]) && out) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
||||
// must check update info first.
|
||||
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
||||
if ( (update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup)) ) && out) {
|
||||
taosArrayPush(pInfo->tsArray, &rowId);
|
||||
}
|
||||
}
|
||||
|
@ -1413,6 +1423,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pInfo->tsArrayIndex = 0;
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
|
||||
pInfo->updateResIndex = 0;
|
||||
|
@ -1584,13 +1595,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
||||
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
|
||||
pInfo->groupId = 0;
|
||||
pInfo->pPullDataRes = createPullDataBlock();
|
||||
pInfo->pStreamScanOp = pOperator;
|
||||
pInfo->deleteDataIndex = 0;
|
||||
pInfo->pDeleteDataRes = createPullDataBlock();
|
||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
|
||||
pInfo->twAggSup = *pTwSup;
|
||||
|
||||
pOperator->name = "StreamScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
|
|
|
@ -652,8 +652,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
}
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
if (pBlock == NULL) {
|
||||
qDebug("======printDataBlock Block is Null");
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("======printDataBlock: Block is Null or Empty");
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
|
@ -1355,7 +1355,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
int32_t size = taosArrayGetSize(chAy);
|
||||
qDebug("window %" PRId64 " wait child size:%d", win.skey, size);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
qDebug("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
|
||||
qDebug("window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
|
||||
}
|
||||
continue;
|
||||
} else if (pPullDataMap) {
|
||||
|
@ -1626,6 +1626,12 @@ SSDataBlock* createDeleteBlock() {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) {
|
||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->sessionSup.parentType = type;
|
||||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
|
||||
|
@ -1701,6 +1707,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
|
||||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
|
||||
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
|
||||
}
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -2476,12 +2486,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|||
} else {
|
||||
int32_t index = -1;
|
||||
SArray* chArray = NULL;
|
||||
int32_t chId = 0;
|
||||
if (chIds) {
|
||||
chArray = *(void**)chIds;
|
||||
int32_t chId = getChildIndex(pSDataBlock);
|
||||
chId = getChildIndex(pSDataBlock);
|
||||
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
}
|
||||
if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
|
||||
qDebug("======delete child id %d", chId);
|
||||
taosArrayRemove(chArray, index);
|
||||
if (taosArrayGetSize(chArray) == 0) {
|
||||
// pull data is over
|
||||
|
@ -3010,6 +3022,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
|
|||
pDummy[i].functionId = pCtx[i].functionId;
|
||||
}
|
||||
}
|
||||
|
||||
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
|
||||
uint8_t type) {
|
||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
|
|
|
@ -198,8 +198,7 @@ static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
|
|||
}
|
||||
|
||||
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
|
||||
return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) ||
|
||||
(streamQuery && TSDB_SUPER_TABLE == pScan->tableType);
|
||||
return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1);
|
||||
}
|
||||
|
||||
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
|
||||
|
|
|
@ -366,4 +366,143 @@ if $data32 != 8 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
sql drop database IF EXISTS test2;
|
||||
sql drop stream IF EXISTS streams21;
|
||||
sql drop stream IF EXISTS streams22;
|
||||
|
||||
sql create database test2 vgroups 2;
|
||||
sql use test2;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
|
||||
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,2,1.1);
|
||||
sql insert into t1 values(1648791233002,3,3,3,2.1);
|
||||
sql insert into t1 values(1648791243003,4,4,4,3.1);
|
||||
sql insert into t1 values(1648791213004,4,5,5,4.1);
|
||||
|
||||
sql insert into t2 values(1648791213000,1,6,6,1.0);
|
||||
sql insert into t2 values(1648791223001,2,7,7,1.1);
|
||||
sql insert into t2 values(1648791233002,3,8,8,2.1);
|
||||
sql insert into t2 values(1648791243003,4,9,9,3.1);
|
||||
sql insert into t2 values(1648791213004,4,10,10,4.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt;
|
||||
|
||||
# row 0
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data02 != 5 then
|
||||
print =====data02=$data02
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data12 != 2 then
|
||||
print =====data12=$data12
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data22 != 3 then
|
||||
print =====data22=$data22
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 1 then
|
||||
print =====data31=$data31
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data32 != 4 then
|
||||
print =====data32=$data32
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
print step 6
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt2;
|
||||
|
||||
# row 0
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
# goto loop3
|
||||
endi
|
||||
|
||||
if $data02 != 10 then
|
||||
print =====data02=$data02
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data12 != 4 then
|
||||
print =====data12=$data12
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 2 then
|
||||
print =====data21=$data21
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data22 != 6 then
|
||||
print =====data22=$data22
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 2 then
|
||||
print =====data31=$data31
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data32 != 8 then
|
||||
print =====data32=$data32
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue