Merge pull request #12932 from taosdata/feature/TD-15774
feat(stream):stream session operator
This commit is contained in:
commit
4bc8470bb6
|
@ -39,6 +39,7 @@ typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInf
|
|||
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
|
||||
|
||||
typedef struct SScalarFuncExecFuncs {
|
||||
FExecGetEnv getEnv;
|
||||
|
@ -50,6 +51,7 @@ typedef struct SFuncExecFuncs {
|
|||
FExecInit init;
|
||||
FExecProcess process;
|
||||
FExecFinalize finalize;
|
||||
FExecCombine combine;
|
||||
} SFuncExecFuncs;
|
||||
|
||||
typedef struct SFileBlockInfo {
|
||||
|
|
|
@ -212,6 +212,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_FILL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
|
||||
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
|
||||
|
|
|
@ -296,6 +296,8 @@ typedef struct SSessionWinodwPhysiNode {
|
|||
int64_t gap;
|
||||
} SSessionWinodwPhysiNode;
|
||||
|
||||
typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode;
|
||||
|
||||
typedef struct SStateWinodwPhysiNode {
|
||||
SWinodwPhysiNode window;
|
||||
SNode* pStateKey;
|
||||
|
|
|
@ -361,6 +361,18 @@ typedef struct SCatchSupporter {
|
|||
int64_t* pKeyBuf;
|
||||
} SCatchSupporter;
|
||||
|
||||
typedef struct SStreamAggSupporter {
|
||||
SArray* pResultRows; // SResultWindowInfo
|
||||
int32_t keySize;
|
||||
char* pKeyBuf; // window key buffer
|
||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
} SStreamAggSupporter;
|
||||
|
||||
typedef struct SessionWindowSupporter {
|
||||
SStreamAggSupporter* pStreamAggSup;
|
||||
int64_t gap;
|
||||
} SessionWindowSupporter;
|
||||
typedef struct SStreamBlockScanInfo {
|
||||
SArray* pBlockLists; // multiple SSDatablock.
|
||||
SSDataBlock* pRes; // result SSDataBlock
|
||||
|
@ -385,6 +397,7 @@ typedef struct SStreamBlockScanInfo {
|
|||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||
SCatchSupporter childAggSup;
|
||||
SArray* childIds;
|
||||
SessionWindowSupporter sessionSup;
|
||||
} SStreamBlockScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
@ -550,6 +563,27 @@ typedef struct SSessionAggOperatorInfo {
|
|||
STimeWindowAggSupp twAggSup;
|
||||
} SSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SResultWindowInfo {
|
||||
SResultRowPosition pos;
|
||||
STimeWindow win;
|
||||
bool isOutput;
|
||||
} SResultWindowInfo;
|
||||
|
||||
typedef struct SStreamSessionAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SGroupResInfo groupResInfo;
|
||||
int64_t gap; // session window gap
|
||||
int32_t primaryTsIndex; // primary timestamp slot id
|
||||
int32_t order; // current SSDataBlock scan order
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SSDataBlock* pWinBlock; // window result
|
||||
SqlFunctionCtx* pDummyCtx; // for combine
|
||||
SSDataBlock* pDelRes;
|
||||
SHashObj* pStDeleted;
|
||||
void* pDelIterator;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct STimeSliceOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SInterval interval;
|
||||
|
@ -727,6 +761,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
|
||||
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
|
||||
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
#if 0
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
#endif
|
||||
|
@ -761,13 +798,19 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
|
|||
int32_t* length);
|
||||
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
|
||||
SInterval* pInterval, int32_t precision, STimeWindow* win);
|
||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
|
||||
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
|
||||
int32_t order);
|
||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn,
|
||||
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
|
||||
int32_t order);
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
|
||||
const char* pKey, const char* pDir);
|
||||
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
|
||||
const char* pDir);
|
||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
|
||||
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
|
||||
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
|
||||
int32_t* pIndex);
|
||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
||||
int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -98,7 +98,6 @@ static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
|
|||
}
|
||||
|
||||
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
||||
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
|
||||
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
|
||||
|
||||
|
@ -937,7 +936,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
||||
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
// in case of timestamp column, always generated results.
|
||||
|
@ -4660,6 +4659,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
pOptr =
|
||||
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == type) {
|
||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
|
||||
.calTrigger = pSessionNode->window.triggerType};
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
|
||||
pOptr =
|
||||
createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
|
||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
|
@ -5151,15 +5163,37 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, const char* pKey,
|
||||
const char* pDir) {
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
|
||||
const char* pDir) {
|
||||
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
|
||||
int32_t pageSize = rowSize * 32;
|
||||
int32_t bufSize = pageSize * 4096;
|
||||
createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);
|
||||
;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
if (pCatchSup->pKeyBuf == NULL || pCatchSup->pWindowHashTable == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t pageSize = rowSize * 32;
|
||||
int32_t bufSize = pageSize * 4096;
|
||||
return createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
|
||||
}
|
||||
|
||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
|
||||
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
||||
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
||||
pSup->pResultRows = taosArrayInit(1024, sizeof(SResultWindowInfo));
|
||||
if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t pageSize = 4096;
|
||||
while (pageSize < pSup->resultRowSize * 4) {
|
||||
pageSize <<= 1u;
|
||||
}
|
||||
// at least four pages need to be in buffer
|
||||
int32_t bufSize = 4096 * 256;
|
||||
if (bufSize <= pageSize) {
|
||||
bufSize = pageSize * 4;
|
||||
}
|
||||
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, "/tmp/");
|
||||
}
|
||||
|
|
|
@ -645,6 +645,10 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
|
|||
taosArrayClear(pInfo->pBlockLists);
|
||||
}
|
||||
|
||||
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.pStreamAggSup != NULL;
|
||||
}
|
||||
|
||||
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
||||
SSDataBlock* pSDB = pInfo->pUpdateRes;
|
||||
if (pInfo->updateResIndex < pSDB->info.rows) {
|
||||
|
@ -652,13 +656,25 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
|||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
|
||||
pInfo->interval.precision, NULL);
|
||||
STimeWindow win;
|
||||
if (isSessionWindow(pInfo)) {
|
||||
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
|
||||
int64_t gap = pInfo->sessionSup.gap;
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows,
|
||||
tsCols[pInfo->updateResIndex], gap, &winIndex);
|
||||
win = pCurWin->win;
|
||||
pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows,
|
||||
pInfo->updateResIndex, gap, NULL);
|
||||
} else {
|
||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex],
|
||||
&pInfo->interval, pInfo->interval.precision, NULL);
|
||||
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
|
||||
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
}
|
||||
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
|
||||
pTableScanInfo->cond.twindow = win;
|
||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
|
||||
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
pTableScanInfo->scanTimes = 0;
|
||||
return true;
|
||||
} else {
|
||||
|
@ -848,6 +864,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
prepareDataScan(pInfo);
|
||||
return pInfo->pUpdateRes;
|
||||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
|
||||
SSDataBlock* pSDB = doDataScan(pInfo);
|
||||
|
@ -924,13 +941,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
|
||||
if (rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->interval.interval > 0) {
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
|
||||
if (upRes) {
|
||||
pInfo->pUpdateRes = upRes;
|
||||
if (upRes->info.type == STREAM_REPROCESS) {
|
||||
pInfo->updateResIndex = 0;
|
||||
prepareDataScan(pInfo);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
} else if (upRes->info.type == STREAM_INVERT) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
|
@ -1001,10 +1017,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
|
|||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->pOperatorDumy = pOperatorDumy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
|
||||
size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize,
|
||||
"StreamFinalInterval", TD_TMP_DIR_PATH); // TODO(liuyao) get row size from phy plan
|
||||
initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
|
|
|
@ -9,6 +9,7 @@ typedef enum SResultTsInterpType {
|
|||
} SResultTsInterpType;
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
|
||||
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator);
|
||||
|
||||
/*
|
||||
* There are two cases to handle:
|
||||
|
@ -1039,13 +1040,9 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
|
|||
}
|
||||
}
|
||||
|
||||
void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
|
||||
int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
|
||||
void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf,
|
||||
SOptrBasicInfo* pBinfo, int32_t numOfOutput) {
|
||||
SResultRow* pResult = getResultRowByPos(pResultBuf, p1);
|
||||
SqlFunctionCtx* pCtx = pBinfo->pCtx;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, pBinfo->rowCellInfoOffset);
|
||||
|
@ -1060,6 +1057,15 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
|
|||
}
|
||||
}
|
||||
|
||||
void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
|
||||
int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput);
|
||||
}
|
||||
|
||||
static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
|
||||
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||
|
@ -1112,8 +1118,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
||||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0,
|
||||
pOperator->numOfExprs, pBlock);
|
||||
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
||||
continue;
|
||||
}
|
||||
|
@ -1644,9 +1650,10 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
||||
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
|
||||
int32_t tableGroupId) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
|
@ -1659,7 +1666,10 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRes
|
|||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
} else {
|
||||
return pUpdated;
|
||||
}
|
||||
|
||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
|
||||
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts,
|
||||
|
@ -1720,7 +1730,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||
continue;
|
||||
}
|
||||
pUpdated = doHashInterval(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
pUpdated = doHashInterval(pOperator, pBlock, 0);
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
|
@ -1730,3 +1740,534 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
|
||||
taosArrayDestroy(pSup->pResultRows);
|
||||
taosMemoryFreeClear(pSup->pKeyBuf);
|
||||
destroyDiskbasedBuf(pSup->pResultBuf);
|
||||
}
|
||||
|
||||
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
}
|
||||
|
||||
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
|
||||
int32_t numOfCols, SSDataBlock* pResultBlock, SDiskbasedBuf* pResultBuf) {
|
||||
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
||||
pBasicInfo->pRes = pResultBlock;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pBasicInfo->pCtx[i].pBuf = pResultBuf;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
|
||||
for (int i = 0; i < nums; i++) {
|
||||
pDummy[i].functionId = pCtx[i].functionId;
|
||||
}
|
||||
}
|
||||
void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pInfo) {
|
||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
SStreamBlockScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->sessionSup =
|
||||
(SessionWindowSupporter){.pStreamAggSup = &pInfo->streamAggSup, .gap = pInfo->gap};
|
||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 60000 * 60 * 6);
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
|
||||
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
|
||||
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
SStreamSessionAggOperatorInfo* pInfo =
|
||||
taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
|
||||
int32_t code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock,
|
||||
pInfo->streamAggSup.pResultBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
|
||||
|
||||
pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx));
|
||||
if (pInfo->pDummyCtx == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols);
|
||||
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->primaryTsIndex = tsSlotId;
|
||||
pInfo->gap = gap;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
pInfo->pDelIterator = NULL;
|
||||
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
||||
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
||||
|
||||
pOperator->name = "StreamSessionWindowAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionWindowAgg,
|
||||
NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow,
|
||||
aggDecodeResultRow, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
initDownStream(downstream, pInfo);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
|
||||
|
||||
int32_t binarySearch(void* keyList, int num, TSKEY key, int order,
|
||||
__get_value_fn_t getValuefn) {
|
||||
int firstPos = 0, lastPos = num - 1, midPos = -1;
|
||||
int numOfRows = 0;
|
||||
|
||||
if (num <= 0) return -1;
|
||||
if (order == TSDB_ORDER_DESC) {
|
||||
// find the first position which is smaller than the key
|
||||
while (1) {
|
||||
if (key >= getValuefn(keyList, lastPos)) return lastPos;
|
||||
if (key == getValuefn(keyList, firstPos)) return firstPos;
|
||||
if (key < getValuefn(keyList, firstPos)) return firstPos - 1;
|
||||
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < getValuefn(keyList, midPos)) {
|
||||
lastPos = midPos - 1;
|
||||
} else if (key > getValuefn(keyList, midPos)) {
|
||||
firstPos = midPos + 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// find the first position which is bigger than the key
|
||||
while (1) {
|
||||
if (key <= getValuefn(keyList, firstPos)) return firstPos;
|
||||
if (key == getValuefn(keyList, lastPos)) return lastPos;
|
||||
|
||||
if (key > getValuefn(keyList, lastPos)) {
|
||||
lastPos = lastPos + 1;
|
||||
if (lastPos >= num)
|
||||
return -1;
|
||||
else
|
||||
return lastPos;
|
||||
}
|
||||
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (key < getValuefn(keyList, midPos)) {
|
||||
lastPos = midPos - 1;
|
||||
} else if (key > getValuefn(keyList, midPos)) {
|
||||
firstPos = midPos + 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return midPos;
|
||||
}
|
||||
|
||||
int64_t getSessionWindowEndkey(void* data, int32_t index) {
|
||||
SArray* pWinInfos = (SArray*) data;
|
||||
SResultWindowInfo* pWin = taosArrayGet(pWinInfos, index);
|
||||
return pWin->win.ekey;
|
||||
}
|
||||
static bool isInWindow(SResultWindowInfo* pWin, TSKEY ts, int64_t gap) {
|
||||
int64_t sGap = ts - pWin->win.skey;
|
||||
int64_t eGap = pWin->win.ekey - ts;
|
||||
if ( (sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0) ) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts,
|
||||
int32_t index) {
|
||||
SResultWindowInfo win =
|
||||
{.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false};
|
||||
return taosArrayInsert(pWinInfos, index, &win);
|
||||
}
|
||||
|
||||
static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) {
|
||||
SResultWindowInfo win =
|
||||
{.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false};
|
||||
return taosArrayPush(pWinInfos, &win);
|
||||
}
|
||||
|
||||
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
|
||||
int32_t* pIndex) {
|
||||
int32_t size = taosArrayGetSize(pWinInfos);
|
||||
if (size == 0) {
|
||||
return addNewSessionWindow(pWinInfos, ts);
|
||||
}
|
||||
// find the first position which is smaller than the key
|
||||
int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC,
|
||||
getSessionWindowEndkey);
|
||||
SResultWindowInfo* pWin = NULL;
|
||||
if (index >= 0) {
|
||||
pWin = taosArrayGet(pWinInfos, index);
|
||||
if (isInWindow(pWin, ts, gap)) {
|
||||
*pIndex = index;
|
||||
return pWin;
|
||||
}
|
||||
}
|
||||
|
||||
if (index + 1 < size) {
|
||||
pWin = taosArrayGet(pWinInfos, index + 1);
|
||||
if (isInWindow(pWin, ts, gap)) {
|
||||
*pIndex = index + 1;
|
||||
return pWin;
|
||||
}
|
||||
}
|
||||
|
||||
if (index == size - 1) {
|
||||
*pIndex = taosArrayGetSize(pWinInfos);
|
||||
return addNewSessionWindow(pWinInfos, ts);
|
||||
}
|
||||
*pIndex = index;
|
||||
return insertNewSessionWindow(pWinInfos, ts, index);
|
||||
}
|
||||
|
||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
|
||||
int32_t start, int64_t gap, SHashObj* pStDeleted) {
|
||||
for (int32_t i = start; i < rows; ++i) {
|
||||
if (!isInWindow(pWinInfo, pTs[i], gap)) {
|
||||
return i - start;
|
||||
}
|
||||
if (pWinInfo->win.skey > pTs[i]) {
|
||||
if (pStDeleted && pWinInfo->isOutput) {
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
|
||||
pWinInfo->isOutput = false;
|
||||
}
|
||||
pWinInfo->win.skey = pTs[i];
|
||||
}
|
||||
pWinInfo->win.ekey = TMAX(pWinInfo->win.ekey, pTs[i]);
|
||||
}
|
||||
return rows - start;
|
||||
}
|
||||
|
||||
static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult,
|
||||
SqlFunctionCtx* pCtx, int32_t groupId, int32_t numOfOutput,
|
||||
int32_t* rowCellInfoOffset, SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
|
||||
assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
|
||||
// too many time window in query
|
||||
int32_t size = taosArrayGetSize(pAggSup->pResultRows);
|
||||
if (size > MAX_INTERVAL_TIME_WINDOW) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||
}
|
||||
|
||||
if (pWinInfo->pos.pageId == -1) {
|
||||
*pResult = getNewResultRow_rv(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize);
|
||||
if (*pResult == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
initResultRow(*pResult);
|
||||
|
||||
// add a new result set for a new group
|
||||
pWinInfo->pos.pageId = (*pResult)->pageId;
|
||||
pWinInfo->pos.offset = (*pResult)->offset;
|
||||
} else {
|
||||
*pResult = getResultRowByPos(pAggSup->pResultBuf, &pWinInfo->pos);
|
||||
if (!(*pResult)) {
|
||||
qError("getResultRowByPos return NULL, TID:%s", GET_TASKID(pTaskInfo));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
// set time window for current result
|
||||
(*pResult)->win = pWinInfo->win;
|
||||
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t doOneWindowAgg(SStreamSessionAggOperatorInfo* pInfo,
|
||||
SSDataBlock* pSDataBlock, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
||||
int32_t startIndex, int32_t winRows, int32_t numOutput, SExecTaskInfo* pTaskInfo ) {
|
||||
SColumnInfoData* pColDataInfo =
|
||||
taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
|
||||
int32_t code = setWindowOutputBuf(pCurWin, pResult, pInfo->binfo.pCtx, pSDataBlock->info.groupId,
|
||||
numOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->win, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pCurWin->win,
|
||||
&pInfo->twAggSup.timeWindowData, startIndex, winRows, tsCols, pSDataBlock->info.rows,
|
||||
numOutput, TSDB_ORDER_ASC);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t copyWinInfoToDataBlock(SSDataBlock* pBlock, SStreamAggSupporter* pAggSup,
|
||||
int32_t start, int32_t num, int32_t numOfExprs, SOptrBasicInfo* pBinfo) {
|
||||
for (int32_t i = start; i < num; i += 1) {
|
||||
SResultWindowInfo* pWinInfo = taosArrayGet(pAggSup->pResultRows, start);
|
||||
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pWinInfo->pos.pageId);
|
||||
SResultRow* pRow = (SResultRow*)((char*)bufPage + pWinInfo->pos.offset);
|
||||
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||
SResultRowEntryInfo* pResultInfo = getResultCell(pRow, j, pBinfo->rowCellInfoOffset);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
|
||||
char* in = GET_ROWCELL_INTERBUF(pBinfo->pCtx[j].resultInfo);
|
||||
colDataAppend(pColInfoData, pBlock->info.rows, in, pResultInfo->isNullRes);
|
||||
}
|
||||
pBlock->info.rows += pRow->numOfRows;
|
||||
releaseBufPage(pAggSup->pResultBuf, bufPage);
|
||||
}
|
||||
blockDataUpdateTsWindow(pBlock, -1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) {
|
||||
SResultWindowInfo* pCurWin = taosArrayGet(pWinInfos, startIndex);
|
||||
int32_t size = taosArrayGetSize(pWinInfos);
|
||||
// Just look for the window behind StartIndex
|
||||
for (int32_t i = startIndex + 1; i < size; i++) {
|
||||
SResultWindowInfo* pWinInfo = taosArrayGet(pWinInfos, i);
|
||||
if (!isInWindow(pCurWin, pWinInfo->win.skey, gap)) {
|
||||
return i - startIndex - 1;
|
||||
}
|
||||
}
|
||||
|
||||
return size - startIndex - 1;
|
||||
}
|
||||
|
||||
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx,
|
||||
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
|
||||
continue;
|
||||
}
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
|
||||
code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num,
|
||||
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
|
||||
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex);
|
||||
SResultRow* pCurResult = NULL;
|
||||
setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId,
|
||||
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
|
||||
num += startIndex + 1;
|
||||
ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pResultRows));
|
||||
// Just look for the window behind StartIndex
|
||||
for (int32_t i = startIndex + 1; i < num; i++) {
|
||||
SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pResultRows, i);
|
||||
SResultRow* pWinResult = NULL;
|
||||
setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId,
|
||||
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
|
||||
pCurWin->win.ekey = TMAX(pCurWin->win.ekey, pWinInfo->win.ekey);
|
||||
compactFunctions(pInfo->binfo.pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
|
||||
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
|
||||
if (pWinInfo->isOutput) {
|
||||
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
|
||||
pWinInfo->isOutput = false;
|
||||
}
|
||||
taosArrayRemove(pInfo->streamAggSup.pResultRows, i);
|
||||
}
|
||||
}
|
||||
|
||||
static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
|
||||
SSDataBlock* pSDataBlock, SHashObj* pStUpdated, SHashObj* pStDeleted) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
bool masterScan = true;
|
||||
int32_t numOfOutput = pOperator->numOfExprs;
|
||||
int64_t groupId = pSDataBlock->info.groupId;
|
||||
int64_t gap = pInfo->gap;
|
||||
int64_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
int32_t step = 1;
|
||||
bool ascScan = true;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t winRows = 0;
|
||||
|
||||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo =
|
||||
taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
} else {
|
||||
return ;
|
||||
}
|
||||
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
for(int32_t i = 0; i < pSDataBlock->info.rows; ) {
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin =
|
||||
getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex);
|
||||
winRows =
|
||||
updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
|
||||
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
// window start(end) key interpolation
|
||||
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
|
||||
// pInfo->order, false);
|
||||
int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap);
|
||||
if (winNum > 0) {
|
||||
compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted);
|
||||
}
|
||||
|
||||
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
pCurWin->isOutput = true;
|
||||
i += winRows;
|
||||
}
|
||||
}
|
||||
|
||||
static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* pBinfo,
|
||||
SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t step = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin =
|
||||
getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex);
|
||||
step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL);
|
||||
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) {
|
||||
void* pData = NULL;
|
||||
size_t keyLen = 0;
|
||||
while((pData = taosHashIterate(pStUpdated, pData)) != NULL) {
|
||||
void* key = taosHashGetKey(pData, &keyLen);
|
||||
ASSERT(keyLen == sizeof(SResultRowPosition));
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
if (pos == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
pos->groupId = groupId;
|
||||
pos->pos = *(SResultRowPosition*)key;
|
||||
*(int64_t*)pos->key = *(uint64_t*)pData;
|
||||
taosArrayPush(pUpdated, &pos);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
|
||||
blockDataCleanup(pBlock);
|
||||
size_t keyLen = 0;
|
||||
while(( (*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false);
|
||||
for (int32_t i = 1; i < pBlock->info.numOfCols; i++) {
|
||||
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
colDataAppendNULL(pColInfoData, pBlock->info.rows);
|
||||
}
|
||||
pBlock->info.rows += 1;
|
||||
if (pBlock->info.rows + 1 >= pBlock->info.capacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ((*Ite) == NULL) {
|
||||
taosHashClear(pStDeleted);
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo,
|
||||
pInfo->streamAggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 ||
|
||||
!hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0,
|
||||
pOperator->numOfExprs, pInfo->gap);
|
||||
continue;
|
||||
}
|
||||
doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted);
|
||||
}
|
||||
|
||||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
|
||||
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
|
||||
taosHashCleanup(pStUpdated);
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo,
|
||||
pInfo->streamAggSup.pResultBuf);
|
||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ typedef struct SBuiltinFuncDefinition {
|
|||
FScalarExecProcess sprocessFunc;
|
||||
FExecFinalize finalizeFunc;
|
||||
FExecProcess invertFunc;
|
||||
FExecCombine combineFunc;
|
||||
} SBuiltinFuncDefinition;
|
||||
|
||||
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
|
||||
|
|
|
@ -27,6 +27,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
|||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
|
||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
||||
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
@ -37,24 +38,29 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
|
|||
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t sumFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t sumInvertFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t minFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
@ -73,8 +79,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
|
|||
|
||||
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
|
|
|
@ -745,7 +745,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = functionSetup,
|
||||
.processFunc = countFunction,
|
||||
.finalizeFunc = functionFinalize,
|
||||
.invertFunc = countInvertFunction
|
||||
.invertFunc = countInvertFunction,
|
||||
.combineFunc = combineFunction,
|
||||
},
|
||||
{
|
||||
.name = "sum",
|
||||
|
@ -757,7 +758,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = functionSetup,
|
||||
.processFunc = sumFunction,
|
||||
.finalizeFunc = functionFinalize,
|
||||
.invertFunc = sumInvertFunction
|
||||
.invertFunc = sumInvertFunction,
|
||||
.combineFunc = sumCombine,
|
||||
},
|
||||
{
|
||||
.name = "min",
|
||||
|
@ -768,7 +770,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getMinmaxFuncEnv,
|
||||
.initFunc = minmaxFunctionSetup,
|
||||
.processFunc = minFunction,
|
||||
.finalizeFunc = minmaxFunctionFinalize
|
||||
.finalizeFunc = minmaxFunctionFinalize,
|
||||
.combineFunc = minCombine
|
||||
},
|
||||
{
|
||||
.name = "max",
|
||||
|
@ -779,7 +782,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getMinmaxFuncEnv,
|
||||
.initFunc = minmaxFunctionSetup,
|
||||
.processFunc = maxFunction,
|
||||
.finalizeFunc = minmaxFunctionFinalize
|
||||
.finalizeFunc = minmaxFunctionFinalize,
|
||||
.combineFunc = maxCombine
|
||||
},
|
||||
{
|
||||
.name = "stddev",
|
||||
|
@ -790,7 +794,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = stddevFunctionSetup,
|
||||
.processFunc = stddevFunction,
|
||||
.finalizeFunc = stddevFinalize,
|
||||
.invertFunc = stddevInvertFunction
|
||||
.invertFunc = stddevInvertFunction,
|
||||
.combineFunc = stddevCombine,
|
||||
},
|
||||
{
|
||||
.name = "leastsquares",
|
||||
|
@ -801,7 +806,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = leastSQRFunctionSetup,
|
||||
.processFunc = leastSQRFunction,
|
||||
.finalizeFunc = leastSQRFinalize,
|
||||
.invertFunc = leastSQRInvertFunction
|
||||
.invertFunc = leastSQRInvertFunction,
|
||||
},
|
||||
{
|
||||
.name = "avg",
|
||||
|
@ -812,7 +817,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = avgFunctionSetup,
|
||||
.processFunc = avgFunction,
|
||||
.finalizeFunc = avgFinalize,
|
||||
.invertFunc = avgInvertFunction
|
||||
.invertFunc = avgInvertFunction,
|
||||
.combineFunc = avgCombine,
|
||||
},
|
||||
{
|
||||
.name = "percentile",
|
||||
|
@ -894,7 +900,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = firstFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
.finalizeFunc = functionFinalize,
|
||||
.combineFunc = firstCombine,
|
||||
},
|
||||
{
|
||||
.name = "last",
|
||||
|
@ -904,7 +911,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastFunction,
|
||||
.finalizeFunc = lastFinalize
|
||||
.finalizeFunc = lastFinalize,
|
||||
.combineFunc = lastCombine,
|
||||
},
|
||||
{
|
||||
.name = "histogram",
|
||||
|
|
|
@ -292,6 +292,24 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (pSResInfo->numOfRes != 0 &&
|
||||
(pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) > *(TSKEY*)(pSBuf + bytes)) ) {
|
||||
memcpy(pDBuf, pSBuf, bytes);
|
||||
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
|
||||
pDResInfo->numOfRes = 1;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -388,6 +406,18 @@ int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
*((int64_t*)pDBuf) += *((int64_t*)pSBuf);
|
||||
|
||||
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
|
||||
do { \
|
||||
_t* d = (_t*)(_col->pData); \
|
||||
|
@ -537,6 +567,26 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SSumRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SSumRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||
pDBuf->isum += pSBuf->isum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->usum += pSBuf->usum;
|
||||
} else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
|
||||
pDBuf->dsum += pSBuf->dsum;
|
||||
}
|
||||
|
||||
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SSumRes);
|
||||
return true;
|
||||
|
@ -738,6 +788,24 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (IS_INTEGER_TYPE(type)) {
|
||||
pDBuf->sum.isum += pSBuf->sum.isum;
|
||||
} else {
|
||||
pDBuf->sum.dsum += pSBuf->sum.dsum;
|
||||
}
|
||||
pDBuf->count += pSBuf->count;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
@ -1273,6 +1341,34 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
}
|
||||
}
|
||||
|
||||
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SMinmaxResInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
if (IS_FLOAT_TYPE(type)) {
|
||||
if (pSBuf->assign &&
|
||||
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
|
||||
*(double*) &pDBuf->v = *(double*) &pSBuf->v;
|
||||
}
|
||||
} else {
|
||||
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
|
||||
pDBuf->v = pSBuf->v;
|
||||
}
|
||||
}
|
||||
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
return minMaxCombine(pDestCtx, pSourceCtx, 1);
|
||||
}
|
||||
int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
return minMaxCombine(pDestCtx, pSourceCtx, 0);
|
||||
}
|
||||
|
||||
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SStddevRes);
|
||||
return true;
|
||||
|
@ -1491,6 +1587,25 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (IS_INTEGER_TYPE(type)) {
|
||||
pDBuf->isum += pSBuf->isum;
|
||||
pDBuf->quadraticISum += pSBuf->quadraticISum;
|
||||
} else {
|
||||
pDBuf->dsum += pSBuf->dsum;
|
||||
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
|
||||
}
|
||||
pDBuf->count += pSBuf->count;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SLeastSQRInfo);
|
||||
return true;
|
||||
|
@ -1979,6 +2094,24 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
||||
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (pSResInfo->numOfRes != 0 &&
|
||||
(pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes)) ) {
|
||||
memcpy(pDBuf, pSBuf, bytes);
|
||||
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
|
||||
pDResInfo->numOfRes = 1;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||
return true;
|
||||
|
|
|
@ -118,6 +118,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
|||
pFpSet->init = funcMgtBuiltins[funcId].initFunc;
|
||||
pFpSet->process = funcMgtBuiltins[funcId].processFunc;
|
||||
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
|
||||
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -230,6 +230,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiFill";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
return "PhysiSessionWindow";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
|
||||
return "PhysiStreamSessionWindow";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
|
||||
return "PhysiStateWindow";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
|
@ -2528,6 +2530,29 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkSessionWindowTsPrimaryKey = "TsPrimaryKey";
|
||||
static const char* jkSessionWindowGap = "Gap";
|
||||
|
||||
static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SSessionWindowNode * pNode = (const SSessionWindowNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddObject(pJson, jkSessionWindowTsPrimaryKey, nodeToJson, pNode->pCol);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkSessionWindowGap, nodeToJson, pNode->pGap);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
|
||||
SSessionWindowNode* pNode = (SSessionWindowNode*)pObj;
|
||||
|
||||
int32_t code = jsonToNodeObject(pJson, jkSessionWindowTsPrimaryKey, (SNode **)&pNode->pCol);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSessionWindowGap, (SNode **)&pNode->pGap);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkIntervalWindowInterval = "Interval";
|
||||
static const char* jkIntervalWindowOffset = "Offset";
|
||||
static const char* jkIntervalWindowSliding = "Sliding";
|
||||
|
@ -3015,8 +3040,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return orderByExprNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LIMIT:
|
||||
case QUERY_NODE_STATE_WINDOW:
|
||||
case QUERY_NODE_SESSION_WINDOW:
|
||||
break;
|
||||
case QUERY_NODE_SESSION_WINDOW:
|
||||
return sessionWindowNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_INTERVAL_WINDOW:
|
||||
return intervalWindowNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_NODE_LIST:
|
||||
|
@ -3096,6 +3122,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
return physiFillNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
|
||||
return physiSessionWindowNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
|
||||
return physiStateWindowNodeToJson(pObj, pJson);
|
||||
|
@ -3134,6 +3161,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToTempTableNode(pJson, pObj);
|
||||
case QUERY_NODE_ORDER_BY_EXPR:
|
||||
return jsonToOrderByExprNode(pJson, pObj);
|
||||
case QUERY_NODE_SESSION_WINDOW:
|
||||
return jsonToSessionWindowNode(pJson, pObj);
|
||||
case QUERY_NODE_INTERVAL_WINDOW:
|
||||
return jsonToIntervalWindowNode(pJson, pObj);
|
||||
case QUERY_NODE_NODE_LIST:
|
||||
|
@ -3196,6 +3225,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
return jsonToPhysiFillNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
|
||||
return jsonToPhysiSessionWindowNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
|
||||
return jsonToPhysiStateWindowNode(pJson, pObj);
|
||||
|
|
|
@ -517,6 +517,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
|
|||
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
|
||||
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
|
||||
|
|
|
@ -251,6 +251,8 @@ int32_t nodesNodeSize(ENodeType type) {
|
|||
return sizeof(SFillPhysiNode);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
return sizeof(SSessionWinodwPhysiNode);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
|
||||
return sizeof(SStreamSessionWinodwPhysiNode);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
|
||||
return sizeof(SStateWinodwPhysiNode);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
|
@ -664,6 +666,7 @@ void nodesDestroyNode(SNodeptr pNode) {
|
|||
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
|
||||
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
|
|
|
@ -945,7 +945,8 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
|
|||
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
|
||||
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
|
||||
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
|
||||
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
|
||||
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
|
||||
if (NULL == pSession) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -127,7 +127,10 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
|||
if (pInfo->minTS < 0) {
|
||||
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
|
||||
}
|
||||
uint64_t index = (uint64_t)((ts - pInfo->minTS) / pInfo->interval);
|
||||
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
|
||||
if (index < 0) {
|
||||
return NULL;
|
||||
}
|
||||
if (index >= pInfo->numSBFs) {
|
||||
uint64_t count = index + 1 - pInfo->numSBFs;
|
||||
windowSBfDelete(pInfo, count);
|
||||
|
|
|
@ -67,6 +67,8 @@
|
|||
# ---- stream
|
||||
./test.sh -f tsim/stream/basic0.sim
|
||||
./test.sh -f tsim/stream/basic1.sim
|
||||
./test.sh -f tsim/stream/session0.sim
|
||||
./test.sh -f tsim/stream/session1.sim
|
||||
|
||||
# ---- transaction
|
||||
./test.sh -f tsim/trans/lossdata1.sim
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use test
|
||||
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
||||
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
|
||||
sql insert into t1 values(1648791223001,10,2,3,1.1,2);
|
||||
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
|
||||
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
|
||||
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
|
||||
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
if $data01 != 3 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 3 then
|
||||
print ======$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 3 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 2.100000000 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 0.000000000 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data06 != 3 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data07 != 2.100000000 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data08 != 6 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
|
||||
if $data11 != 3 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 10 then
|
||||
print ======$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 10 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 1.100000000 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 0.000000000 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data16 != 10 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data17 != 1.100000000 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data18 != 5 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0,7);
|
||||
sql insert into t1 values(1648791223001,2,2,3,1.1,8);
|
||||
sql insert into t1 values(1648791233002,3,2,3,2.1,9);
|
||||
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
|
||||
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
|
||||
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
|
||||
|
||||
sql select * from streamt order by s desc ;
|
||||
|
||||
# row 0
|
||||
if $data01 != 7 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 9 then
|
||||
print ======$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 4 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 1.100000000 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 0.816496581 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data06 != 3 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data07 != 1.100000000 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data08 != 13 then
|
||||
print ======$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,190 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
sql use test
|
||||
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
||||
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), min(b), max(id) s from t1 session(ts,10s);
|
||||
sql insert into t1 values(1648791210000,1,1,1,1.1,1);
|
||||
sql insert into t1 values(1648791220000,2,2,2,2.1,2);
|
||||
sql insert into t1 values(1648791230000,3,3,3,3.1,3);
|
||||
sql insert into t1 values(1648791240000,4,4,4,4.1,4);
|
||||
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
if $data01 != 4 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 10 then
|
||||
print ======$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 4 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791250005,5,5,5,5.1,5);
|
||||
sql insert into t1 values(1648791260006,6,6,6,6.1,6);
|
||||
sql insert into t1 values(1648791270007,7,7,7,7.1,7);
|
||||
sql insert into t1 values(1648791240005,5,5,5,5.1,8) (1648791250006,6,6,6,6.1,9);
|
||||
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
if $data01 != 8 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 32 then
|
||||
print ======$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 9 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 1 then
|
||||
print ======$data11
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 7 then
|
||||
print ======$data12
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 7 then
|
||||
print ======$data13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 7 then
|
||||
print ======$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791280008,7,7,7,7.1,10) (1648791300009,8,8,8,8.1,11);
|
||||
sql insert into t1 values(1648791260007,7,7,7,7.1,12) (1648791290008,7,7,7,7.1,13) (1648791290009,8,8,8,8.1,14);
|
||||
sql insert into t1 values(1648791500000,7,7,7,7.1,15) (1648791520000,8,8,8,8.1,16) (1648791540000,8,8,8,8.1,17);
|
||||
sql insert into t1 values(1648791530000,8,8,8,8.1,18);
|
||||
sql insert into t1 values(1648791220000,10,10,10,10.1,19) (1648791290008,2,2,2,2.1,20) (1648791540000,17,17,17,17.1,21) (1648791500001,22,22,22,22.1,22);
|
||||
|
||||
sql select * from streamt order by s desc;
|
||||
|
||||
# row 0
|
||||
if $data01 != 2 then
|
||||
print ======$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 29 then
|
||||
print ======$data02
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 7 then
|
||||
print ======$data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 22 then
|
||||
print ======$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 3 then
|
||||
print ======$data11
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 33 then
|
||||
print ======$data12
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 8 then
|
||||
print ======$data13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 21 then
|
||||
print ======$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 4 then
|
||||
print ======$data21
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data22 != 25 then
|
||||
print ======$data22
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data23 != 2 then
|
||||
print ======$data23
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 20 then
|
||||
print ======$data24
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 10 then
|
||||
print ======$data31
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data32 != 54 then
|
||||
print ======$data32
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data33 != 1 then
|
||||
print ======$data33
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data34 != 19 then
|
||||
print ======$data34
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue