Merge pull request #18842 from taosdata/feat/TD-21045-main
feat:add delete mark for sma
This commit is contained in:
commit
20486d5233
|
@ -1946,9 +1946,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len,
|
||||
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
||||
"|rows:%d|version:%" PRIu64 "\n",
|
||||
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
|
||||
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
|
||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version);
|
||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "ttime.h"
|
||||
|
||||
#define IS_FINAL_OP(op) ((op)->isFinal)
|
||||
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
|
||||
|
||||
typedef struct SSessionAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
|
@ -56,6 +57,7 @@ typedef enum SResultTsInterpType {
|
|||
typedef struct SPullWindowInfo {
|
||||
STimeWindow window;
|
||||
uint64_t groupId;
|
||||
STimeWindow calWin;
|
||||
} SPullWindowInfo;
|
||||
|
||||
typedef struct SOpenWindowInfo {
|
||||
|
@ -793,17 +795,18 @@ int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
|
|||
SArray* res = (SArray*)data;
|
||||
SPullWindowInfo* pos = taosArrayGet(res, index);
|
||||
SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
|
||||
if (pData->window.skey == pos->window.skey) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else if (pData->window.skey > pos->window.skey) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return -1;
|
||||
|
||||
if (pData->window.skey > pos->window.ekey) {
|
||||
return 1;
|
||||
} else if (pData->window.ekey < pos->window.skey) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
|
||||
|
@ -812,10 +815,16 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
|
|||
if (index == -1) {
|
||||
index = 0;
|
||||
} else {
|
||||
if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) {
|
||||
index++;
|
||||
} else {
|
||||
int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
|
||||
if (code == 0) {
|
||||
SPullWindowInfo* pos = taosArrayGet(pPullWins ,index);
|
||||
pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
|
||||
pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
|
||||
pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
|
||||
pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (code > 0 ){
|
||||
index++;
|
||||
}
|
||||
}
|
||||
if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
|
||||
|
@ -2255,8 +2264,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
|
||||
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
|
||||
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
|
||||
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
|
||||
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
|
||||
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
|
||||
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
|
||||
pBlock->info.rows++;
|
||||
}
|
||||
if ((*pIndex) == size) {
|
||||
|
@ -2266,27 +2275,33 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
}
|
||||
|
||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
|
||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* tsEndData = (TSKEY*)pEndCol->pData;
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
|
||||
int32_t chId = getChildIndex(pBlock);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
|
||||
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
|
||||
if (chIds) {
|
||||
SArray* chArray = *(SArray**)chIds;
|
||||
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
if (index != -1) {
|
||||
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||
taosArrayRemove(chArray, index);
|
||||
if (taosArrayGetSize(chArray) == 0) {
|
||||
// pull data is over
|
||||
taosArrayDestroy(chArray);
|
||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||
TSKEY winTs = tsData[i];
|
||||
while (winTs < tsEndData[i]) {
|
||||
SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
|
||||
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
|
||||
if (chIds) {
|
||||
SArray* chArray = *(SArray**)chIds;
|
||||
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
if (index != -1) {
|
||||
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||
taosArrayRemove(chArray, index);
|
||||
if (taosArrayGetSize(chArray) == 0) {
|
||||
// pull data is over
|
||||
taosArrayDestroy(chArray);
|
||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
}
|
||||
}
|
||||
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2299,12 +2314,13 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
|
|||
if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
|
||||
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
|
||||
if (!chIds) {
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId};
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
||||
// add pull data request
|
||||
savePullWindow(&pull, pInfo->pPullWins);
|
||||
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, winKey, size1);
|
||||
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
|
||||
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
|
||||
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, winKey, size1);
|
||||
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2374,12 +2390,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
};
|
||||
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
||||
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId};
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
||||
// add pull data request
|
||||
savePullWindow(&pull, pInfo->pPullWins);
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, &winRes, size);
|
||||
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
|
||||
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, &winRes, size);
|
||||
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
|
||||
}
|
||||
} else {
|
||||
int32_t index = -1;
|
||||
SArray* chArray = NULL;
|
||||
|
@ -2560,7 +2577,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
|
||||
processPullOver(pBlock, pInfo->pPullDataMap);
|
||||
processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2638,6 +2655,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
|
||||
if (pIntervalPhyNode->window.deleteMark <= 0) {
|
||||
return DEAULT_DELETE_MARK;
|
||||
}
|
||||
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark);
|
||||
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
|
||||
return deleteMark;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
@ -2659,9 +2685,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
// for test 315360000000
|
||||
.deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL,
|
||||
// .deleteMark = INT64_MAX,
|
||||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
||||
.deleteMarkSaved = 0,
|
||||
.calTriggerSaved = 0,
|
||||
};
|
||||
|
@ -4805,7 +4829,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
.deleteMark = INT64_MAX,
|
||||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
||||
};
|
||||
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
|
|
@ -461,7 +461,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
|
|||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
sleep 100
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
|
@ -519,7 +519,7 @@ print step 6
|
|||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
# sleep 300
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
|
@ -618,6 +618,60 @@ if $data41 != 2 then
|
|||
goto loop4
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791343003,4,4,4,3.1);
|
||||
sql insert into t1 values(1648791213004,4,5,5,4.1);
|
||||
|
||||
loop5:
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt3;
|
||||
|
||||
# row 0
|
||||
if $rows != 7 then
|
||||
print =====rows=$rows
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data21 != 4 then
|
||||
print =====data21=$data21
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data31 != 4 then
|
||||
print =====data31=$data31
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data41 != 2 then
|
||||
print =====data41=$data41
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data51 != 1 then
|
||||
print =====data51=$data51
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data61 != 1 then
|
||||
print =====data61=$data61
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
|
|
Loading…
Reference in New Issue