From 2ced6c280cae7faca7aa0ee2bb2efbf890c73577 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 16 Aug 2022 12:11:56 +0800 Subject: [PATCH] enh(stream): reduce table scan --- include/libs/stream/tstreamUpdate.h | 45 +++++------ source/client/src/tmq.c | 20 ++--- source/libs/executor/src/scanoperator.c | 47 ++++++------ source/libs/stream/src/streamUpdate.c | 99 ++++++++++++++----------- 4 files changed, 113 insertions(+), 98 deletions(-) diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 78543118da..c186430f3f 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -25,33 +25,34 @@ extern "C" { #endif typedef struct SUpdateInfo { - SArray *pTsBuckets; - uint64_t numBuckets; - SArray *pTsSBFs; - uint64_t numSBFs; - int64_t interval; - int64_t watermark; - TSKEY minTS; - SScalableBf* pCloseWinSBF; - SHashObj* pMap; - STimeWindow scanWindow; - uint64_t scanGroupId; - uint64_t maxVersion; + SArray *pTsBuckets; + uint64_t numBuckets; + SArray *pTsSBFs; + uint64_t numSBFs; + int64_t interval; + int64_t watermark; + TSKEY minTS; + SScalableBf *pCloseWinSBF; + SHashObj *pMap; + STimeWindow scanWindow; + uint64_t scanGroupId; + uint64_t maxVersion; } SUpdateInfo; -SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); +SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); -bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); -void updateInfoDestroy(SUpdateInfo *pInfo); -void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); -void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); -int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); -int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); +bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); +bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); +void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); +bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); +void updateInfoDestroy(SUpdateInfo *pInfo); +void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); +void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); +int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); +int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); #ifdef __cplusplus } #endif -#endif /* ifndef _TSTREAMUPDATE_H_ */ \ No newline at end of file +#endif /* ifndef _TSTREAMUPDATE_H_ */ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 086cf653e9..7637ffbc80 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -56,8 +56,8 @@ struct tmq_conf_t { int8_t autoCommit; int8_t resetOffset; int8_t withTbName; - int8_t ssEnable; - int32_t ssBatchSize; + int8_t snapEnable; + int32_t snapBatchSize; bool hbBgEnable; @@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcmp(key, "experimental.snapshot.enable") == 0) { if (strcmp(value, "true") == 0) { - conf->ssEnable = true; + conf->snapEnable = true; return TMQ_CONF_OK; } else if (strcmp(value, "false") == 0) { - conf->ssEnable = false; + conf->snapEnable = false; return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } } + if (strcmp(key, "experimental.snapshot.batch.size") == 0) { + conf->snapBatchSize = atoi(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "enable.heartbeat.background") == 0) { if (strcmp(value, "true") == 0) { conf->hbBgEnable = true; @@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } - if (strcmp(key, "experimental.snapshot.batch.size") == 0) { - conf->ssBatchSize = atoi(value); - return TMQ_CONF_OK; - } - if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; @@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); pTmq->withTbName = conf->withTbName; - pTmq->useSnapshot = conf->ssEnable; + pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commitCb = conf->commitCb; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d8de8df163..1d3ceece3a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include "os.h" #include "executorimpl.h" #include "filter.h" #include "function.h" #include "functionMgt.h" +#include "os.h" #include "querynodes.h" #include "systable.h" #include "tname.h" @@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, + GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); + bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); filterFreeInfo(filter); return keep; @@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - loadSMA = true; // mark the operation of load sma; + loadSMA = true; // mark the operation of load sma; bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); - if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead + if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); return TSDB_CODE_SUCCESS; @@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); - } else { // todo opt for json tag + } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, data, false); } @@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug("%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks due to query func required", GET_TASKID(pTaskInfo)); + qDebug( + "%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks " + "due to query func required", + GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - bool isClosed = false; + bool isClosed = false; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); isClosed = isCloseWindow(&win, &pInfo->twAggSup); } + bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - if ((update || (isSignleIntervalWindow(pInfo) && isClosed && - isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) { + bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) && + isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup); + if ((update || closedWin) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); } } @@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version); + uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); + updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); return pSDB; @@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { setBlockIntoRes(pInfo, &block); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) { + if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, + pInfo->pRes->info.version)) { printDataBlock(pInfo->pRes, "stream scan ignore"); blockDataCleanup(pInfo->pRes); continue; @@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // build message and send to mnode to fetch the content of system tables. SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSysTableScanInfo* pInfo = pOperator->info; - char dbName[TSDB_DB_NAME_LEN] = {0}; + char dbName[TSDB_DB_NAME_LEN] = {0}; const char* name = tNameGetTableName(&pInfo->name); if (pInfo->showRewrite) { @@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return sysTableScanUserTables(pOperator); } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTags(pOperator); - } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && - pInfo->showRewrite && IS_SYS_DBNAME(dbName)) { + } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite && + IS_SYS_DBNAME(dbName)) { return sysTableScanUserSTables(pOperator); } else { // load the meta from mnode of the given epset if (pOperator->status == OP_EXEC_DONE) { @@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->pColMatchInfo); - + taosMemoryFreeClear(param); } @@ -2597,7 +2603,6 @@ _error: int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { - int64_t st = taosGetTimestampUs(); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); @@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } int64_t st1 = taosGetTimestampUs(); - qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1-st)/1000.0, idStr); + qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { qDebug("no table qualified for query, %s" PRIx64, idStr); @@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } int64_t st2 = taosGetTimestampUs(); - qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2-st1)/1000.0, idStr); + qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 0b1ce27b77..d053662bd3 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -13,33 +13,31 @@ * along with this program. If not, see . */ -#include "tstreamUpdate.h" -#include "tencode.h" -#include "ttime.h" #include "query.h" +#include "tencode.h" +#include "tstreamUpdate.h" +#include "ttime.h" -#define DEFAULT_FALSE_POSITIVE 0.01 -#define DEFAULT_BUCKET_SIZE 1310720 -#define DEFAULT_MAP_CAPACITY 1310720 -#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) -#define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 100000 -#define MIN_NUM_SCALABLE_BF 10 -#define DEFAULT_PREADD_BUCKET 1 -#define MAX_INTERVAL MILLISECOND_PER_MINUTE -#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) -#define DEFAULT_EXPECTED_ENTRIES 10000 +#define DEFAULT_FALSE_POSITIVE 0.01 +#define DEFAULT_BUCKET_SIZE 1310720 +#define DEFAULT_MAP_CAPACITY 1310720 +#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) +#define ROWS_PER_MILLISECOND 1 +#define MAX_NUM_SCALABLE_BF 100000 +#define MIN_NUM_SCALABLE_BF 10 +#define DEFAULT_PREADD_BUCKET 1 +#define MAX_INTERVAL MILLISECOND_PER_MINUTE +#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) +#define DEFAULT_EXPECTED_ENTRIES 10000 -static int64_t adjustExpEntries(int64_t entries) { - return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); -} +static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { if (pInfo->numSBFs < count) { count = pInfo->numSBFs; } for (uint64_t i = 0; i < count; ++i) { - int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); + int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); taosArrayPush(pInfo->pTsSBFs, &tsSBF); } @@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { if (watermark <= adjInterval) { - watermark = TMAX(originInt/adjInterval, 1) * adjInterval; + watermark = TMAX(originInt / adjInterval, 1) * adjInterval; } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) { @@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { return res; } +bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { + void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); + if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true; + return false; +} + bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { - int32_t res = TSDB_CODE_FAILED; - TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); - uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; - TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); + int32_t res = TSDB_CODE_FAILED; + TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); + uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; + TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { @@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { } int32_t size = taosHashGetSize(pInfo->pMap); - if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { + if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); return false; } - if ( !pMapMaxTs && maxTs < ts ) { + if (!pMapMaxTs && maxTs < ts) { taosArraySet(pInfo->pTsBuckets, index, &ts); return false; } if (ts < pInfo->minTS) { - qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); + qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + maxTs, *pMapMaxTs, ts); return true; } else if (res == TSDB_CODE_SUCCESS) { return false; } - qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); + qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId, + maxTs, *pMapMaxTs, ts); // check from tsdb api return true; } -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { - qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); +void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { + qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, + pWin->skey, pWin->ekey, version); pInfo->scanWindow = *pWin; pInfo->scanGroupId = groupId; pInfo->maxVersion = version; } -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { +bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { if (!pInfo) { return false; } - qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); - if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && - pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) { - qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); + qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, + pWin->skey, pWin->ekey, version); + if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey && + version <= pInfo->maxVersion) { + qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, + pWin->skey, pWin->ekey, version); return true; } return false; @@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) int32_t size = taosArrayGetSize(pInfo->pTsBuckets); if (tEncodeI32(&encoder, size) < 0) return -1; for (int32_t i = 0; i < size; i++) { - TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i); + TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i); if (tEncodeI64(&encoder, *pTs) < 0) return -1; } @@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs); if (tEncodeI32(&encoder, sBfSize) < 0) return -1; for (int32_t i = 0; i < sBfSize; i++) { - SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i); + SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i); if (tScalableBfEncode(pSBf, &encoder) < 0) return -1; } @@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1; if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1; if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1; - + if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1; int32_t mapSize = taosHashGetSize(pInfo->pMap); if (tEncodeI32(&encoder, mapSize) < 0) return -1; - void* pIte = NULL; + void *pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); - if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1; - if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1; + void *key = taosHashGetKey(pIte, &keyLen); + if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1; + if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; } if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; @@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { int32_t size = 0; if (tDecodeI32(&decoder, &size) < 0) return -1; - pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); + pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); TSKEY ts = INT64_MIN; for (int32_t i = 0; i < size; i++) { if (tDecodeI64(&decoder, &ts) < 0) return -1; @@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *)); for (int32_t i = 0; i < sBfSize; i++) { - SScalableBf* pSBf = tScalableBfDecode(&decoder); + SScalableBf *pSBf = tScalableBfDecode(&decoder); if (!pSBf) return -1; taosArrayPush(pInfo->pTsSBFs, &pSBf); } @@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { int32_t mapSize = 0; if (tDecodeI32(&decoder, &mapSize) < 0) return -1; - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); uint64_t uid = 0; ts = INT64_MIN; - for(int32_t i = 0; i < mapSize; i++) { + for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeI64(&decoder, &ts) < 0) return -1; taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));