Merge pull request #16808 from taosdata/feature/TD-18948
feat(stream):optimize delete window
This commit is contained in:
commit
e5da5802e8
|
@ -459,10 +459,11 @@ typedef struct SPartitionDataInfo {
|
|||
SArray* rowIds;
|
||||
} SPartitionDataInfo;
|
||||
|
||||
typedef struct STimeWindowSupp {
|
||||
typedef struct STimeWindowAggSupp {
|
||||
int8_t calTrigger;
|
||||
int64_t waterMark;
|
||||
TSKEY maxTs;
|
||||
TSKEY minTs;
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
} STimeWindowAggSupp;
|
||||
|
||||
|
|
|
@ -1355,7 +1355,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
|
|||
}
|
||||
}
|
||||
|
||||
void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
|
||||
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
|
||||
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
|
||||
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
|
@ -1374,8 +1374,8 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
|
|||
releaseBufPage(pResultBuf, bufPage);
|
||||
}
|
||||
|
||||
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
|
||||
int32_t numOfOutput) {
|
||||
static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
|
||||
int32_t numOfOutput) {
|
||||
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
|
@ -1402,18 +1402,21 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
|
|||
return true;
|
||||
}
|
||||
|
||||
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval,
|
||||
SHashObj* pUpdatedMap) {
|
||||
static void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, STimeWindowAggSupp* pTwSup, SSDataBlock* pBlock,
|
||||
SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* tsEnds = (TSKEY*)pEndCol->pData;
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
|
||||
int64_t numOfWin = tSimpleHashGetSize(pAggSup->pResultRowHashTable);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
TSKEY startTs = TMAX(tsStarts[i], pTwSup->minTs);
|
||||
TSKEY endTs = TMIN(tsEnds[i], pTwSup->maxTs);
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTs, pInterval, TSDB_ORDER_ASC);
|
||||
do {
|
||||
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
|
||||
SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
|
||||
|
@ -1424,7 +1427,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
|
|||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||
} while (win.skey <= tsEnds[i]);
|
||||
} while (win.skey <= endTs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3030,6 +3033,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
TSKEY maxTs = INT64_MIN;
|
||||
TSKEY minTs = INT64_MAX;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
|
@ -3101,8 +3105,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||
|
||||
ASSERT(pBlock->info.type != STREAM_INVERT);
|
||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||
|
@ -3129,13 +3131,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap);
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t childIndex = getChildIndex(pBlock);
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info;
|
||||
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
||||
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL);
|
||||
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, &pInfo->twAggSup, pBlock, NULL, &pChildInfo->interval, NULL);
|
||||
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs,
|
||||
pOperator->pTaskInfo, pUpdatedMap);
|
||||
addRetriveWindow(delWins, pInfo);
|
||||
|
@ -3189,9 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
||||
doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
||||
}
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||
minTs = TMIN(minTs, pBlock->info.window.skey);
|
||||
}
|
||||
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
|
||||
pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
|
@ -3264,6 +3270,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
};
|
||||
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
|
@ -3507,7 +3514,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols);
|
||||
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType, .maxTs = INT64_MIN};
|
||||
.waterMark = pSessionNode->window.watermark,
|
||||
.calTrigger = pSessionNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
};
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
@ -4832,6 +4843,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.waterMark = pStateNode->window.watermark,
|
||||
.calTrigger = pStateNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
};
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
|
@ -5632,6 +5644,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int64_t maxTs = INT64_MIN;
|
||||
int64_t minTs = INT64_MAX;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -5676,7 +5689,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap);
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
|
||||
pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
||||
|
@ -5702,11 +5716,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
minTs = TMIN(minTs, pBlock->info.window.skey);
|
||||
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
|
||||
// new disc buf
|
||||
/*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/
|
||||
}
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
|
||||
|
||||
#if 0
|
||||
if (pState) {
|
||||
|
@ -5805,6 +5821,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
};
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
|
|
@ -247,8 +247,9 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
}
|
||||
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
int32_t code = TSDB_CODE_FAILED;
|
||||
if (!pHashObj || !key) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return code;
|
||||
}
|
||||
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
@ -266,13 +267,14 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
}
|
||||
FREE_HASH_NODE(pNode);
|
||||
atomic_sub_fetch_64(&pHashObj->size, 1);
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
break;
|
||||
}
|
||||
pPrev = pNode;
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) {
|
||||
|
|
|
@ -186,7 +186,9 @@ endi
|
|||
|
||||
sql drop stream if exists streams2;
|
||||
sql drop database if exists test2;
|
||||
sql drop database if exists test;
|
||||
sql create database test2 vgroups 4;
|
||||
sql create database test vgroups 1;
|
||||
sql use test2;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
|
@ -411,6 +413,80 @@ if $data12 != 3 then
|
|||
goto loop14
|
||||
endi
|
||||
|
||||
return 1
|
||||
|
||||
sql drop stream if exists streams3;
|
||||
sql drop database if exists test3;
|
||||
sql drop database if exists test;
|
||||
sql create database test3 vgroups 4;
|
||||
sql create database test vgroups 1;
|
||||
sql use test3;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st interval(10s);
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql delete from t1;
|
||||
|
||||
loop15:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop15
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql delete from t1 where ts > 100;
|
||||
|
||||
loop16:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop16
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
sql delete from st;
|
||||
|
||||
loop17:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop17
|
||||
endi
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
|
|
Loading…
Reference in New Issue