diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 5743d33608..687949a9c0 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -244,6 +244,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, QUERY_NODE_PHYSICAL_PLAN_PARTITION, + QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6fd6a316eb..8aeb86102a 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -488,6 +488,8 @@ typedef struct SPartitionPhysiNode { SNodeList* pTargets; } SPartitionPhysiNode; +typedef SPartitionPhysiNode SStreamPartitionPhysiNode; + typedef struct SDataSinkNode { ENodeType type; SDataBlockDescNode* pInputDataBlockDesc; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 89e5b2da0a..3bed81f657 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -410,6 +410,7 @@ typedef enum EStreamScanMode { STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_UPDATERES, + STREAM_SCAN_FROM_DELETERES, STREAM_SCAN_FROM_DATAREADER_RETRIEVE, STREAM_SCAN_FROM_DATAREADER_RANGE, } EStreamScanMode; @@ -438,12 +439,24 @@ typedef struct SStreamAggSupporter { SSDataBlock* pScanBlock; } SStreamAggSupporter; -typedef struct SessionWindowSupporter { +typedef struct SWindowSupporter { SStreamAggSupporter* pStreamAggSup; int64_t gap; uint16_t parentType; SAggSupporter* pIntervalAggSup; -} SessionWindowSupporter; +} SWindowSupporter; + +typedef struct SPartitionBySupporter { + SArray* pGroupCols; // group by columns, SArray + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + bool needCalc; // partition by column +} SPartitionBySupporter; + +typedef struct SPartitionDataInfo { + uint64_t groupId; + SArray* rowIds; +} SPartitionDataInfo; typedef struct STimeWindowSupp { int8_t calTrigger; @@ -478,7 +491,9 @@ typedef struct SStreamScanInfo { SOperatorInfo* pStreamScanOp; SOperatorInfo* pTableScanOp; SArray* childIds; - SessionWindowSupporter sessionSup; + SWindowSupporter windowSup; + SPartitionBySupporter partitionSup; + SExprSupp* pPartScalarSup; bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. int32_t scanWinIndex; // for state operator int32_t pullDataResIndex; @@ -691,7 +706,6 @@ typedef struct SPartitionOperatorInfo { SArray* sortedGroupArray; // SDataGroupInfo sorted by group id int32_t groupIndex; // group index int32_t pageIndex; // page index of current group - SSDataBlock* pUpdateRes; SExprSupp scalarSup; } SPartitionOperatorInfo; @@ -743,8 +757,8 @@ typedef struct SStreamSessionAggOperatorInfo { SSDataBlock* pWinBlock; // window result SqlFunctionCtx* pDummyCtx; // for combine SSDataBlock* pDelRes; // delete result - bool returnDelete; SSDataBlock* pUpdateRes; // update window + bool returnUpdate; SHashObj* pStDeleted; void* pDelIterator; SArray* pChildren; // cache for children's result; final stream operator @@ -753,6 +767,16 @@ typedef struct SStreamSessionAggOperatorInfo { bool ignoreExpiredData; } SStreamSessionAggOperatorInfo; +typedef struct SStreamPartitionOperatorInfo { + SOptrBasicInfo binfo; + SPartitionBySupporter partitionSup; + SExprSupp scalarSup; + SHashObj* pPartitions; + void* parIte; + SSDataBlock* pInputDataBlock; + int32_t tsColIndex; +} SStreamPartitionOperatorInfo; + typedef struct STimeSliceOperatorInfo { SSDataBlock* pRes; STimeWindow win; @@ -954,6 +978,9 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); @@ -1022,8 +1049,9 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID); +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); void printDataBlock(SSDataBlock* pBlock, const char* flag); +uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b53d35a1a1..d8e5c0abe5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4150,6 +4150,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) { + pOptr = createStreamPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5d123f723e..599edb0722 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -830,3 +830,205 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } + +uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) { + if (pExprSup->pExprInfo != NULL) { + int32_t code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + if (code != TSDB_CODE_SUCCESS) { + qError("calaculate group id error, code:%d", code); + } + } + recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId); + int32_t len = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals); + uint64_t groupId = calcGroupId(pParSup->keyBuf, len); + return groupId; +} + +static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { + return pInfo->parIte != NULL; +} + +static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { + SStreamPartitionOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pDest = pInfo->binfo.pRes; + ASSERT(hasRemainPartion(pInfo)); + SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte; + blockDataCleanup(pDest); + int32_t rows = taosArrayGetSize(pParInfo->rowIds); + SSDataBlock* pSrc = pInfo->pInputDataBlock; + for (int32_t i = 0; i < rows; i++) { + int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i); + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) { + int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId; + SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId); + SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j); + bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL); + char* pSrcData = colDataGetData(pSrcCol, rowIndex); + colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull); + } + pDest->info.rows++; + } + blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); + pDest->info.groupId = pParInfo->groupId; + pOperator->resultInfo.totalRows += pDest->info.rows; + pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); + ASSERT(pDest->info.rows > 0); + printDataBlock(pDest, "stream partitionby"); + return pDest; +} + +static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) { + pInfo->pInputDataBlock = pBlock; + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + recordNewGroupKeys(pInfo->partitionSup.pGroupCols, pInfo->partitionSup.pGroupColVals, pBlock, i); + int32_t keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals); + SPartitionDataInfo* pParData = + (SPartitionDataInfo*) taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen); + if (pParData) { + taosArrayPush(pParData->rowIds, &i); + } else { + SPartitionDataInfo newParData = {0}; + newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); + newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); + taosArrayPush(newParData.rowIds, &i); + taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, + sizeof(SPartitionDataInfo)); + } + } +} + +static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamPartitionOperatorInfo* pInfo = pOperator->info; + if (hasRemainPartion(pInfo)) { + return buildStreamPartitionResult(pOperator); + } + + int64_t st = taosGetTimestampUs(); + SOperatorInfo* downstream = pOperator->pDownstream[0]; + { + pInfo->pInputDataBlock = NULL; + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + doSetOperatorCompleted(pOperator); + return NULL; + } + printDataBlock(pBlock, "stream partitionby recv"); + switch (pBlock->info.type) { + case STREAM_NORMAL: + case STREAM_PULL_DATA: + case STREAM_INVALID: + pInfo->binfo.pRes->info.type = pBlock->info.type; + break; + default: + return pBlock; + } + + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, + pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, pTaskInfo->code); + } + } + taosHashClear(pInfo->pPartitions); + doStreamHashPartitionImpl(pInfo, pBlock); + } + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + + pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL); + return buildStreamPartitionResult(pOperator); +} + +static void destroyStreamPartitionOperatorInfo(void* param) { + SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param; + cleanupBasicInfo(&pInfo->binfo); + taosArrayDestroy(pInfo->partitionSup.pGroupCols); + + for(int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i); + taosMemoryFree(key.pData); + } + taosArrayDestroy(pInfo->partitionSup.pGroupColVals); + + taosMemoryFree(pInfo->partitionSup.keyBuf); + cleanupExprSupp(&pInfo->scalarSup); + taosMemoryFreeClear(param); +} + +void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) { + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + return; + } + SStreamScanInfo* pScanInfo = downstream->info; + pScanInfo->partitionSup = *pParSup; + pScanInfo->pPartScalarSup = pExpr; +} + +SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { + SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + int32_t code = TSDB_CODE_SUCCESS; + pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); + + if (pPartNode->pExprs != NULL) { + int32_t num = 0; + SExprInfo* pCalExprInfo = createExprInfo(pPartNode->pExprs, NULL, &num); + code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + int32_t keyLen = 0; + code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->partitionSup.needCalc = true; + + SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); + if (!pResBlock) { + goto _error; + } + blockDataEnsureCapacity(pResBlock, 4096); + pInfo->binfo.pRes = pResBlock; + pInfo->parIte = NULL; + pInfo->pInputDataBlock = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + pInfo->tsColIndex = 0; + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); + + pOperator->name = "StreamPartitionOperator"; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL, destroyStreamPartitionOperatorInfo, + NULL, NULL, NULL); + + initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); + code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; +} + diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9a5368e90e..b740ec21d3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -920,49 +920,28 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { } static bool isSessionWindow(SStreamScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; + return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; } static bool isStateWindow(SStreamScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; + return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; } static bool isIntervalWindow(SStreamScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || - pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || - pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; + return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || + pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || + pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; } static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; + return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; } static bool isSlidingWindow(SStreamScanInfo* pInfo) { return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding; } -static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); - if (groupId) { - return *groupId; - } - return 0; - /* Todo(liuyao) for partition by column - recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId); - int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); - uint64_t resId = 0; - uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); - if (groupId) { - return *groupId; - } else if (len != 0) { - resId = calcGroupId(pTableScanInfo->keyBuf, len); - taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t)); - } - return resId; - */ -} - static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); uint64_t* groupCol = (uint64_t*)pColInfo->pData; @@ -976,6 +955,62 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { pTableScanInfo->currentGroupId = -1; } +static void freeArray(void* array) { + taosArrayDestroy(array); +} + +static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { + STableScanInfo* pTableScanInfo = pTableScanOp->info; + pTableScanInfo->cond.startVersion = -1; + pTableScanInfo->cond.endVersion = -1; + SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; + SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList; + taosArrayClearP(gpTbls, freeArray); + taosArrayPush(gpTbls, &allTbls); + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; + resetTableScanInfo(pTableScanOp->info, &win); +} + +static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) { + SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; + taosArrayClear(gpTbls); + STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; + SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(tbls, &tblInfo); + taosArrayPush(gpTbls, &tbls); + + STimeWindow win = {.skey = startTs, .ekey = endTs}; + STableScanInfo* pTableScanInfo = pTableScanOp->info; + pTableScanInfo->cond.startVersion = -1; + pTableScanInfo->cond.endVersion = maxVersion; + resetTableScanInfo(pTableScanOp->info, &win); + SSDataBlock* pRes = doTableScan(pTableScanOp); + resetTableScanOperator(pTableScanOp); + return pRes; +} + +static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { + SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion); + if (!pPreRes || pPreRes->info.rows == 0) { + return 0; + } + ASSERT(pPreRes->info.rows == 1); + return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0); +} + +static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { + if (pInfo->partitionSup.needCalc) { + return getGroupIdByCol(pInfo, uid, ts, maxVersion); + } + + SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; + uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); + if (groupId) { + return *groupId; + } + return 0; +} + static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) { if ((*pRowIndex) == pBlock->info.rows) { return false; @@ -987,6 +1022,9 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* endData = (TSKEY*)pEndTsCol->pData; STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]}; + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* gpData = (uint64_t*)pGpCol->pData; + uint64_t groupId = gpData[*pRowIndex]; SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* calStartData = (TSKEY*)pCalStartTsCol->pData; @@ -1001,11 +1039,11 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ (*pRowIndex)++; for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) { - if (win.skey == startData[*pRowIndex]) { + if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) { win.ekey = TMAX(win.ekey, endData[*pRowIndex]); continue; } - if (win.skey == endData[*pRowIndex]) { + if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) { win.skey = TMIN(win.skey, startData[*pRowIndex]); continue; } @@ -1020,15 +1058,19 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ } static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, - int32_t* pRowIndex) { + int32_t* pRowIndex, bool hasGroup) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC); STimeWindow endWin = win; STimeWindow preWin = win; while (1) { - (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); + if (hasGroup) { + (*pRowIndex) += 1; + } else { + (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, + binarySearchForKey, NULL, TSDB_ORDER_ASC); + } do { preWin = endWin; getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC); @@ -1060,7 +1102,26 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 return NULL; } - if (pResult->info.groupId == pInfo->groupId) { + if (pInfo->partitionSup.needCalc) { + SSDataBlock* tmpBlock = createOneDataBlock(pResult, true); + blockDataCleanup(pResult); + for (int32_t i = 0; i < tmpBlock->info.rows; i++) { + if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) { + for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) { + SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j); + SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j); + bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL); + char* pSrcData = colDataGetData(pSrcCol, i); + colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull); + } + pResult->info.rows++; + } + } + if (pResult->info.rows > 0) { + pResult->info.calWin = pInfo->updateWin; + return pResult; + } + } else if (pResult->info.groupId == pInfo->groupId) { pResult->info.calWin = pInfo->updateWin; return pResult; } @@ -1091,17 +1152,18 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); int32_t dummy = 0; + int64_t version = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { - uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]); + uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version); // gap must be 0. SResultWindowInfo* pStartWin = - getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy); + getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy); if (!pStartWin) { // window has been closed. continue; } SResultWindowInfo* pEndWin = - getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy); + getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy); ASSERT(pEndWin); TSKEY ts = INT64_MIN; colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false); @@ -1121,34 +1183,49 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS if (rows == 0) { return TSDB_CODE_SUCCESS; } - int32_t code = blockDataEnsureCapacity(pDestBlock, rows); + int32_t code = blockDataEnsureCapacity(pDestBlock, rows * 2); if (code != TSDB_CODE_SUCCESS) { return code; } - SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); - uint64_t* uidCol = (uint64_t*)pUidCol->pData; - ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - TSKEY* tsCol = (TSKEY*)pTsCol->pData; + SColumnInfoData* pSrcTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; + SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; + ASSERT(pSrcTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + TSKEY* tsCol = (TSKEY*)pSrcTsCol->pData; SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]); + int64_t version = pSrcBlock->info.version - 1; for (int32_t i = 0; i < rows;) { - colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(tsCol + i), false); - STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i); - colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(tsCol + i - 1), false); - + uint64_t srcUid = srcUidData[i]; + uint64_t groupId = getGroupIdByData(pInfo, srcUid, tsCol[i], version); + uint64_t srcGpId = srcGp[i]; + TSKEY calStartTs = tsCol[i]; + colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false); + STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i, pInfo->partitionSup.needCalc); + TSKEY calEndTs = tsCol[i - 1]; + colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false); + colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false); colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false); colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false); pDestBlock->info.rows++; + if (pInfo->partitionSup.needCalc && srcGpId != 0 && groupId != srcGpId) { + colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false); + colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false); + colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false); + colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); + colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false); + colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&srcGpId), false); + pDestBlock->info.rows++; + } } - // all rows have same group id - pDestBlock->info.groupId = groupId; return TSDB_CODE_SUCCESS; } @@ -1160,17 +1237,20 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock); } pDestBlock->info.type = STREAM_CLEAR; + pDestBlock->info.version = pSrcBlock->info.version; blockDataUpdateTsWindow(pDestBlock, 0); return code; } -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID) { +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, uidCol); + SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); - colDataAppend(pUidCol, pBlock->info.rows, (const char*)pID, false); + colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); + colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false); pBlock->info.rows++; } @@ -1195,24 +1275,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && - isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup); + isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup); if ((update || closedWin) && out) { - appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, UID_COLUMN_INDEX, &pBlock->info.uid); + uint64_t gpId = closedWin&&pInfo->partitionSup.needCalc ? + calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId) : 0; + appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, + &gpId); } } - if (out) { + if (out && pInfo->pUpdateDataRes->info.rows > 0) { + pInfo->pUpdateDataRes->info.version = pBlock->info.version; blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0); - pInfo->pUpdateDataRes->info.type = STREAM_CLEAR; - } -} - -static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) { - ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex); - uint64_t* uidCol = (uint64_t*)pColDataInfo->pData; - ASSERT(pBlock->info.rows > 0); - for (int32_t i = 0; i < pBlock->info.rows; i++) { - uidCol[i] = getGroupId(pOperator, uidCol[i]); + pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR; } } @@ -1447,6 +1521,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { default: break; } + // printDataBlock(pBlock, "stream scan recv"); return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { qDebug("scan mode %d", pInfo->scanMode); @@ -1456,6 +1531,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; return pInfo->pRes; } break; + case STREAM_SCAN_FROM_DELETERES: { + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + return pInfo->pDeleteDataRes; + } break; case STREAM_SCAN_FROM_UPDATERES: { generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); @@ -1471,6 +1554,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); + // printDataBlock(pSDB, "stream scan update"); return pSDB; } pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; @@ -1479,7 +1563,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { break; } - SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup; + SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup; if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->updateResIndex = 0; @@ -1545,7 +1629,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - printDataBlock(pInfo->pRes, "stream scan"); + // printDataBlock(pInfo->pRes, "stream scan"); if (pBlockInfo->rows == 0) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); @@ -1554,19 +1638,20 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { checkUpdateData(pInfo, true, pInfo->pRes, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); if (pInfo->pUpdateDataRes->info.rows > 0) { + pInfo->updateResIndex = 0; if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) { - pInfo->updateResIndex = 0; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return pInfo->pUpdateDataRes; + } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) { + pInfo->scanMode = STREAM_SCAN_FROM_DELETERES; } } } qDebug("scan rows: %d", pBlockInfo->rows); return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; - } else { ASSERT(0); return NULL; @@ -1830,13 +1915,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ASSERT(pHandle->tqReader); pInfo->tqReader = pHandle->tqReader; } - - if (pTSInfo->pdInfo.interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->pdInfo.interval, pInfo->twAggSup.waterMark); - } else { - pInfo->pUpdateInfo = NULL; - } - + + pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; pInfo->interval = pTSInfo->pdInfo.interval; @@ -1867,8 +1947,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->sessionSup = - (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; + pInfo->windowSup = + (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pStreamScanOp = pOperator; @@ -1877,6 +1957,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->assignBlockUid = pTableScanNode->assignBlockUid; + pInfo->partitionSup.needCalc = false; pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 152bd5939d..ab304b8d2d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -897,7 +897,7 @@ int64_t getWinReskey(void* data, int32_t index) { int32_t compareWinRes(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; - SWinKey* pos = taosArrayGetP(res, index); + SWinKey* pos = taosArrayGet(res, index); SResKeyPos* pData = (SResKeyPos*)pKey; if (*(int64_t*)pData->key == pos->ts) { if (pData->groupId > pos->groupId) { @@ -919,10 +919,11 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { } void* pIte = NULL; while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - SResKeyPos* pResKey = (SResKeyPos*)pIte; + SResKeyPos* pResKey = *(SResKeyPos**)pIte; int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) { taosArrayRemove(pDelWins, index); + delSize = taosArrayGetSize(pDelWins); } } } @@ -1423,7 +1424,7 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) return true; } -void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pUpWins, SInterval* pInterval) { +void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsStarts = (TSKEY*)pStartCol->pData; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -1433,9 +1434,12 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); - if (pUpWins) { - SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]}; - taosArrayPush(pUpWins, &winRes); + SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]}; + if (pDelWins) { + taosArrayPush(pDelWins, &winRes); + } + if (pUpdatedMap) { + taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); } } } @@ -1446,19 +1450,14 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData; - uint64_t* pGpDatas = NULL; - if (pBlock->info.type == STREAM_RETRIEVE) { - SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); - pGpDatas = (uint64_t*)pGpCol->pData; - } - int32_t step = 0; - int32_t startPos = 0; + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); while (win.ekey <= endTsCols[i]) { - uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; + uint64_t winGpId = pGpDatas[i]; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); if (pUpWins && res) { SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; @@ -1571,13 +1570,10 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo return; } blockDataEnsureCapacity(pBlock, size - *index); - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t uid = 0; for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); - colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); - colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false); - pBlock->info.rows++; + appendOneRow(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId); (*index)++; } } @@ -1596,6 +1592,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "single interval"); return pInfo->pDelRes; } @@ -1632,7 +1629,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } if (pBlock->info.type == STREAM_DELETE_DATA) { - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval); + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); @@ -1707,6 +1704,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { taosHashCleanup(pUpdatedMap); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "single interval"); return pInfo->pDelRes; } @@ -1828,14 +1826,16 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) { +void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, int64_t waterMark) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - // Todo(liuyao) support partition by column + initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, waterMark); return; } SStreamScanInfo* pScanInfo = downstream->info; - pScanInfo->sessionSup.parentType = type; - pScanInfo->sessionSup.pIntervalAggSup = pSup; + pScanInfo->windowSup.parentType = type; + pScanInfo->windowSup.pIntervalAggSup = pSup; + pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, waterMark); + pScanInfo->interval = *pInterval; } void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { @@ -1921,7 +1921,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) { - initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup); + initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark); } code = appendDownstream(pOperator, &downstream, 1); @@ -2849,14 +2849,26 @@ _error: } void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, - SExecTaskInfo* pTaskInfo) { + SExecTaskInfo* pTaskInfo, SColumnInfoData* pTimeWindowData) { for (int32_t k = 0; k < numOfOutput; ++k) { if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) { - continue; - } - int32_t code = TSDB_CODE_SUCCESS; - if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { - code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); + if (!pTimeWindowData) { + continue; + } + + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]); + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + SColumnInfoData idata = {0}; + idata.info.type = TSDB_DATA_TYPE_BIGINT; + idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + idata.pData = p; + + SScalarParam out = {.columnData = &idata}; + SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; + pDestCtx[k].sfp.process(&tw, 1, &out); + pEntryInfo->numOfRes = 1; + }else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { + int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); if (code != TSDB_CODE_SUCCESS) { qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); pTaskInfo->code = code; @@ -2874,8 +2886,14 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { return p1 != NULL; } +STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { + STimeWindow w = {.skey = ts, .ekey = INT64_MAX}; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + return w; +} + static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, - int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SArray* pUpdated) { + int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWinArray); if (!pInfo->pChildren) { return; @@ -2883,11 +2901,14 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr for (int32_t i = 0; i < size; i++) { SWinKey* pWinRes = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; - STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts + 1}; - setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, + STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); + if (isDeletedWindow(&parentWin, pWinRes->groupId, &pInfo->aggSup) && isCloseWindow(&parentWin, &pInfo->twAggSup)) { + continue; + } + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &parentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); - bool find = true; + int32_t num = 0; for (int32_t j = 0; j < numOfChildren; j++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); SIntervalAggOperatorInfo* pChInfo = pChildOp->info; @@ -2895,15 +2916,16 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr if (!hasIntervalWindow(&pChInfo->aggSup, pWinRes->ts, pWinRes->groupId)) { continue; } - find = true; + num++; SResultRow* pChResult = NULL; - setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, &ParentWin, true, &pChResult, pWinRes->groupId, + setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, &parentWin, true, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup, pTaskInfo); - compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); + compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); } - if (find && pUpdated) { - saveResultRow(pCurResult, pWinRes->groupId, pUpdated); + if (num > 1 && pUpdatedMap) { + saveWinResultRow(pCurResult, pWinRes->groupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur); } } @@ -2934,12 +2956,6 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } -STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { - STimeWindow w = {.skey = ts, .ekey = INT64_MAX}; - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - return w; -} - static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, SHashObj* pUpdatedMap) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; @@ -3126,6 +3142,25 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { } } +static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pInfo) { + int32_t size = taosArrayGetSize(wins); + for (int32_t i = 0; i < size; i++) { + SWinKey* winKey = taosArrayGet(wins, i); + STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); + if (isCloseWindow(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { + void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); + if (!chIds) { + SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; + // add pull data request + savePullWindow(&pull, pInfo->pPullWins); + int32_t size = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, winKey, size); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size); + } + } + } +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; @@ -3150,12 +3185,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pPullDataRes; } + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + return pInfo->pDelRes; + } + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; if (!IS_FINAL_OP(pInfo)) { // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); + qDebug("===stream===clear semi operator"); } else { freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); } @@ -3219,23 +3262,28 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } removeResults(pUpWins, pUpdatedMap); copyDataBlock(pInfo->pUpdateRes, pBlock); - // copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); pInfo->returnUpdate = true; taosArrayDestroy(pUpWins); break; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval); + SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; - doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval); - rebuildIntervalWindow(pInfo, pSup, pInfo->pDelWins, pInfo->binfo.pRes->info.groupId, - pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated); + doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL); + rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, + pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdatedMap); + addRetriveWindow(delWins, pInfo); + taosArrayAddAll(pInfo->pDelWins, delWins); + taosArrayDestroy(delWins); continue; } - removeResults(pInfo->pDelWins, pUpdatedMap); + removeResults(delWins, pUpdatedMap); + taosArrayAddAll(pInfo->pDelWins, delWins); + taosArrayDestroy(delWins); break; } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); @@ -3309,6 +3357,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pPullDataRes; } + // we should send result first. doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); @@ -3470,7 +3519,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup); + initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark); } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -3567,10 +3616,18 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num } void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark, - uint16_t type) { - ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + uint16_t type, int32_t tsColIndex) { + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { + SStreamPartitionOperatorInfo* pScanInfo = downstream->info; + pScanInfo->tsColIndex = tsColIndex; + } + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + initDownStream(downstream->pDownstream[0], pAggSup, gap, waterMark, type, tsColIndex); + return; + } SStreamScanInfo* pScanInfo = downstream->info; - pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type}; + pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type}; pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); } @@ -3642,7 +3699,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; - pInfo->returnDelete = false; pOperator->name = "StreamSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; @@ -3653,7 +3709,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType); + initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); code = appendDownstream(pOperator, &downstream, 1); } return pOperator; @@ -3683,13 +3739,13 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) { return isInTimeWindow(&pWinInfo->win, ts, gap); } -static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts, int32_t index) { - SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false}; +static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs, int32_t index) { + SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false}; return taosArrayInsert(pWinInfos, index, &win); } -static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) { - SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false}; +static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs) { + SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false}; return taosArrayPush(pWinInfos, &win); } @@ -3748,7 +3804,7 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star int32_t size = taosArrayGetSize(pWinInfos); if (size == 0) { *pIndex = 0; - return addNewSessionWindow(pWinInfos, startTs); + return addNewSessionWindow(pWinInfos, startTs, endTs); } // find the first position which is smaller than the key int32_t index = binarySearch(pWinInfos, size, startTs, TSDB_ORDER_DESC, getSessionWindowEndkey); @@ -3774,10 +3830,10 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star if (index == size - 1) { *pIndex = taosArrayGetSize(pWinInfos); - return addNewSessionWindow(pWinInfos, startTs); + return addNewSessionWindow(pWinInfos, startTs, endTs); } *pIndex = index + 1; - return insertNewSessionWindow(pWinInfos, startTs, index + 1); + return insertNewSessionWindow(pWinInfos, startTs, endTs, index + 1); } int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId, @@ -3789,7 +3845,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS if (pWinInfo->win.skey > pStartTs[i]) { if (pStDeleted && pWinInfo->isOutput) { SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); + taosHashPut(pStDeleted, &res, sizeof(SWinKey), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; } pWinInfo->win.skey = pStartTs[i]; @@ -3904,11 +3960,12 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->streamAggSup, pTaskInfo); pCurWin->win.ekey = TMAX(pCurWin->win.ekey, pWinInfo->win.ekey); - compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->win, true); + compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition)); if (pWinInfo->isOutput) { SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); + taosHashPut(pStDeleted, &res, sizeof(SWinKey), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; } taosArrayRemove(pInfo->streamAggSup.pCurWins, i); @@ -4005,10 +4062,11 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc if (!pCurWin) { break; } + SResultWindowInfo delWin = *pCurWin; deleteWindow(pAggSup->pCurWins, winIndex, fp); if (result) { - pCurWin->groupId = gpDatas[i]; - taosArrayPush(result, pCurWin); + delWin.groupId = gpDatas[i]; + taosArrayPush(result, &delWin); } } } @@ -4033,6 +4091,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, ASSERT(isInWindow(pCurWin, tsCols[i], gap)); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); if (result) { + pCurWin->groupId = gpCols[i]; taosArrayPush(result, pCurWin); } } @@ -4067,10 +4126,18 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { SWinKey* res = *Ite; - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->ts, false); + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)&res->ts, false); + SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + colDataAppendNULL(pUidCol, pBlock->info.rows); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); colDataAppend(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false); + SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + colDataAppendNULL(pCalStCol, pBlock->info.rows); + SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + colDataAppendNULL(pCalEdCol, pBlock->info.rows); pBlock->info.rows += 1; if (pBlock->info.rows + 1 >= pBlock->info.capacity) { break; @@ -4081,8 +4148,8 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It } } -static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, int32_t groupId, - int32_t numOfOutput, SOperatorInfo* pOperator) { +static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, + int32_t numOfOutput, SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4092,9 +4159,15 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; + uint64_t groupId = pParentWin->groupId; + int32_t winIndex = 0; + if (needCreate) { + pParentWin = getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex); + } setWindowOutputBuf(pParentWin, &pCurResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->streamAggSup, pTaskInfo); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + int32_t num = 0; for (int32_t j = 0; j < numOfChildren; j++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; @@ -4110,15 +4183,24 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin SResultRow* pChResult = NULL; setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); - compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pChWin->win, true); + compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage); + num++; continue; } else if (!pChWin->isClosed) { break; } } } + if (num == 0 && needCreate) { + deleteWindow(pInfo->streamAggSup.pCurWins, winIndex, NULL); + } + if (pStUpdated && num > 0) { + SWinKey value = {.ts = pParentWin->win.skey, .groupId = groupId}; + taosHashPut(pStUpdated, &pParentWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); + } SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId); ASSERT(size > 0); setBufPageDirty(bufPage, true); @@ -4198,7 +4280,46 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); SWinKey res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); + taosHashPut(pStDeleted, &res, sizeof(SWinKey), &res, sizeof(SWinKey)); + } +} + +static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + SResultWindowInfo* pWin = taosArrayGet(pWins, i); + taosHashRemove(pHashMap, &pWin->pos, sizeof(SResultRowPosition)); + } +} + +int32_t compareWinKey(void* pKey, void* data, int32_t index) { + SArray* res = (SArray*)data; + SResKeyPos* pos = taosArrayGetP(res, index); + SWinKey* pData = (SWinKey*)pKey; + if (pData->ts == *(int64_t*)pos->key) { + if (pData->groupId > pos->groupId) { + return 1; + } else if (pData->groupId < pos->groupId) { + return -1; + } + return 0; + } else if (pData->ts > *(int64_t*)pos->key) { + return 1; + } + return -1; +} + +static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) { + int32_t size = taosHashGetSize(pStDeleted); + if (size == 0) { + return; + } + + int32_t num = taosArrayGetSize(update); + for (int32_t i = 0; i < num; i++) { + SResKeyPos* pos = taosArrayGetP(update, i); + SWinKey winKey = {.ts = *(int64_t*)pos->key, .groupId = pos->groupId}; + taosHashRemove(pStDeleted, &winKey, sizeof(SWinKey)); } } @@ -4226,7 +4347,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); + SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); // SResKeyPos while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -4242,9 +4363,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; - doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, - pChildOp->exprSupp.numOfExprs, 0, NULL); - rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, + 0, NULL); + rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, NULL, false); } taosArrayDestroy(pWins); continue; @@ -4258,9 +4379,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; // gap must be 0 doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL, NULL); - rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); + rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated, true); } copyDeleteWindowInfo(pWins, pInfo->pStDeleted); + removeSessionResults(pStUpdated, pWins); taosArrayDestroy(pWins); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { @@ -4303,6 +4425,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { pInfo->ignoreExpiredData, NULL); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData, NULL); copyUpdateResult(pStUpdated, pUpdated); + removeSessionDeleteResults(pUpdated, pInfo->pStDeleted); taosHashCleanup(pStUpdated); finalizeUpdatedResult(pSup->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); @@ -4333,14 +4456,6 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { pInfo->streamAggSup.currentPageId = -1; } -static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { - int32_t size = taosArrayGetSize(pWins); - for (int32_t i = 0; i < size; i++) { - SResultWindowInfo* pWin = taosArrayGet(pWins, i); - taosHashRemove(pHashMap, &pWin->pos, sizeof(SResultRowPosition)); - } -} - static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; @@ -4349,30 +4464,34 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; - } else if (pOperator->status == OP_RES_TO_RETURN) { + } + + { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, "semi session"); return pBInfo->pRes; } - // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { - pInfo->returnDelete = true; + doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "semi session"); return pInfo->pDelRes; } - if (pInfo->pUpdateRes->info.rows > 0) { + if (pInfo->pUpdateRes->info.rows > 0 && pInfo->returnUpdate) { + pInfo->returnUpdate = false; // process the rest of the data - pOperator->status = OP_OPENED; printDataBlock(pInfo->pUpdateRes, "semi session"); return pInfo->pUpdateRes; } - // semi interval operator clear disk buffer - clearStreamSessionOperator(pInfo); - pOperator->status = OP_EXEC_DONE; - return NULL; + + if (pOperator->status == OP_RES_TO_RETURN) { + // semi interval operator clear disk buffer + clearStreamSessionOperator(pInfo); + pOperator->status = OP_EXEC_DONE; + return NULL; + } } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -4383,6 +4502,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { clearSpecialDataBlock(pInfo->pUpdateRes); + pOperator->status = OP_RES_TO_RETURN; break; } printDataBlock(pBlock, "semi session recv"); @@ -4393,12 +4513,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { removeSessionResults(pStUpdated, pWins); taosArrayDestroy(pWins); copyDataBlock(pInfo->pUpdateRes, pBlock); + pInfo->returnUpdate = true; break; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { // gap must be 0 - doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, NULL, NULL); - copyDataBlock(pInfo->pDelRes, pBlock); - pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); + doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, NULL); + copyDeleteWindowInfo(pWins, pInfo->pStDeleted); + removeSessionResults(pStUpdated, pWins); + taosArrayDestroy(pWins); break; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession); @@ -4411,18 +4534,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, false); + doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false); maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs; - // restore the value - pOperator->status = OP_RES_TO_RETURN; - // semi operator - // closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, - // getResWinForSession); + copyUpdateResult(pStUpdated, pUpdated); + removeSessionDeleteResults(pUpdated, pInfo->pStDeleted); taosHashCleanup(pStUpdated); finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, @@ -4436,16 +4556,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { - pInfo->returnDelete = true; + doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "semi session"); return pInfo->pDelRes; } - if (pInfo->pUpdateRes->info.rows > 0) { + if (pInfo->pUpdateRes->info.rows > 0 && pInfo->returnUpdate) { + pInfo->returnUpdate = false; // process the rest of the data - pOperator->status = OP_OPENED; printDataBlock(pInfo->pUpdateRes, "semi session"); return pInfo->pUpdateRes; } @@ -4669,7 +4788,7 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u if (pWinInfo->winInfo.win.skey > pTs[i]) { if (pSeDeleted && pWinInfo->winInfo.isOutput) { SWinKey res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; - taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); + taosHashPut(pSeDeleted, &res, sizeof(SWinKey), &res, sizeof(SWinKey)); pWinInfo->winInfo.isOutput = false; } pWinInfo->winInfo.win.skey = pTs[i]; @@ -4737,8 +4856,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, pSDataBlock->info.rows, i, &allEqual, pStDeleted); if (!allEqual) { - appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, GROUPID_COLUMN_INDEX, - &groupId); + uint64_t uid = 0; + appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, + &uid, &groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -4767,6 +4887,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamStateAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; + int64_t maxTs = INT64_MIN; if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { @@ -4799,6 +4920,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, destroyStateWinInfo); copyDeleteWindowInfo(pWins, pInfo->pSeDeleted); + removeSessionResults(pSeUpdated, pWins); taosArrayDestroy(pWins); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { @@ -4813,8 +4935,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + maxTs = TMAX(maxTs, pBlock->info.window.ekey); } + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); // restore the value pOperator->status = OP_RES_TO_RETURN; @@ -4913,7 +5036,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType); + initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 83bccbffb4..eb0b604d37 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -777,6 +777,7 @@ SNode* nodesCloneNode(const SNode* pNode) { code = physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst); break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: code = physiPartitionCopy((const SPartitionPhysiNode*)pNode, (SPartitionPhysiNode*)pDst); break; default: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 822bdec365..8976daadbd 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -265,6 +265,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiStreamStateWindow"; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return "PhysiPartition"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: + return "PhysiStreamPartition"; case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: return "PhysiIndefRowsFunc"; case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: @@ -4485,6 +4487,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: return physiStateWindowNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: return physiPartitionNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: return physiIndefRowsFuncNodeToJson(pObj, pJson); @@ -4632,6 +4635,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: return jsonToPhysiStateWindowNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: return jsonToPhysiPartitionNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: return jsonToPhysiIndefRowsFuncNode(pJson, pObj); diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 2e23998aad..728e173ff8 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -537,7 +537,8 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk } break; } - case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: { SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)pNode; res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index d13057a93e..61b2ad954f 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -322,6 +322,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SStreamStateWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return makeNode(type, sizeof(SPartitionPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: + return makeNode(type, sizeof(SStreamPartitionPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: return makeNode(type, sizeof(SIndefRowsFuncPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: @@ -951,7 +953,8 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pStateKey); break; } - case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: { SPartitionPhysiNode* pPhyNode = (SPartitionPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyList(pPhyNode->pExprs); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index cafae18dbe..0cbb833a4d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1324,7 +1324,8 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) { SPartitionPhysiNode* pPart = - (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION); + (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, + pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION); if (NULL == pPart) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index d053662bd3..332f7ad2fd 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -170,8 +170,17 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { - return tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY)); + res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY)); + if (res == TSDB_CODE_SUCCESS) { + return false; + } else { + qDebug("===stream===Update close window sbf. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + maxTs, *pMapMaxTs, ts); + return true; + } } + qDebug("===stream===Update close window. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + maxTs, *pMapMaxTs, ts); return true; } @@ -193,7 +202,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { } if (ts < pInfo->minTS) { - qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + qDebug("===stream===Update min ts. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, maxTs, *pMapMaxTs, ts); return true; } else if (res == TSDB_CODE_SUCCESS) { diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 5392979c0a..d9777d5133 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -462,10 +462,10 @@ if $data25 != 3 then return -1 endi -sql create database test2 vgroups 1 -sql select * from information_schema.ins_databases +sql create database test2 vgroups 1; +sql select * from information_schema.ins_databases; -sql use test2 +sql use test2; sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); diff --git a/tests/script/tsim/stream/partitionbyColumn0.sim b/tests/script/tsim/stream/partitionbyColumn0.sim new file mode 100644 index 0000000000..d91d4b7bf0 --- /dev/null +++ b/tests/script/tsim/stream/partitionbyColumn0.sim @@ -0,0 +1,570 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql drop stream if exists streams0; +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop stream if exists streams4; +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3, _group_key(a) c4 from t1 partition by a interval(10s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop0: +sleep 100 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop0 +endi + + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 + +loop1: +sleep 100 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); + +$loop_count = 0 + +loop2: +sleep 100 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop2 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); +sql insert into t1 values(1648791213002,1,2,3,1.0); + +$loop_count = 0 + +loop3: +sleep 100 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop3 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop3 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop3 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); + +$loop_count = 0 + +loop4: +sleep 100 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop4 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop4 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop4 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop4 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop4 +endi + +if $data22 != 1 then + print =====data22=$data22 + goto loop4 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop4 +endi + +if $data32 != 2 then + print =====data32=$data32 + goto loop4 +endi + +if $data41 != 1 then + print =====data41=$data41 + goto loop4 +endi + +if $data42 != 3 then + print =====data42=$data42 + goto loop4 +endi + +sql drop stream if exists streams1; +sql drop database if exists test1; +sql create database test1 vgroups 1; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once into streamt1 as select _wstart c1, count(*) c2, max(c) c3, _group_key(a+b) c4 from t1 partition by a+b interval(10s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,1,2,1,1.0); +sql insert into t1 values(1648791213001,2,1,2,2.0); +sql insert into t1 values(1648791213001,1,2,3,2.0); + +$loop_count = 0 + +loop5: +sleep 100 +sql select * from streamt1 order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +sql insert into t1 values(1648791223000,1,2,4,2.0); +sql insert into t1 values(1648791223001,1,2,5,2.0); +sql insert into t1 values(1648791223002,1,2,5,2.0); +sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0); + +$loop_count = 0 + +loop6: +sleep 100 +sql select * from streamt1 order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop6 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop6 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop6 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop6 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop6 +endi + +if $data22 != 7 then + print =====data22=$data22 + goto loop6 +endi + +if $data31 != 2 then + print =====data31=$data31 + goto loop6 +endi + +if $data32 != 5 then + print =====data32=$data32 + goto loop6 +endi + +sql drop stream if exists streams2; +sql drop database if exists test2; +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop7: +sleep 100 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop7 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop7 +endi + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 + +loop8: +sleep 100 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); + +$loop_count = 0 + +loop9: +sleep 100 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop9 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop9 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); +sql insert into t1 values(1648791213002,1,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213001,2,2,3,1.0); +sql insert into t2 values(1648791213002,2,2,3,1.0); +sql insert into t2 values(1648791213002,1,2,3,1.0); + +$loop_count = 0 + +loop10: +sleep 100 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop10 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop10 +endi + +if $data11 != 4 thenloop4 + print =====data11=$data11 + goto loop10 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop10 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); +sql insert into t2 values(1648791223000,1,2,3,1.0); +sql insert into t2 values(1648791223001,1,2,3,1.0); +sql insert into t2 values(1648791223002,3,2,3,1.0); +sql insert into t2 values(1648791223003,3,2,3,1.0); +sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); + +$loop_count = 0 + +loop11: +sleep 100 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop11 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop11 +endi + +if $data11 != 4 then + print =====data11=$data11 + goto loop11 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop11 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop11 +endi + +if $data22 != 2 then + print =====data22=$data22 + goto loop11 +endi + +if $data31 != 2 then + print =====data31=$data31 + goto loop11 +endi + +if $data32 != 3 then + print =====data32=$data32 + goto loop11 +endi + +if $data41 != 4 then + print =====data41=$data41 + goto loop11 +endi + +if $data42 != 1 then + print =====data42=$data42 + goto loop11 +endi + +sql drop stream if exists streams4; +sql drop database if exists test4; +sql create database test4 vgroups 4; +sql use test4; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); +sql create table t4 using st tags(2,2,2); +sql create stream streams4 trigger at_once into test.streamt4 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s); + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); +sql insert into t3 values(1648791213000,2,2,3,1.0); +sql insert into t4 values(1648791213000,2,2,3,1.0); +sql insert into t4 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 + +loop13: +sleep 100 +sql select * from test.streamt4 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop13 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop13 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop13 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop13 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop13 +endi + +sql insert into t4 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791233000,2,2,3,1.0); + + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 + +loop14: +sleep 100 +sql select * from test.streamt4 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 3 then + print =====rows=$rows + goto loop14 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop14 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop14 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop14 +endi + +$loop_all = $loop_all + 1 +print ============loop_all=$loop_all + +system sh/stop_dnodes.sh + +#goto looptest \ No newline at end of file diff --git a/tests/script/tsim/stream/partitionbyColumn1.sim b/tests/script/tsim/stream/partitionbyColumn1.sim new file mode 100644 index 0000000000..7f5c53ebe3 --- /dev/null +++ b/tests/script/tsim/stream/partitionbyColumn1.sim @@ -0,0 +1,546 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql drop stream if exists streams0; +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop stream if exists streams4; +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3, _group_key(a) c4 from t1 partition by a session(ts, 5s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop0: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop0 +endi + + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +loop1: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); + +loop2: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop2 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); +sql insert into t1 values(1648791213002,1,2,3,1.0); + +loop3: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop3 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop3 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop3 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); + +loop4: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop4 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop4 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop4 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop4 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop4 +endi + +if $data22 != 1 then + print =====data22=$data22 + goto loop4 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop4 +endi + +if $data32 != 2 then + print =====data32=$data32 + goto loop4 +endi + +if $data41 != 1 then + print =====data41=$data41 + goto loop4 +endi + +if $data42 != 3 then + print =====data42=$data42 + goto loop4 +endi + +sql drop database if exists test1; +sql create database test1 vgroups 1; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once into streamt1 as select _wstart c1, count(*) c2, max(c) c3, _group_key(a+b) c4 from t1 partition by a+b session(ts, 5s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,1,2,1,1.0); +sql insert into t1 values(1648791213001,2,1,2,2.0); +sql insert into t1 values(1648791213001,1,2,3,2.0); + +$loop_count = 0 + +loop5: +sleep 300 +sql select * from streamt1 order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +sql insert into t1 values(1648791223000,1,2,4,2.0); +sql insert into t1 values(1648791223001,1,2,5,2.0); +sql insert into t1 values(1648791223002,1,2,5,2.0); +sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0); + +loop6: +sleep 300 +sql select * from streamt1 order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop6 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop6 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop6 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop6 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop6 +endi + +if $data22 != 5 then + print =====data22=$data22 + goto loop6 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop6 +endi + +if $data32 != 7 then + print =====data32=$data32 + goto loop6 +endi + +sql drop database if exists test2; +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a session(ts, 5s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop7: +sleep 300 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop7 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop7 +endi + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791213000,1,2,3,1.0); + +loop8: +sleep 300 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); +loop9: +sleep 300 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop9 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop9 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); +sql insert into t1 values(1648791213002,1,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213001,2,2,3,1.0); +sql insert into t2 values(1648791213002,2,2,3,1.0); +sql insert into t2 values(1648791213002,1,2,3,1.0); + +loop10: +sleep 300 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop10 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop10 +endi + +if $data11 != 2 thenloop4 + print =====data11=$data11 + goto loop3 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop10 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); +sql insert into t2 values(1648791223000,1,2,3,1.0); +sql insert into t2 values(1648791223001,1,2,3,1.0); +sql insert into t2 values(1648791223002,3,2,3,1.0); +sql insert into t2 values(1648791223003,3,2,3,1.0); +sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); + +loop11: +sleep 300 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop11 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop11 +endi + +if $data11 != 4 then + print =====data11=$data11 + goto loop11 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop11 +endi + +if $data21 != 4 then + print =====data21=$data21 + goto loop11 +endi + +if $data22 != 1 then + print =====data22=$data22 + goto loop11 +endi + +if $data31 != 2 then + print =====data31=$data31 + goto loop11 +endi + +if $data32 != 2 then + print =====data32=$data32 + goto loop11 +endi + +if $data41 != 2 then + print =====data41=$data41 + goto loop11 +endi + +if $data42 != 3 then + print =====data42=$data42 + goto loop11 +endi + +sql drop database if exists test4; +sql create database test4 vgroups 4; +sql use test4; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); +sql create table t4 using st tags(2,2,2); +sql create stream streams4 trigger at_once into test.streamt4 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a session(ts, 5s); + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); +sql insert into t3 values(1648791213000,2,2,3,1.0); +sql insert into t4 values(1648791213000,2,2,3,1.0); +sql insert into t4 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 + +loop13: +sleep 300 +sql select * from test.streamt4 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop14 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop13 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop13 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop11 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop11 +endi + +sql insert into t4 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791233000,2,2,3,1.0); + + +sql insert into t1 values(1648791213000,1,2,3,1.0); + +loop14: +sleep 300 +sql select * from test.streamt4 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print =====rows=$rows + goto loop14 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop14 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop14 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop14 +endi + +system sh/stop_dnodes.sh + +$loop_all = $loop_all + 1 +print ============loop_all=$loop_all + +#goto looptest \ No newline at end of file diff --git a/tests/script/tsim/stream/partitionbyColumn2.sim b/tests/script/tsim/stream/partitionbyColumn2.sim new file mode 100644 index 0000000000..3d9acbcac5 --- /dev/null +++ b/tests/script/tsim/stream/partitionbyColumn2.sim @@ -0,0 +1,269 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3, _group_key(a) c4 from t1 partition by a state_window(b); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop0: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop0 +endi + + +sql insert into t1 values(1648791213000,1,1,3,1.0); + +loop1: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791213000,2,1,3,1.0); + +loop2: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop2 +endi + +sql insert into t1 values(1648791213000,2,1,3,1.0); +sql insert into t1 values(1648791213001,2,1,3,1.0); +sql insert into t1 values(1648791213002,2,1,3,1.0); +sql insert into t1 values(1648791213002,1,1,3,1.0); + +loop3: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop3 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop3 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop3 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t1 values(1648791213001,1,1,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); + +loop4: +sleep 300 +sql select * from streamt order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop4 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop4 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop4 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop4 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop4 +endi + +if $data22 != 1 then + print =====data22=$data22 + goto loop4 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop4 +endi + +if $data32 != 2 then + print =====data32=$data32 + goto loop4 +endi + +if $data41 != 1 then + print =====data41=$data41 + goto loop4 +endi + +if $data42 != 3 then + print =====data42=$data42 + goto loop4 +endi + +sql drop database if exists test1; +sql create database test1 vgroups 1; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d int); +sql create stream streams1 trigger at_once into streamt1 as select _wstart c1, count(*) c2, max(d) c3, _group_key(a+b) c4 from t1 partition by a+b state_window(c); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t1 values(1648791213000,1,2,1,1); +sql insert into t1 values(1648791213001,2,1,1,2); +sql insert into t1 values(1648791213001,1,2,1,3); + +$loop_count = 0 + +loop5: +sleep 300 +sql select * from streamt1 order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +sql insert into t1 values(1648791223000,1,2,2,4); +sql insert into t1 values(1648791223001,1,2,2,5); +sql insert into t1 values(1648791223002,1,2,2,6); +sql insert into t1 values(1648791213001,1,1,1,7) (1648791223002,1,1,2,8); + +loop6: +sleep 300 +sql select * from streamt1 order by c1, c4, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop6 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop6 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop6 +endi + +if $data12 != 7 then + print =====data12=$data12 + goto loop6 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop6 +endi + +if $data22 != 5 then + print =====data22=$data22 + goto loop6 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop6 +endi + +if $data32 != 8 then + print =====data32=$data32 + goto loop6 +endi + +system sh/stop_dnodes.sh + +$loop_all = $loop_all + 1 +print ============loop_all=$loop_all + +#goto looptest