From aa9962318f6b37a6dc254603ef5c25902bcfeec8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 26 Mar 2024 16:38:24 +0800 Subject: [PATCH] read preversion data of primary key --- include/common/tcommon.h | 1 + source/libs/executor/inc/executorInt.h | 5 +- source/libs/executor/src/scanoperator.c | 142 ++++++++++++++---- .../executor/src/streamcountwindowoperator.c | 2 +- .../executor/src/streameventwindowoperator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 6 +- source/libs/nodes/src/nodesCodeFuncs.c | 7 + 7 files changed, 131 insertions(+), 34 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index d68ba1534a..c84185c120 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -392,6 +392,7 @@ typedef struct STUidTagInfo { #define CALCULATE_START_TS_COLUMN_INDEX 4 #define CALCULATE_END_TS_COLUMN_INDEX 5 #define TABLE_NAME_COLUMN_INDEX 6 +#define PRIMARY_KEY_COLUMN_INDEX 7 // stream create table block column #define UD_TABLE_NAME_COLUMN_INDEX 0 diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c896869a24..73d1e08f37 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -453,6 +453,7 @@ typedef struct SStreamScanInfo { SExprSupp tbnameCalSup; SExprSupp tagCalSup; int32_t primaryTsIndex; // primary time stamp slot id + int32_t primaryKeyIndex; SReadHandle readHandle; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SColMatchInfo matchInfo; @@ -873,10 +874,8 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore); -void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, +void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); -void appendAllColumnToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs, - TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a51e627272..0ab291d25b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1372,13 +1372,51 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU return pBlock->info.rows > 0 ? pBlock : NULL; } -static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { +bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) { + void* pData = colDataGetData(pCol, rowId); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + int32_t colLen = varDataLen(pData); + int32_t keyLen = varDataLen(pVal); + if (pCol->info.type == TSDB_DATA_TYPE_JSON) { + colLen = getJsonValueLen(pData); + keyLen = getJsonValueLen(pVal); + } + + if (colLen == keyLen && memcmp(pData, pVal, colLen) == 0) { + return true; + } + } else { + if (memcmp(pData, pVal, pCol->info.bytes) == 0) { + return true; + } + } + return false; +} + +bool hasPrimaryKey(SStreamScanInfo* pInfo) { + return pInfo->primaryKeyIndex != -1; +} + +static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) { SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion); if (!pPreRes || pPreRes->info.rows == 0) { return 0; } - ASSERT(pPreRes->info.rows == 1); - return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0); + + int32_t rowId = 0; + if (hasPrimaryKey(pInfo)) { + SColumnInfoData* pPkCol = taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex); + for (; rowId < pPreRes->info.rows; rowId++) { + if (comparePrimaryKey(pPkCol, rowId, pVal)) { + break; + } + } + } + if (rowId >= pPreRes->info.rows) { + qInfo("===stream===read preversion data of primary key failed. ts:%" PRId64 ",version:%" PRId64); + return 0; + } + return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, rowId); } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { @@ -1386,9 +1424,9 @@ static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { return tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, uid); } -static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { +static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) { if (pInfo->partitionSup.needCalc) { - return getGroupIdByCol(pInfo, uid, ts, maxVersion); + return getGroupIdByCol(pInfo, uid, ts, maxVersion, pVal); } return getGroupIdByUid(pInfo, uid); @@ -1573,6 +1611,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr TSKEY* endData = (TSKEY*)pEndTsCol->pData; SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); uint64_t* uidCol = (uint64_t*)pUidCol->pData; + SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1582,7 +1621,11 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); int64_t ver = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { - uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver); + void* pVal = NULL; + if (hasPrimaryKey(pInfo)) { + pVal = colDataGetData(pSrcPkCol, i); + } + uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); // gap must be 0. SSessionKey startWin = {0}; getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin); @@ -1628,6 +1671,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB TSKEY* endData = (TSKEY*)pEndTsCol->pData; SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); uint64_t* uidCol = (uint64_t*)pUidCol->pData; + SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1637,7 +1681,11 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); int64_t ver = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { - uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver); + void* pVal = NULL; + if (hasPrimaryKey(pInfo)) { + pVal = colDataGetData(pSrcPkCol, i); + } + uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); SSessionKey startWin = {.win.skey = startData[i], .win.ekey = endData[i], .groupId = groupId}; SSessionKey range = {0}; getCountWinRange(pInfo->windowSup.pStreamAggSup, &startWin, mode, &range); @@ -1664,6 +1712,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); @@ -1688,7 +1737,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS for (int32_t i = 0; i < rows; i++) { uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i); - appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid, + appendDataToSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid, &groupId, NULL); } printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo)); @@ -1713,7 +1762,11 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; if (groupId == 0) { - groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver); + void* pVal = NULL; + if (hasPrimaryKey(pInfo)) { + pVal = colDataGetData(pSrcPkCol, i); + } + groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal); } TSKEY calStartTs = srcStartTsCol[i]; colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false); @@ -1758,6 +1811,8 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; + SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1767,12 +1822,17 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS uint64_t groupId = srcGp[i]; char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0}; if (groupId == 0) { - groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver); + void* pVal = NULL; + if (hasPrimaryKey(pInfo) && pSrcPkCol) { + pVal = colDataGetData(pSrcPkCol, i); + } + groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal); } if (pInfo->tbnameCalSup.pExprInfo) { void* parTbname = NULL; code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); if (code != TSDB_CODE_SUCCESS) { + // todo(liuyao) 这里可能需要修改,需要考虑复合主键时,pPreRes包含多行数据。 SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver); printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo)); calBlockTbName(pInfo, pPreRes); @@ -1783,7 +1843,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS varDataSetLen(tbname, strlen(varDataVal(tbname))); pInfo->stateStore.streamStateFreeVal(parTbname); } - appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, + appendDataToSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, tbname[0] == 0 ? NULL : tbname); } return TSDB_CODE_SUCCESS; @@ -1807,13 +1867,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, - uint64_t* pGp, void* pTbName) { - appendAllColumnToStreamSpecialBlock(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName); -} - -void appendAllColumnToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs, - TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName) { +void appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs, + TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName, void* pPkData) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); @@ -1828,9 +1883,24 @@ void appendAllColumnToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, T colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false); colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pCalEndTs, false); colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL); + if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { + SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + colDataSetVal(pPkCol, pBlock->info.rows, (const char*)pPkData, pPkData == NULL); + } pBlock->info.rows++; } +void appendPkToSpecialBlock(SSDataBlock* pBlock, TSKEY* pTsArray, SColumnInfoData* pPkCol, int32_t rowId, + uint64_t* pUid, uint64_t* pGp, void* pTbName) { + appendOneRowToSpecialBlockImpl(pBlock, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pUid, + pGp, pTbName, colDataGetData(pPkCol, rowId)); +} + +void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, + void* pTbName) { + appendOneRowToSpecialBlockImpl(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName, NULL); +} + bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts) { bool isExpired = false; bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts); @@ -1848,6 +1918,11 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; + SColumnInfoData* pPkColDataInfo = NULL; + if (hasPrimaryKey(pInfo)) { + pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryKeyIndex); + } + bool tableInserted = pInfo->stateStore.updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid); for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; @@ -1865,17 +1940,15 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } // must check update info first. bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); - bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && + bool isDeleted = isClosed && isSignleIntervalWindow(pInfo) && isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore); - if ((update || closedWin) && out) { - qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); + if ((update || isDeleted) && out) { + qDebug("stream update check not pass, update %d, deleted Win %d", update, isDeleted); uint64_t gpId = 0; - appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, - NULL); - if (closedWin && pInfo->partitionSup.needCalc) { + appendPkToSpecialBlock(pInfo->pUpdateDataRes, tsCol, pPkColDataInfo, rowId, &pBlock->info.id.uid, &gpId, NULL); + if (isDeleted && pInfo->partitionSup.needCalc) { gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); - appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, - &gpId, NULL); + appendPkToSpecialBlock(pInfo->pUpdateDataRes, tsCol, pPkColDataInfo, rowId, &pBlock->info.id.uid, &gpId, NULL); } } } @@ -2809,6 +2882,14 @@ void streamScanReloadState(SOperatorInfo* pOperator) { } } +void addPrimaryKeyCol(SSDataBlock* pBlock, uint8_t type, int32_t bytes) { + pBlock->info.rowSize += bytes; + SColumnInfoData infoData = {0}; + infoData.info.type = type; + infoData.info.bytes = bytes; + taosArrayPush(pBlock->pDataBlock, &infoData); +} + SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SArray* pColIds = NULL; @@ -2837,6 +2918,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } + SDataType pkType = {0}; + pInfo->primaryKeyIndex = -1; int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { @@ -2847,6 +2930,10 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { pInfo->primaryTsIndex = id->dstSlotId; } + if (id->isPk) { + pInfo->primaryKeyIndex = id->dstSlotId; + pkType = id->dataType; + } } if (pTableScanNode->pSubtable != NULL) { @@ -2964,6 +3051,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); + if (hasPrimaryKey(pInfo)) { + addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes); + } pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 8199fe141f..5b2db5bb4f 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -269,7 +269,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl range.win.skey = TMIN(startTsCols[i], range.win.skey); range.win.ekey = TMAX(startTsCols[rows-1], range.win.ekey); uint64_t uid = 0; - appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL); + appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL); break; } code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 6db5a60ed4..f93ca1a04c 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -326,7 +326,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ASSERT(winRows >= 1); if (rebuild) { uint64_t uid = 0; - appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, + appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey)); doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 075a58e932..ed67c69771 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -377,11 +377,11 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin void* tbname = NULL; pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); if (tbname == NULL) { - appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); + appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); } else { char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); - appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); + appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); } pInfo->stateStore.streamStateFreeVal(tbname); (*index)++; @@ -3514,7 +3514,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl pAggSup->pResultRows, pSeUpdated, pStDeleted); if (!allEqual) { uint64_t uid = 0; - appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, + appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey)); doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 36bcfdc43b..8c7184f378 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -514,6 +514,7 @@ static const char* jkSchemaType = "Type"; static const char* jkSchemaColId = "ColId"; static const char* jkSchemaBytes = "bytes"; static const char* jkSchemaName = "Name"; +static const char* jkSchemaFlags = "Flags"; static int32_t schemaToJson(const void* pObj, SJson* pJson) { const SSchema* pNode = (const SSchema*)pObj; @@ -528,6 +529,9 @@ static int32_t schemaToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddStringToObject(pJson, jkSchemaName, pNode->name); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSchemaFlags, pNode->flags); + } return code; } @@ -546,6 +550,9 @@ static int32_t jsonToSchema(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkSchemaFlags, pNode->flags, code); + } return code; }