Merge pull request #14087 from taosdata/feature/stream

fix(stream): convert datablk to submit blk
This commit is contained in:
Liu Jicong 2022-06-22 09:43:48 +08:00 committed by GitHub
commit 557d69d14a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 209 additions and 220 deletions

View File

@ -90,8 +90,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn, pRes = taos_query(
"create stream stream1 trigger window_close into outstb as select _wstartts, sum(k) from st1 " pConn,
"create stream stream1 trigger window_close watermark 10s into outstb as select _wstartts, sum(k) from st1 "
"interval(10s) "); "interval(10s) ");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));

View File

@ -80,11 +80,10 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
ret->length = sizeof(SSubmitReq); ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz); ret->numOfBlocks = htonl(sz);
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows); blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
// TODO // TODO
@ -97,7 +96,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
/*blkHead->dataLen = htonl(rows * maxLen);*/ /*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead->dataLen = 0; blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
int32_t schemaLen = 0; int32_t schemaLen = 0;
if (createTb) { if (createTb) {
@ -135,7 +134,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen); tEncoderInit(&encoder, blkSchema, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq); code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq); tdDestroySVCreateTbReq(&createTbReq);
@ -148,7 +147,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
blkHead->schemaLen = htonl(schemaLen); blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen); STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0}; SRowBuilder rb = {0};
@ -174,8 +173,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
blkHead->dataLen = htonl(dataLen); blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen); blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
/*submitBlk = blkHead;*/
} }
ret->length = htonl(ret->length); ret->length = htonl(ret->length);

View File

@ -16,8 +16,8 @@
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "ttime.h"
#include "tfill.h" #include "tfill.h"
#include "ttime.h"
typedef enum SResultTsInterpType { typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1, RESULT_ROW_START_INTERP = 1,
@ -324,8 +324,8 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
tw->ekey -= 1; tw->ekey -= 1;
} }
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) { int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t index = 1; int32_t index = 1;
@ -405,8 +405,8 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
} }
} }
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock, const TSKEY* tsCols, static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
STimeWindow* win, SExprSupp* pSup) { const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); bool ascQuery = (pInfo->order == TSDB_ORDER_ASC);
TSKEY curTs = tsCols[pos]; TSKEY curTs = tsCols[pos];
@ -434,9 +434,9 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
return true; return true;
} }
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
TSKEY blockEkey, STimeWindow* win) { STimeWindow* win) {
int32_t order = pInfo->order; int32_t order = pInfo->order;
TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY actualEndKey = tsCols[endRowIndex];
@ -548,8 +548,8 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
} }
} }
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult, STimeWindow* win, int32_t startPos, static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
int32_t forwardRows, SExprSupp* pSup) { STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
if (!pInfo->timeWindowInterpo) { if (!pInfo->timeWindowInterpo) {
return; return;
} }
@ -655,8 +655,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
} }
STimeWindow w = pr->win; STimeWindow w = pr->win;
int32_t ret = int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -672,8 +671,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows,
pBlock->info.rows, numOfExprs, pInfo->order); numOfExprs, pInfo->order);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr); closeResultRow(pr);
@ -753,8 +752,7 @@ int64_t getReskey(void* data, int32_t index) {
return *(int64_t*)pos->key; return *(int64_t*)pos->key;
} }
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) {
SArray* pUpdated) {
int32_t size = taosArrayGetSize(pUpdated); int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey); int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey);
if (index == -1) { if (index == -1) {
@ -819,9 +817,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win); pInfo->interval.precision, &pInfo->win);
int32_t ret = int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -843,8 +840,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window // restore current time window
ret = ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -870,8 +866,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
&pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -890,8 +885,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
tsCols, pBlock->info.rows, numOfOutput, pInfo->order); pBlock->info.rows, numOfOutput, pInfo->order);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
} }
@ -1050,8 +1045,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
STimeWindow window = pRowSup->win; STimeWindow window = pRowSup->win;
pRowSup->win.ekey = pRowSup->win.skey; pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
@ -1076,9 +1070,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pSup->pCtx, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
@ -1331,7 +1324,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, NULL); doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
@ -1618,8 +1612,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
STimeWindow window = pRowSup->win; STimeWindow window = pRowSup->win;
pRowSup->win.ekey = pRowSup->win.skey; pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
@ -1638,9 +1631,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pSup->pCtx, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
@ -1898,7 +1890,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1; pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i); doKeepPrevRows(pSliceInfo, pBlock, i);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
@ -2000,8 +1993,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo, pOperator->fpSet =
NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -2139,8 +2132,8 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} }
} }
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, int32_t groupId, static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
int32_t size = taosArrayGetSize(pWinArray); int32_t size = taosArrayGetSize(pWinArray);
if (!pInfo->pChildren) { if (!pInfo->pChildren) {
return; return;
@ -2148,8 +2141,8 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
STimeWindow* pParentWin = taosArrayGet(pWinArray, i); STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pSup->pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pSup->pCtx, numOfOutput,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) { for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
@ -2166,8 +2159,8 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL; return p1 == NULL;
} }
@ -2200,7 +2193,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) { isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
taosArrayPush(pUpWins, &nextWin); taosArrayPush(pUpWins, &nextWin);
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pSup->numOfExprs, pOperatorInfo->pTaskInfo); rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pSup->numOfExprs,
pOperatorInfo->pTaskInfo);
taosArrayDestroy(pUpWins); taosArrayDestroy(pUpWins);
} }
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx,
@ -2218,8 +2212,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardRows - 1) * step + startPos; int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
@ -2260,9 +2254,7 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
blockDataUpdateTsWindow(pDest, 0); blockDataUpdateTsWindow(pDest, 0);
} }
static int32_t getChildIndex(SSDataBlock* pBlock) { static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
return pBlock->info.childId;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
@ -2493,7 +2485,8 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -2523,11 +2516,13 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
} }
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) { int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx,
int32_t numOfOutput) {
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo)); return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
} }
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
@ -2547,7 +2542,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error; goto _error;
} }
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pSup->pCtx, numOfCols); code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pSup->pCtx, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -2560,9 +2554,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols); initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols);
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pSessionNode->window.watermark, .waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType, .maxTs = INT64_MIN};
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN};
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
@ -2648,8 +2640,8 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
return pWinInfos; return pWinInfos;
} }
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex) { int64_t gap, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId); SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos; pAggSup->pCurWins = pWinInfos;
@ -2688,8 +2680,8 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
return insertNewSessionWindow(pWinInfos, startTs, index + 1); return insertNewSessionWindow(pWinInfos, startTs, index + 1);
} }
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) { int32_t start, int64_t gap, SHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) { for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
return i - start; return i - start;
@ -2824,8 +2816,8 @@ typedef struct SWinRes {
uint64_t groupId; uint64_t groupId;
} SWinRes; } SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
SHashObj* pStUpdated, SHashObj* pStDeleted, bool hasEndTs) { SHashObj* pStDeleted, bool hasEndTs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
bool masterScan = true; bool masterScan = true;
@ -2858,10 +2850,9 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) { for (int32_t i = 0; i < pSDataBlock->info.rows;) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex);
endTsCols[i], groupId, gap, &winIndex); winRows =
winRows = updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -2884,14 +2875,15 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
} }
static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, SSDataBlock* pBlock, static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, SSDataBlock* pBlock, int32_t tsIndex,
int32_t tsIndex, int32_t numOfOutput, int64_t gap, SArray* result) { int32_t numOfOutput, int64_t gap, SArray* result) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0; int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) { for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap)); ASSERT(isInWindow(pCurWin, tsCols[i], gap));
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
@ -2981,8 +2973,7 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) {
__get_win_info_ fn) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void** pIte = NULL; void** pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
@ -3057,7 +3048,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, pInfo->gap, pWins); doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs,
pInfo->gap, pWins);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
@ -3081,7 +3073,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child // if chIndex + 1 - size > 0, add new child
for (int32_t i = 0; i < chIndex + 1 - size; i++) { for (int32_t i = 0; i < chIndex + 1 - size; i++) {
SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0); SOperatorInfo* pChildOp =
createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
if (!pChildOp) { if (!pChildOp) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -3098,13 +3091,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession);
getResWinForSession);
copyUpdateResult(pStUpdated, pUpdated); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
finalizeUpdatedResult(pSup->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pSup->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
@ -3226,8 +3217,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo); SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
if (pOperator == NULL) { if (pOperator == NULL) {
@ -3245,15 +3236,14 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator"; pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
aggEncodeResultRow, aggDecodeResultRow, NULL); destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
} }
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
if (numOfChild > 0) { if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) { for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChild = SOperatorInfo* pChild = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChild == NULL) { if (pChild == NULL) {
goto _error; goto _error;
} }
@ -3371,8 +3361,8 @@ SStateWindowInfo* getStateWindowByTs(SStreamAggSupporter* pAggSup, TSKEY ts, uin
return NULL; return NULL;
} }
SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
uint64_t groupId, char* pKeyData, SColumn* pCol, int32_t* pIndex) { SColumn* pCol, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId); SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos; pAggSup->pCurWins = pWinInfos;
int32_t size = taosArrayGetSize(pWinInfos); int32_t size = taosArrayGetSize(pWinInfos);
@ -3505,10 +3495,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
int32_t winIndex = 0; int32_t winIndex = 0;
bool allEqual = true; bool allEqual = true;
SStateWindowInfo* pCurWin = SStateWindowInfo* pCurWin =
getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData, getStateWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, pKeyData, &pInfo->stateCol, &winIndex);
&pInfo->stateCol, &winIndex); winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, pSDataBlock->info.rows, i,
winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, pKeyColInfo, &allEqual, pInfo->pSeDeleted);
pSDataBlock->info.rows, i, &allEqual, pInfo->pSeDeleted);
if (!allEqual) { if (!allEqual) {
taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win); taosArrayPush(pAggSup->pScanWindow, &pCurWin->winInfo.win);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
@ -3522,8 +3511,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pCurWin->winInfo.isClosed = false; pCurWin->winInfo.isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
&value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -3579,8 +3567,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState);
getResWinForState);
copyUpdateResult(pSeUpdated, pUpdated); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
@ -3694,7 +3681,8 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
} }
static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) { static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
TSKEY wstartTs) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
@ -3707,9 +3695,8 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL); ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
pSup->numOfExprs, pSup->rowEntryInfoOffset, pResultBlock, pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return 0; return 0;
@ -3732,12 +3719,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow win; STimeWindow win;
win.skey = blockStartTs; win.skey = blockStartTs;
win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; win.ekey =
taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
// TODO: remove the hash table usage (groupid + winkey => result row position) // TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t ret = int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -3754,25 +3741,27 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
continue; continue;
} else { } else {
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
currTs = tsCols[currPos]; currTs = tsCols[currPos];
currWin.skey = currTs; currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) -
1;
startPos = currPos; startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
} }
@ -3858,7 +3847,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code =
initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&iaInfo->binfo, pResBlock); initBasicInfo(&iaInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);

View File

@ -75,7 +75,7 @@
./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributesession0.sim # ./test.sh -f tsim/stream/distributesession0.sim
# ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim # ./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/triggerInterval0.sim ./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim # ./test.sh -f tsim/stream/triggerSession0.sim