feat(stream): delete result
This commit is contained in:
parent
a28c0781be
commit
7836b581a7
|
@ -694,6 +694,7 @@ typedef struct SSessionAggOperatorInfo {
|
|||
typedef struct SResultWindowInfo {
|
||||
SResultRowPosition pos;
|
||||
STimeWindow win;
|
||||
uint64_t groupId;
|
||||
bool isOutput;
|
||||
bool isClosed;
|
||||
} SResultWindowInfo;
|
||||
|
@ -1008,8 +1009,6 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
|
|||
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
|
||||
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
|
||||
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
|
||||
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||
|
|
|
@ -3734,7 +3734,7 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
|
|||
return insertNewSessionWindow(pWinInfos, startTs, index + 1);
|
||||
}
|
||||
|
||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows,
|
||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows,
|
||||
int32_t start, int64_t gap, SHashObj* pStDeleted) {
|
||||
for (int32_t i = start; i < rows; ++i) {
|
||||
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
|
||||
|
@ -3742,7 +3742,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
|
|||
}
|
||||
if (pWinInfo->win.skey > pStartTs[i]) {
|
||||
if (pStDeleted && pWinInfo->isOutput) {
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
|
||||
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId};
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
|
||||
pWinInfo->isOutput = false;
|
||||
}
|
||||
pWinInfo->win.skey = pStartTs[i];
|
||||
|
@ -3861,7 +3862,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
|
|||
compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
|
||||
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
|
||||
if (pWinInfo->isOutput) {
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
|
||||
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId};
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
|
||||
pWinInfo->isOutput = false;
|
||||
}
|
||||
taosArrayRemove(pInfo->streamAggSup.pCurWins, i);
|
||||
|
@ -3911,7 +3913,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex);
|
||||
winRows =
|
||||
updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
|
||||
updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
|
||||
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -3960,6 +3962,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
|
|||
}
|
||||
deleteWindow(pAggSup->pCurWins, winIndex, fp);
|
||||
if (result) {
|
||||
pCurWin->groupId = gpDatas[i];
|
||||
taosArrayPush(result, pCurWin);
|
||||
}
|
||||
}
|
||||
|
@ -3980,7 +3983,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
|
|||
step = 1;
|
||||
continue;
|
||||
}
|
||||
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
|
||||
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, 0, pBlock->info.rows, i, gap, NULL);
|
||||
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
|
||||
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
|
||||
if (result) {
|
||||
|
@ -4017,12 +4020,11 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
|
|||
blockDataEnsureCapacity(pBlock, size);
|
||||
size_t keyLen = 0;
|
||||
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false);
|
||||
for (int32_t i = 1; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
|
||||
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
colDataAppendNULL(pColInfoData, pBlock->info.rows);
|
||||
}
|
||||
SWinRes* res = *Ite;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false);
|
||||
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
colDataAppend(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false);
|
||||
pBlock->info.rows += 1;
|
||||
if (pBlock->info.rows + 1 >= pBlock->info.capacity) {
|
||||
break;
|
||||
|
@ -4149,7 +4151,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
|
|||
int32_t size = taosArrayGetSize(pResWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i);
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
|
||||
SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId};
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue