From 3300bf04c11a57a9297c0e3c89e31612e8ed0acc Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 24 Apr 2024 17:27:24 +0800 Subject: [PATCH 1/4] opt bloom filter --- include/libs/executor/storageapi.h | 17 +- include/libs/stream/tstreamUpdate.h | 28 +--- include/util/thash.h | 7 + source/dnode/mnode/impl/src/mndStream.c | 4 +- source/libs/executor/inc/executorInt.h | 17 +- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 40 +++-- .../executor/src/streamcountwindowoperator.c | 4 +- .../executor/src/streameventwindowoperator.c | 4 +- .../executor/src/streamtimewindowoperator.c | 49 ++++-- source/libs/stream/src/streamUpdate.c | 146 ++++++++++++++---- source/util/src/thash.c | 5 + 12 files changed, 229 insertions(+), 94 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 0cefa0c476..bf7a2a869a 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -307,6 +307,13 @@ typedef struct SUpdateInfo { SScalableBf* pCloseWinSBF; SHashObj* pMap; uint64_t maxDataVersion; + int8_t pkColType; + int32_t pkColLen; + char* pKeyBuff; + char* pValueBuff; + + bool (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn); + __compar_fn_t comparePkCol; } SUpdateInfo; typedef struct { @@ -374,17 +381,17 @@ typedef struct SStateStore { void** ppVal, int32_t* pVLen); int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); - SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); - TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); - bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); + SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); + TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); + bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); - bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); + bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count); void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count); - SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp); + SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo); int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index af93c6ac01..632a82743f 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -25,28 +25,10 @@ extern "C" { #endif -typedef struct SUpdateKey { - int64_t tbUid; - TSKEY ts; -} SUpdateKey; - -//typedef struct SUpdateInfo { -// SArray *pTsBuckets; -// uint64_t numBuckets; -// SArray *pTsSBFs; -// uint64_t numSBFs; -// int64_t interval; -// int64_t watermark; -// TSKEY minTS; -// SScalableBf *pCloseWinSBF; -// SHashObj *pMap; -// uint64_t maxDataVersion; -//} SUpdateInfo; - -SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp); -SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp); -TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); -bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); +SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); +SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); +TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); +bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); @@ -55,7 +37,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *p int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count); void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count); -bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); +bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); #ifdef __cplusplus } diff --git a/include/util/thash.h b/include/util/thash.h index 08caad495d..5dec175a38 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -215,6 +215,13 @@ void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp); int64_t taosHashGetCompTimes(SHashObj *pHashObj); +/** + * Get the corresponding value length for a given data in hash table + * @param data + * @return + */ +int32_t taosHashGetValueSize(void *data); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 796553b5ad..d225c2e272 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -325,7 +325,7 @@ static int32_t createSchemaByFields(const SArray* pFields, SSchemaWrapper* pWrap return TSDB_CODE_SUCCESS; } -static bool hasPrimaryKey(SSchemaWrapper* pWrapper) { +static bool hasDestPrimaryKey(SSchemaWrapper* pWrapper) { if (pWrapper->nCols < 2) { return false; } @@ -442,7 +442,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->outputSchema.pSchema = pFullSchema; } - bool hasKey = hasPrimaryKey(&pObj->outputSchema); + bool hasKey = hasDestPrimaryKey(&pObj->outputSchema); SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = false, diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index edfad8c872..fac216e6e6 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -500,6 +500,8 @@ typedef struct SStreamScanInfo { SStoreTqReader readerFn; SStateStore stateStore; SSDataBlock* pCheckpointRes; + int8_t pkColType; + int32_t pkColLen; } SStreamScanInfo; typedef struct { @@ -566,8 +568,13 @@ typedef struct SOpCheckPointInfo { SHashObj* children; // key:child id } SOpCheckPointInfo; +typedef struct SSteamOpBasicInfo { + int32_t primaryPkIndex; +} SSteamOpBasicInfo; + typedef struct SStreamIntervalOperatorInfo { SOptrBasicInfo binfo; // basic info + SSteamOpBasicInfo basic; SAggSupporter aggSup; // aggregate supporter SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; // multiple results build supporter @@ -633,6 +640,7 @@ typedef struct SResultWindowInfo { typedef struct SStreamSessionAggOperatorInfo { SOptrBasicInfo binfo; + SSteamOpBasicInfo basic; SStreamAggSupporter streamAggSup; SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; @@ -665,6 +673,7 @@ typedef struct SStreamSessionAggOperatorInfo { typedef struct SStreamStateAggOperatorInfo { SOptrBasicInfo binfo; + SSteamOpBasicInfo basic; SStreamAggSupporter streamAggSup; SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; @@ -691,6 +700,7 @@ typedef struct SStreamStateAggOperatorInfo { typedef struct SStreamEventAggOperatorInfo { SOptrBasicInfo binfo; + SSteamOpBasicInfo basic; SStreamAggSupporter streamAggSup; SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; @@ -719,6 +729,7 @@ typedef struct SStreamEventAggOperatorInfo { typedef struct SStreamCountAggOperatorInfo { SOptrBasicInfo binfo; + SSteamOpBasicInfo basic; SStreamAggSupporter streamAggSup; SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; @@ -742,6 +753,7 @@ typedef struct SStreamCountAggOperatorInfo { typedef struct SStreamPartitionOperatorInfo { SOptrBasicInfo binfo; + SSteamOpBasicInfo basic; SPartitionBySupporter partitionSup; SExprSupp scalarSup; SExprSupp tbnameCalSup; @@ -775,6 +787,7 @@ typedef struct SStreamFillSupporter { } SStreamFillSupporter; typedef struct SStreamFillOperatorInfo { + SSteamOpBasicInfo basic; SStreamFillSupporter* pFillSup; SSDataBlock* pRes; SSDataBlock* pSrcBlock; @@ -911,7 +924,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, i SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SStorageAPI* pApi, int32_t tsIndex); void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, - STimeWindowAggSupp* pTwSup); + STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic); void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey); @@ -939,7 +952,7 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap); int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); void resetWinRange(STimeWindow* winRange); -bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); +bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); void resetUnCloseSessionWinInfo(SSHashObj* winMap); void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 670771adcc..9a31e993b2 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1267,7 +1267,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup pScanInfo->pPartScalarSup = pExpr; pScanInfo->pPartTbnameSup = pTbnameExpr; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate); + pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c30059fffd..ef92e566b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1380,7 +1380,7 @@ bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) { return false; } -bool hasPrimaryKey(SStreamScanInfo* pInfo) { +bool hasPrimaryKeyCol(SStreamScanInfo* pInfo) { return pInfo->primaryKeyIndex != -1; } @@ -1391,7 +1391,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } int32_t rowId = 0; - if (hasPrimaryKey(pInfo)) { + if (hasPrimaryKeyCol(pInfo)) { SColumnInfoData* pPkCol = taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex); for (; rowId < pPreRes->info.rows; rowId++) { if (comparePrimaryKey(pPkCol, rowId, pVal)) { @@ -1630,7 +1630,7 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pPkCol = NULL; - if (hasPrimaryKey(pInfo)) { + if (hasPrimaryKeyCol(pInfo)) { pPkCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex); } for (int32_t i = 0; i < pPreRes->info.rows; i++) { @@ -1659,7 +1659,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { + if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; @@ -1682,7 +1682,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr uint64_t groupId = pSrcGp[i]; if (groupId == 0) { void* pVal = NULL; - if (hasPrimaryKey(pInfo) && pSrcPkCol) { + if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) { pVal = colDataGetData(pSrcPkCol, i); } groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); @@ -1736,7 +1736,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB } int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { + if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; @@ -1759,7 +1759,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB uint64_t groupId = pSrcGp[i]; if (groupId == 0) { void* pVal = NULL; - if (hasPrimaryKey(pInfo) && pSrcPkCol) { + if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) { pVal = colDataGetData(pSrcPkCol, i); } groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); @@ -1800,7 +1800,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { + if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1824,7 +1824,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS uint64_t groupId = srcGp[i]; if (groupId == 0) { void* pVal = NULL; - if (hasPrimaryKey(pInfo) && pSrcPkCol) { + if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) { pVal = colDataGetData(pSrcPkCol, i); } groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal); @@ -1915,7 +1915,7 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0}; if (groupId == 0) { void* pVal = NULL; - if (hasPrimaryKey(pInfo) && pSrcPkCol) { + if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) { pVal = colDataGetData(pSrcPkCol, i); } groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal); @@ -1979,9 +1979,9 @@ void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndT appendOneRowToSpecialBlockImpl(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName, NULL); } -bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts) { +bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { bool isExpired = false; - bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts); + bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts, pPkVal, len); if (!isInc) { isExpired = isOverdue(ts, pTwSup); } @@ -1997,7 +1997,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; SColumnInfoData* pPkColDataInfo = NULL; - if (hasPrimaryKey(pInfo)) { + if (hasPrimaryKeyCol(pInfo)) { pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryKeyIndex); } @@ -2017,7 +2017,13 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock isClosed = isCloseWindow(&win, &pInfo->twAggSup); } // must check update info first. - bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); + void* pPkVal = NULL; + int32_t pkLen = 0; + if (hasPrimaryKeyCol(pInfo)) { + pPkVal = colDataGetData(pPkColDataInfo, rowId); + pkLen = colDataGetRowLength(pPkColDataInfo, rowId); + } + bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId], pPkVal, pkLen); bool isDeleted = isClosed && isSignleIntervalWindow(pInfo) && isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore); if ((update || isDeleted) && out) { @@ -2517,7 +2523,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { calBlockTbName(pInfo, pInfo->pRecoverRes, 0); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); + TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex, pInfo->primaryKeyIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } if (pInfo->pCreateTbRes->info.rows > 0) { @@ -3202,8 +3208,10 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); - if (hasPrimaryKey(pInfo)) { + if (hasPrimaryKeyCol(pInfo)) { addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes); + pInfo->pkColType = pkType.type; + pInfo->pkColLen = pkType.bytes + 2; } pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 9cef8f584e..491a1da6aa 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -243,7 +243,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl for (int32_t i = 0; i < rows;) { if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup, - pSDataBlock->info.id.uid, startTsCols[i])) { + pSDataBlock->info.id.uid, startTsCols[i], NULL, 0)) { i++; continue; } @@ -729,7 +729,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); code = appendDownstream(pOperator, &downstream, 1); } return pOperator; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 22c7f20658..1116851323 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -312,7 +312,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl blockDataEnsureCapacity(pAggSup->pScanBlock, rows); for (int32_t i = 0; i < rows; i += winRows) { if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, - &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i])) { + &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i], NULL, 0)) { i++; continue; } @@ -778,7 +778,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState); - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0cab4b7951..0f6d78caa3 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -487,13 +487,14 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = - pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate); + pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen); } pScanInfo->interval = pInfo->interval; pScanInfo->twAggSup = pInfo->twAggSup; pScanInfo->pState = pInfo->pState; pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; + pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex; } void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, @@ -826,6 +827,10 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } +bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { + return pInfo->primaryPkIndex != -1; +} + static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; @@ -845,6 +850,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; + void* pPkVal = NULL; + int32_t pkLen = 0; + SColumnInfoData* pPkColDataInfo = NULL; + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + pPkColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + } + if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) { qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 ",maxKey %" PRId64, @@ -868,9 +880,15 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); + if (hasSrcPrimaryKeyCol(&pInfo->basic) && !IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && + pSDataBlock->info.type != STREAM_PULL_DATA) { + pPkVal = colDataGetData(pPkColDataInfo, startPos); + pkLen = colDataGetRowLength(pPkColDataInfo, startPos); + } + if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && pSDataBlock->info.type != STREAM_PULL_DATA && checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid, - nextWin.ekey)) || + nextWin.ekey, NULL, 0)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { @@ -1722,14 +1740,14 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num } void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, - STimeWindowAggSupp* pTwSup) { + STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pScanInfo = downstream->info; pScanInfo->tsColIndex = tsColIndex; } if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup); + initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic); return; } SStreamScanInfo* pScanInfo = downstream->info; @@ -1737,10 +1755,11 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->pState = pAggSup->pState; if (!pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, - pScanInfo->igCheckUpdate); + pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen); } pScanInfo->twAggSup = *pTwSup; pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; + pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex; } static TSKEY sesionTs(void* pKey) { @@ -2113,10 +2132,22 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } TSKEY* endTsCols = (int64_t*)pEndTsCol->pData; + + void* pPkVal = NULL; + int32_t pkLen = 0; + SColumnInfoData* pPkColDataInfo = NULL; + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + pPkColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + } + for (int32_t i = 0; i < rows;) { + if (hasSrcPrimaryKeyCol(&pInfo->basic) && !IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData) { + pPkVal = colDataGetData(pPkColDataInfo, i); + pkLen = colDataGetRowLength(pPkColDataInfo, i); + } if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup, - pSDataBlock->info.id.uid, endTsCols[i])) { + pSDataBlock->info.id.uid, endTsCols[i], NULL, 0)) { i++; continue; } @@ -3058,7 +3089,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); code = appendDownstream(pOperator, &downstream, 1); } return pOperator; @@ -3490,7 +3521,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < rows; i += winRows) { if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, - &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || + &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i], NULL, 0) || colDataIsNull_s(pKeyColInfo, i)) { i++; continue; @@ -3957,7 +3988,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState); - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 764bf6e026..263400b7d3 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include "tcompare.h" +#include "tdatablock.h" #include "tencode.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -31,6 +33,40 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } +bool lowerThanTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) { + return *((TSKEY*)pTs1) < *((TSKEY*)pTs2); +} + +bool lowerThanPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) { + int res = compareInt64Val(pValue1, pTs); + if (res < 0) { + return true; + } else if (res == 0) { + void* pk1 = (char*)pValue1 + sizeof(TSKEY); + return (cmpPkFn(pk1, pPkVal) < 0); + } + return false; +} + +int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) { + *(TSKEY*)buff = ts; + memcpy(buff+ sizeof(TSKEY), &tbUid, sizeof(int64_t)); + if (len == 0) { + return sizeof(TSKEY) + sizeof(int64_t); + } + memcpy(buff, pVal, len); + return sizeof(TSKEY) + sizeof(int64_t) + len; +} + +int32_t getValueBuff(TSKEY ts, char* pVal, int32_t len, char* buff) { + *(TSKEY*)buff = ts; + if (len == 0) { + return sizeof(TSKEY); + } + memcpy(buff + sizeof(TSKEY), pVal, len); + return sizeof(TSKEY) + len; +} + void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { if (pInfo->numSBFs < count) { count = pInfo->numSBFs; @@ -89,11 +125,11 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w return watermark; } -SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp) { - return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp); +SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) { + return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen); } -SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp) { +SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) { SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); if (pInfo == NULL) { return NULL; @@ -133,6 +169,17 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); pInfo->maxDataVersion = 0; + pInfo->pkColLen = pkLen; + pInfo->pkColType = pkType; + pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen); + pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen); + if (pkLen != 0) { + pInfo->comparePkRowFn = lowerThanPk; + pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);; + } else { + pInfo->comparePkRowFn = lowerThanTs; + pInfo->comparePkCol = NULL; + } return pInfo; } @@ -168,47 +215,60 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { return false; } -TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { +TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) { if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; TSKEY maxTs = INT64_MIN; + void* pPkVal = NULL; + void* pMaxPkVal = NULL; + int32_t maxLen = 0; + int32_t len = 0; int64_t tbUid = pBlock->info.id.uid; SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); + SColumnInfoData *pPkDataInfo = NULL; + if (primaryKeyCol >= 0) { + pPkDataInfo = taosArrayGet(pBlock->pDataBlock, primaryKeyCol); + } for (int32_t i = 0; i < pBlock->info.rows; i++) { TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i]; - maxTs = TMAX(maxTs, ts); + if (maxTs < ts) { + maxTs = ts; + if (primaryKeyCol >= 0) { + pMaxPkVal = colDataGetData(pPkDataInfo, i); + maxLen = colDataGetRowLength(pPkDataInfo, i); + } + } SScalableBf *pSBf = getSBf(pInfo, ts); if (pSBf) { - SUpdateKey updateKey = { - .tbUid = tbUid, - .ts = ts, - }; - tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); + if (primaryKeyCol >= 0) { + pPkVal = colDataGetData(pPkDataInfo, i); + len = colDataGetRowLength(pPkDataInfo, i); + } + int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff); + tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen); } } - TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); - if (pMaxTs == NULL || *pMaxTs > maxTs) { - taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); + void *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); + if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol)) { + int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff); + taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen); } return maxTs; } -bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { +bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { int32_t res = TSDB_CODE_FAILED; + int32_t buffLen = 0; - SUpdateKey updateKey = { - .tbUid = tableId, - .ts = ts, - }; - - TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); + buffLen = getKeyBuff(ts, tableId, pPkVal, len, pInfo->pKeyBuff); + void* *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { - res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey)); + res = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen); if (res == TSDB_CODE_SUCCESS) { return false; } else { @@ -221,18 +281,19 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { SScalableBf *pSBf = getSBf(pInfo, ts); int32_t size = taosHashGetSize(pInfo->pMap); - if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { - taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); + if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol))) { + int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); + taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); // pSBf may be a null pointer if (pSBf) { - res = tScalableBfPutNoCheck(pSBf, &updateKey, sizeof(SUpdateKey)); + res = tScalableBfPutNoCheck(pSBf, pInfo->pKeyBuff, buffLen); } return false; } // pSBf may be a null pointer if (pSBf) { - res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); + res = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen); } if (!pMapMaxTs && maxTs < ts) { @@ -262,6 +323,8 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { } taosArrayDestroy(pInfo->pTsSBFs); + taosMemoryFreeClear(pInfo->pKeyBuff); + taosMemoryFreeClear(pInfo->pValueBuff); taosHashCleanup(pInfo->pMap); updateInfoDestoryColseWinSBF(pInfo); taosMemoryFree(pInfo); @@ -322,11 +385,15 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { void *key = taosHashGetKey(pIte, &keyLen); if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1; - if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; + int32_t valueSize = taosHashGetValueSize(pIte); + if (tEncodeBinary(&encoder, (const uint8_t *)pIte, valueSize) < 0) return -1; } if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1; + if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) return -1; + if (tEncodeI8(&encoder, pInfo->pkColType) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -371,28 +438,43 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); uint64_t uid = 0; - ts = INT64_MIN; + void* pVal = NULL; + int32_t valSize = 0; for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; - if (tDecodeI64(&decoder, &ts) < 0) return -1; - taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); + if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1; + taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &pVal, valSize); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1; + if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1; + + pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen); + pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen); + if (pInfo->pkColLen != 0) { + pInfo->comparePkRowFn = lowerThanPk; + pInfo->comparePkCol = NULL; + } else { + pInfo->comparePkRowFn = lowerThanTs; + pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; } -bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { +bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); bool res = true; - if (pMapMaxTs && ts < *pMapMaxTs) { + if (pMapMaxTs && !pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol)) { res = false; } else { - taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); + int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); + taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); } return res; } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index cf4f17bfbc..8c0ca3e5a7 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -719,6 +719,11 @@ void *taosHashGetKey(void *data, size_t *keyLen) { return GET_HASH_NODE_KEY(node); } +int32_t taosHashGetValueSize(void *data) { + SHashNode *node = GET_HASH_PNODE(data); + return node->dataLen; +} + // release the pNode, return next pNode, and lock the current entry static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { SHashNode *pOld = (SHashNode *)GET_HASH_PNODE(p); From d8946dbedd455c849c260c149039eeefd63bbfd4 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 24 Apr 2024 17:33:39 +0800 Subject: [PATCH 2/4] opt bloom filter --- include/util/thash.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/include/util/thash.h b/include/util/thash.h index 5dec175a38..c6275d276c 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -180,6 +180,13 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); */ void *taosHashGetKey(void *data, size_t *keyLen); +/** + * Get the corresponding value length for a given data in hash table + * @param data + * @return + */ +int32_t taosHashGetValueSize(void *data); + /** * return the payload data with the specified key(reference number added) * @@ -215,13 +222,6 @@ void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp); int64_t taosHashGetCompTimes(SHashObj *pHashObj); -/** - * Get the corresponding value length for a given data in hash table - * @param data - * @return - */ -int32_t taosHashGetValueSize(void *data); - #ifdef __cplusplus } #endif From 85cd290aa57172a225597c770940fa9b71980a15 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 24 Apr 2024 18:40:39 +0800 Subject: [PATCH 3/4] opt bloom filter --- include/libs/executor/storageapi.h | 2 +- .../executor/src/streamtimewindowoperator.c | 4 +-- source/libs/stream/src/streamUpdate.c | 35 +++++++++---------- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index bf7a2a869a..ec92bd56dd 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -312,7 +312,7 @@ typedef struct SUpdateInfo { char* pKeyBuff; char* pValueBuff; - bool (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn); + int (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn); __compar_fn_t comparePkCol; } SUpdateInfo; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0f6d78caa3..08ce0e25f1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -888,7 +888,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && pSDataBlock->info.type != STREAM_PULL_DATA && checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid, - nextWin.ekey, NULL, 0)) || + nextWin.ekey, pPkVal, pkLen)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { @@ -2147,7 +2147,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup, - pSDataBlock->info.id.uid, endTsCols[i], NULL, 0)) { + pSDataBlock->info.id.uid, endTsCols[i], pPkVal, pkLen)) { i++; continue; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 263400b7d3..0adb7050c6 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -33,19 +33,18 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } -bool lowerThanTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) { - return *((TSKEY*)pTs1) < *((TSKEY*)pTs2); +int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) { + return compareInt64Val(pTs1, pTs2);; } -bool lowerThanPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) { +int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) { int res = compareInt64Val(pValue1, pTs); - if (res < 0) { - return true; - } else if (res == 0) { + if (res != 0) { + return res; + } else { void* pk1 = (char*)pValue1 + sizeof(TSKEY); - return (cmpPkFn(pk1, pPkVal) < 0); + return cmpPkFn(pk1, pPkVal); } - return false; } int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) { @@ -174,10 +173,10 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen); pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen); if (pkLen != 0) { - pInfo->comparePkRowFn = lowerThanPk; + pInfo->comparePkRowFn = compareKeyTsAndPk; pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);; } else { - pInfo->comparePkRowFn = lowerThanTs; + pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; } return pInfo; @@ -250,7 +249,7 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p } } void *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); - if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol)) { + if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) { int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff); taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen); } @@ -281,7 +280,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p SScalableBf *pSBf = getSBf(pInfo, ts); int32_t size = taosHashGetSize(pInfo->pMap); - if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol))) { + if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); // pSBf may be a null pointer @@ -443,7 +442,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1; - taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &pVal, valSize); + taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; @@ -454,11 +453,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen); pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen); if (pInfo->pkColLen != 0) { - pInfo->comparePkRowFn = lowerThanPk; - pInfo->comparePkCol = NULL; + pInfo->comparePkRowFn = compareKeyTsAndPk; + pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);; } else { - pInfo->comparePkRowFn = lowerThanTs; - pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); + pInfo->comparePkRowFn = compareKeyTs; + pInfo->comparePkCol = NULL; } tEndDecode(&decoder); @@ -470,7 +469,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); bool res = true; - if (pMapMaxTs && !pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol)) { + if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) { res = false; } else { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); From 403a877564353363e958cb542d099b37d58ae630 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 25 Apr 2024 08:52:29 +0800 Subject: [PATCH 4/4] set pk length --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef92e566b6..838db4c571 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3211,7 +3211,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (hasPrimaryKeyCol(pInfo)) { addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes); pInfo->pkColType = pkType.type; - pInfo->pkColLen = pkType.bytes + 2; + pInfo->pkColLen = pkType.bytes; } pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false;