Merge pull request #13901 from taosdata/feature/TD-16512

feat(stream): stream partition by
This commit is contained in:
liuyao 2022-06-16 18:39:34 +08:00 committed by GitHub
commit 39e2a646e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 195 additions and 74 deletions

View File

@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) {
return pWin->ekey < pSup->maxTs - pSup->waterMark;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SArray* closeWins) { SArray* closeWins) {
void* pIte = NULL; void* pIte = NULL;
@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
if (win.ekey < pSup->maxTs - pSup->waterMark) { if (isCloseWindow(&win, pSup)) {
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen); taosHashRemove(pHashMap, keyBuf, keyLen);
@ -2036,59 +2040,6 @@ _error:
return NULL; return NULL;
} }
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId,
SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
int32_t step = 1;
bool ascScan = true;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL);
while (1) {
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) {
break;
}
}
}
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; } bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
@ -2130,6 +2081,74 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra
} }
} }
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable,
pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL;
}
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
int32_t step = 1;
bool ascScan = true;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL);
while (1) {
if (isFinalInterval(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) &&
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
taosArrayPush(pUpWins, &nextWin);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperatorInfo->numOfExprs, pOperatorInfo->pTaskInfo);
taosArrayDestroy(pUpWins);
}
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) {
break;
}
}
}
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
taosHashClear(pInfo->aggSup.pResultRowHashTable); taosHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
TSKEY maxTs = INT64_MIN;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
if (isFinalInterval(pInfo)) { if (isFinalInterval(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
} }
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (isFinalInterval(pInfo)) { if (isFinalInterval(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
} }
@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t
} }
static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
int32_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset,
SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
assert(pWinInfo->win.skey <= pWinInfo->win.ekey); assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query // too many time window in query
@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
return size - startIndex - 1; return size - startIndex - 1;
} }
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, uint64_t groupId,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
} }
} }
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
SHashObj* pStDeleted) { SHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
bool masterScan = true; bool masterScan = true;
int32_t numOfOutput = pOperator->numOfExprs; int32_t numOfOutput = pOperator->numOfExprs;
int64_t groupId = pSDataBlock->info.groupId; uint64_t groupId = pSDataBlock->info.groupId;
int64_t gap = pInfo->gap; int64_t gap = pInfo->gap;
int64_t code = TSDB_CODE_SUCCESS; int64_t code = TSDB_CODE_SUCCESS;
@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) { for (int32_t i = 0; i < pSDataBlock->info.rows;) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], groupId, gap, &winIndex);
winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
pCurWin->isClosed = false; pCurWin->isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY)); SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId};
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
} }
} }
static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) { static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
void* pData = NULL; void* pData = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) { while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) {
@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t
if (pos == NULL) { if (pos == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
pos->groupId = groupId; pos->groupId = ((SWinRes*)pData)->groupId;
pos->pos = *(SResultRowPosition*)key; pos->pos = *(SResultRowPosition*)key;
*(int64_t*)pos->key = *(uint64_t*)pData; *(int64_t*)pos->key = ((SWinRes*)pData)->ts;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
__get_win_info_ fn) { __get_win_info_ fn) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void **pIte = NULL; void **pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
uint64_t* pGroupId = taosHashGetKey(pIte, &keyLen);
SArray *pWins = (SArray *) (*pIte); SArray *pWins = (SArray *) (*pIte);
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
if (!pSeWin->isClosed) { if (!pSeWin->isClosed) {
pSeWin->isClosed = true; pSeWin->isClosed = true;
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
pSeWin->isOutput = true; pSeWin->isOutput = true;
} }
} }
@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs, doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs,
pChildInfo->gap, NULL); pChildInfo->gap, NULL);
rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo);
} }
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getSessionWinInfo); getSessionWinInfo);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
pCurWin->winInfo.isClosed = false; pCurWin->winInfo.isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &(pCurWin->winInfo.win.skey), SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
sizeof(TSKEY)); code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition),
&value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getStateWinInfo); getStateWinInfo);
copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,

View File

@ -173,4 +173,39 @@ endi
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sql create database test1 vgroups 1;
sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
$loop_count = 0
loop2:
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
#rows 1
if $data11 != 2 then
print =====data11=$data11
goto loop2
endi
system sh/stop_dnodes.sh

View File

@ -34,6 +34,7 @@ print =====rows=$rows
goto loop0 goto loop0
endi endi
print =====loop0
sql create database test1 vgroups 1; sql create database test1 vgroups 1;
sql use test1; sql use test1;
@ -51,7 +52,7 @@ sql insert into ts2 values(1648791211000,1,2,3);
$loop_count = 0 $loop_count = 0
loop0: loop1:
sleep 300 sleep 300
sql select * from streamt; sql select * from streamt;
@ -62,7 +63,62 @@ endi
if $rows != 2 then if $rows != 2 then
print =====rows=$rows print =====rows=$rows
goto loop0 goto loop1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT print =====loop1
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
$loop_count = 0
loop2:
sleep 300
sql select * from streamtST;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 1 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1 then
print =====data03=$data03
goto loop2
endi
if $data04 != 2 then
print =====data04=$data04
goto loop2
endi
print =====loop2
system sh/stop_dnodes.sh