|
|
|
@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) {
|
|
|
|
|
return pWin->ekey < pSup->maxTs - pSup->waterMark;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
|
|
|
|
SArray* closeWins) {
|
|
|
|
|
void* pIte = NULL;
|
|
|
|
@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|
|
|
|
SResultRowInfo dumyInfo;
|
|
|
|
|
dumyInfo.cur.pageId = -1;
|
|
|
|
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
|
|
|
|
|
if (win.ekey < pSup->maxTs - pSup->waterMark) {
|
|
|
|
|
if (isCloseWindow(&win, pSup)) {
|
|
|
|
|
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
|
|
|
|
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
|
|
|
|
taosHashRemove(pHashMap, keyBuf, keyLen);
|
|
|
|
@ -2036,59 +2040,6 @@ _error:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId,
|
|
|
|
|
SArray* pUpdated) {
|
|
|
|
|
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
|
|
|
|
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
|
|
|
|
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
|
|
|
|
int32_t step = 1;
|
|
|
|
|
bool ascScan = true;
|
|
|
|
|
TSKEY* tsCols = NULL;
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
int32_t forwardRows = 0;
|
|
|
|
|
|
|
|
|
|
if (pSDataBlock->pDataBlock != NULL) {
|
|
|
|
|
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
|
|
|
|
tsCols = (int64_t*)pColDataInfo->pData;
|
|
|
|
|
} else {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
|
|
|
|
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
|
|
|
|
|
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
|
|
|
|
|
pInfo->interval.precision, NULL);
|
|
|
|
|
while (1) {
|
|
|
|
|
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
|
|
|
|
|
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
|
|
|
|
pos->groupId = tableGroupId;
|
|
|
|
|
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
|
|
|
|
*(int64_t*)pos->key = pResult->win.skey;
|
|
|
|
|
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
|
|
|
|
TSDB_ORDER_ASC);
|
|
|
|
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
|
|
|
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
|
|
|
|
}
|
|
|
|
|
// window start(end) key interpolation
|
|
|
|
|
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
|
|
|
|
// forwardRows);
|
|
|
|
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
|
|
|
|
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
|
|
|
|
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
|
|
|
|
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
|
|
|
|
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
|
|
|
|
|
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
|
|
|
|
if (startPos < 0) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
|
|
|
|
|
|
|
|
|
|
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
|
|
|
|
@ -2130,6 +2081,74 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
|
|
|
|
|
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
|
|
|
|
|
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable,
|
|
|
|
|
pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
|
|
|
|
|
return p1 == NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
|
|
|
|
|
SArray* pUpdated) {
|
|
|
|
|
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
|
|
|
|
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
|
|
|
|
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
|
|
|
|
int32_t step = 1;
|
|
|
|
|
bool ascScan = true;
|
|
|
|
|
TSKEY* tsCols = NULL;
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
int32_t forwardRows = 0;
|
|
|
|
|
|
|
|
|
|
if (pSDataBlock->pDataBlock != NULL) {
|
|
|
|
|
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
|
|
|
|
tsCols = (int64_t*)pColDataInfo->pData;
|
|
|
|
|
} else {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
|
|
|
|
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
|
|
|
|
|
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
|
|
|
|
|
pInfo->interval.precision, NULL);
|
|
|
|
|
while (1) {
|
|
|
|
|
if (isFinalInterval(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) &&
|
|
|
|
|
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) {
|
|
|
|
|
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
|
|
|
|
|
taosArrayPush(pUpWins, &nextWin);
|
|
|
|
|
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
|
|
|
|
|
pOperatorInfo->numOfExprs, pOperatorInfo->pTaskInfo);
|
|
|
|
|
taosArrayDestroy(pUpWins);
|
|
|
|
|
}
|
|
|
|
|
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
|
|
|
|
|
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
|
|
|
|
pos->groupId = tableGroupId;
|
|
|
|
|
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
|
|
|
|
*(int64_t*)pos->key = pResult->win.skey;
|
|
|
|
|
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
|
|
|
|
TSDB_ORDER_ASC);
|
|
|
|
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
|
|
|
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
|
|
|
|
}
|
|
|
|
|
// window start(end) key interpolation
|
|
|
|
|
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
|
|
|
|
// forwardRows);
|
|
|
|
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
|
|
|
|
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
|
|
|
|
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
|
|
|
|
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
|
|
|
|
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
|
|
|
|
|
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
|
|
|
|
if (startPos < 0) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
|
|
|
|
|
taosHashClear(pInfo->aggSup.pResultRowHashTable);
|
|
|
|
|
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
|
|
|
|
@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
|
|
|
|
TSKEY maxTs = INT64_MIN;
|
|
|
|
|
|
|
|
|
|
if (pOperator->status == OP_EXEC_DONE) {
|
|
|
|
|
return NULL;
|
|
|
|
@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
|
|
|
|
|
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
|
|
|
|
|
if (isFinalInterval(pInfo)) {
|
|
|
|
|
int32_t chIndex = getChildIndex(pBlock);
|
|
|
|
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
|
|
|
@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
|
|
|
|
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
|
|
|
|
}
|
|
|
|
|
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
|
|
|
|
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
|
|
|
|
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
|
|
|
|
if (isFinalInterval(pInfo)) {
|
|
|
|
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
|
|
|
|
|
}
|
|
|
|
@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
|
|
|
|
|
int32_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset,
|
|
|
|
|
uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset,
|
|
|
|
|
SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
|
|
|
|
|
// too many time window in query
|
|
|
|
@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
|
|
|
|
|
return size - startIndex - 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId,
|
|
|
|
|
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, uint64_t groupId,
|
|
|
|
|
int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
|
|
|
|
|
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex);
|
|
|
|
|
SResultRow* pCurResult = NULL;
|
|
|
|
@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SWinRes {
|
|
|
|
|
TSKEY ts;
|
|
|
|
|
uint64_t groupId;
|
|
|
|
|
} SWinRes;
|
|
|
|
|
|
|
|
|
|
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
|
|
|
|
|
SHashObj* pStDeleted) {
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
|
|
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
bool masterScan = true;
|
|
|
|
|
int32_t numOfOutput = pOperator->numOfExprs;
|
|
|
|
|
int64_t groupId = pSDataBlock->info.groupId;
|
|
|
|
|
uint64_t groupId = pSDataBlock->info.groupId;
|
|
|
|
|
int64_t gap = pInfo->gap;
|
|
|
|
|
int64_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|
|
|
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
|
|
|
|
for (int32_t i = 0; i < pSDataBlock->info.rows;) {
|
|
|
|
|
int32_t winIndex = 0;
|
|
|
|
|
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex);
|
|
|
|
|
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], groupId, gap, &winIndex);
|
|
|
|
|
winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
|
|
|
|
|
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|
|
|
|
}
|
|
|
|
|
pCurWin->isClosed = false;
|
|
|
|
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
|
|
|
|
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY));
|
|
|
|
|
SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId};
|
|
|
|
|
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) {
|
|
|
|
|
static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
|
|
|
|
|
void* pData = NULL;
|
|
|
|
|
size_t keyLen = 0;
|
|
|
|
|
while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) {
|
|
|
|
@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t
|
|
|
|
|
if (pos == NULL) {
|
|
|
|
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
pos->groupId = groupId;
|
|
|
|
|
pos->groupId = ((SWinRes*)pData)->groupId;
|
|
|
|
|
pos->pos = *(SResultRowPosition*)key;
|
|
|
|
|
*(int64_t*)pos->key = *(uint64_t*)pData;
|
|
|
|
|
*(int64_t*)pos->key = ((SWinRes*)pData)->ts;
|
|
|
|
|
taosArrayPush(pUpdated, &pos);
|
|
|
|
|
}
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
|
|
|
|
|
__get_win_info_ fn) {
|
|
|
|
|
// Todo(liuyao) save window to tdb
|
|
|
|
|
void **pIte = NULL;
|
|
|
|
|
size_t keyLen = 0;
|
|
|
|
|
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
|
|
|
|
uint64_t* pGroupId = taosHashGetKey(pIte, &keyLen);
|
|
|
|
|
SArray *pWins = (SArray *) (*pIte);
|
|
|
|
|
int32_t size = taosArrayGetSize(pWins);
|
|
|
|
|
for (int32_t i = 0; i < size; i++) {
|
|
|
|
@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
|
|
|
|
|
if (!pSeWin->isClosed) {
|
|
|
|
|
pSeWin->isClosed = true;
|
|
|
|
|
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
|
|
|
|
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
|
|
|
|
|
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
|
|
|
|
|
pSeWin->isOutput = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
|
|
|
|
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs,
|
|
|
|
|
pChildInfo->gap, NULL);
|
|
|
|
|
rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo);
|
|
|
|
|
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo);
|
|
|
|
|
}
|
|
|
|
|
taosArrayDestroy(pWins);
|
|
|
|
|
continue;
|
|
|
|
@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
|
|
|
|
|
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
|
|
|
|
getSessionWinInfo);
|
|
|
|
|
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
|
|
|
|
|
copyUpdateResult(pStUpdated, pUpdated);
|
|
|
|
|
taosHashCleanup(pStUpdated);
|
|
|
|
|
|
|
|
|
|
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
|
|
|
@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|
|
|
|
}
|
|
|
|
|
pCurWin->winInfo.isClosed = false;
|
|
|
|
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
|
|
|
|
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &(pCurWin->winInfo.win.skey),
|
|
|
|
|
sizeof(TSKEY));
|
|
|
|
|
SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
|
|
|
|
|
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition),
|
|
|
|
|
&value, sizeof(SWinRes));
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
|
|
|
|
|
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
|
|
|
|
getStateWinInfo);
|
|
|
|
|
copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId);
|
|
|
|
|
copyUpdateResult(pSeUpdated, pUpdated);
|
|
|
|
|
taosHashCleanup(pSeUpdated);
|
|
|
|
|
|
|
|
|
|
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
|
|
|
|