diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h
index 78543118da..c186430f3f 100644
--- a/include/libs/stream/tstreamUpdate.h
+++ b/include/libs/stream/tstreamUpdate.h
@@ -25,33 +25,34 @@ extern "C" {
#endif
typedef struct SUpdateInfo {
- SArray *pTsBuckets;
- uint64_t numBuckets;
- SArray *pTsSBFs;
- uint64_t numSBFs;
- int64_t interval;
- int64_t watermark;
- TSKEY minTS;
- SScalableBf* pCloseWinSBF;
- SHashObj* pMap;
- STimeWindow scanWindow;
- uint64_t scanGroupId;
- uint64_t maxVersion;
+ SArray *pTsBuckets;
+ uint64_t numBuckets;
+ SArray *pTsSBFs;
+ uint64_t numSBFs;
+ int64_t interval;
+ int64_t watermark;
+ TSKEY minTS;
+ SScalableBf *pCloseWinSBF;
+ SHashObj *pMap;
+ STimeWindow scanWindow;
+ uint64_t scanGroupId;
+ uint64_t maxVersion;
} SUpdateInfo;
-SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
+SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
-bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
-void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
-bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
-void updateInfoDestroy(SUpdateInfo *pInfo);
-void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
-void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
-int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
-int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
+bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
+bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
+void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
+bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
+void updateInfoDestroy(SUpdateInfo *pInfo);
+void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
+void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
+int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
+int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
#ifdef __cplusplus
}
#endif
-#endif /* ifndef _TSTREAMUPDATE_H_ */
\ No newline at end of file
+#endif /* ifndef _TSTREAMUPDATE_H_ */
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index 086cf653e9..7637ffbc80 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -56,8 +56,8 @@ struct tmq_conf_t {
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
- int8_t ssEnable;
- int32_t ssBatchSize;
+ int8_t snapEnable;
+ int32_t snapBatchSize;
bool hbBgEnable;
@@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcmp(key, "experimental.snapshot.enable") == 0) {
if (strcmp(value, "true") == 0) {
- conf->ssEnable = true;
+ conf->snapEnable = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
- conf->ssEnable = false;
+ conf->snapEnable = false;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
+ if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
+ conf->snapBatchSize = atoi(value);
+ return TMQ_CONF_OK;
+ }
+
if (strcmp(key, "enable.heartbeat.background") == 0) {
if (strcmp(value, "true") == 0) {
conf->hbBgEnable = true;
@@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK;
}
- if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
- conf->ssBatchSize = atoi(value);
- return TMQ_CONF_OK;
- }
-
if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value);
return TMQ_CONF_OK;
@@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->withTbName = conf->withTbName;
- pTmq->useSnapshot = conf->ssEnable;
+ pTmq->useSnapshot = conf->snapEnable;
pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commitCb = conf->commitCb;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index fa9e3a1d85..454a0b0070 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -13,11 +13,11 @@
* along with this program. If not, see .
*/
-#include "os.h"
#include "executorimpl.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
+#include "os.h"
#include "querynodes.h"
#include "systable.h"
#include "tname.h"
@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info;
- SResultRowPosition* p1 =
- (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
+ SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
+ GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) {
return NULL;
@@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
- bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
+ bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
filterFreeInfo(filter);
return keep;
@@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
- loadSMA = true; // mark the operation of load sma;
+ loadSMA = true; // mark the operation of load sma;
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
- if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
+ if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
return TSDB_CODE_SUCCESS;
@@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
- } else { // todo opt for json tag
+ } else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, false);
}
@@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
- qDebug("%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks due to query func required", GET_TASKID(pTaskInfo));
+ qDebug(
+ "%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
+ "due to query func required",
+ GET_TASKID(pTaskInfo));
// do prepare for the next round table scan operation
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
@@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
- bool isClosed = false;
+ bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
}
+ bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
- if ((update || (isSignleIntervalWindow(pInfo) && isClosed &&
- isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
+ bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&
+ isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
+ if ((update || closedWin) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
}
}
@@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
- uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
- updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version);
+ uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
+ updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
@@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
setBlockIntoRes(pInfo, &block);
- if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
+ if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
+ pInfo->pRes->info.version)) {
printDataBlock(pInfo->pRes, "stream scan ignore");
blockDataCleanup(pInfo->pRes);
continue;
@@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
- char dbName[TSDB_DB_NAME_LEN] = {0};
+ char dbName[TSDB_DB_NAME_LEN] = {0};
const char* name = tNameGetTableName(&pInfo->name);
if (pInfo->showRewrite) {
@@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTags(pOperator);
- } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 &&
- pInfo->showRewrite && IS_SYS_DBNAME(dbName)) {
+ } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
+ IS_SYS_DBNAME(dbName)) {
return sysTableScanUserSTables(pOperator);
} else { // load the meta from mnode of the given epset
if (pOperator->status == OP_EXEC_DONE) {
@@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->pColMatchInfo);
-
+
taosMemoryFreeClear(param);
}
@@ -2597,7 +2603,6 @@ _error:
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
-
int64_t st = taosGetTimestampUs();
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
@@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
}
int64_t st1 = taosGetTimestampUs();
- qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1-st)/1000.0, idStr);
+ qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, %s" PRIx64, idStr);
@@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
}
int64_t st2 = taosGetTimestampUs();
- qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2-st1)/1000.0, idStr);
+ qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c
index 0b1ce27b77..d053662bd3 100644
--- a/source/libs/stream/src/streamUpdate.c
+++ b/source/libs/stream/src/streamUpdate.c
@@ -13,33 +13,31 @@
* along with this program. If not, see .
*/
-#include "tstreamUpdate.h"
-#include "tencode.h"
-#include "ttime.h"
#include "query.h"
+#include "tencode.h"
+#include "tstreamUpdate.h"
+#include "ttime.h"
-#define DEFAULT_FALSE_POSITIVE 0.01
-#define DEFAULT_BUCKET_SIZE 1310720
-#define DEFAULT_MAP_CAPACITY 1310720
-#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
-#define ROWS_PER_MILLISECOND 1
-#define MAX_NUM_SCALABLE_BF 100000
-#define MIN_NUM_SCALABLE_BF 10
-#define DEFAULT_PREADD_BUCKET 1
-#define MAX_INTERVAL MILLISECOND_PER_MINUTE
-#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
-#define DEFAULT_EXPECTED_ENTRIES 10000
+#define DEFAULT_FALSE_POSITIVE 0.01
+#define DEFAULT_BUCKET_SIZE 1310720
+#define DEFAULT_MAP_CAPACITY 1310720
+#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
+#define ROWS_PER_MILLISECOND 1
+#define MAX_NUM_SCALABLE_BF 100000
+#define MIN_NUM_SCALABLE_BF 10
+#define DEFAULT_PREADD_BUCKET 1
+#define MAX_INTERVAL MILLISECOND_PER_MINUTE
+#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
+#define DEFAULT_EXPECTED_ENTRIES 10000
-static int64_t adjustExpEntries(int64_t entries) {
- return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
-}
+static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count) {
count = pInfo->numSBFs;
}
for (uint64_t i = 0; i < count; ++i) {
- int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
+ int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
}
@@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
if (watermark <= adjInterval) {
- watermark = TMAX(originInt/adjInterval, 1) * adjInterval;
+ watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
@@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
return res;
}
+bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
+ void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
+ if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true;
+ return false;
+}
+
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
- int32_t res = TSDB_CODE_FAILED;
- TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
- uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
- TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
+ int32_t res = TSDB_CODE_FAILED;
+ TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
+ uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
+ TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) {
// this window has been closed.
if (pInfo->pCloseWinSBF) {
@@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
}
int32_t size = taosHashGetSize(pInfo->pMap);
- if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
+ if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
return false;
}
- if ( !pMapMaxTs && maxTs < ts ) {
+ if (!pMapMaxTs && maxTs < ts) {
taosArraySet(pInfo->pTsBuckets, index, &ts);
return false;
}
if (ts < pInfo->minTS) {
- qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
+ qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
+ maxTs, *pMapMaxTs, ts);
return true;
} else if (res == TSDB_CODE_SUCCESS) {
return false;
}
- qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
+ qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
+ maxTs, *pMapMaxTs, ts);
// check from tsdb api
return true;
}
-void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
- qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
+void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
+ qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
+ pWin->skey, pWin->ekey, version);
pInfo->scanWindow = *pWin;
pInfo->scanGroupId = groupId;
pInfo->maxVersion = version;
}
-bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
+bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
if (!pInfo) {
return false;
}
- qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
- if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey &&
- pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) {
- qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
+ qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
+ pWin->skey, pWin->ekey, version);
+ if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
+ version <= pInfo->maxVersion) {
+ qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
+ pWin->skey, pWin->ekey, version);
return true;
}
return false;
@@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) return -1;
for (int32_t i = 0; i < size; i++) {
- TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
+ TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
}
@@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
for (int32_t i = 0; i < sBfSize; i++) {
- SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
+ SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
}
@@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
-
+
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
- void* pIte = NULL;
+ void *pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
- void* key = taosHashGetKey(pIte, &keyLen);
- if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1;
- if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1;
+ void *key = taosHashGetKey(pIte, &keyLen);
+ if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
+ if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
}
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
@@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1;
- pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
+ pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
TSKEY ts = INT64_MIN;
for (int32_t i = 0; i < size; i++) {
if (tDecodeI64(&decoder, &ts) < 0) return -1;
@@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
for (int32_t i = 0; i < sBfSize; i++) {
- SScalableBf* pSBf = tScalableBfDecode(&decoder);
+ SScalableBf *pSBf = tScalableBfDecode(&decoder);
if (!pSBf) return -1;
taosArrayPush(pInfo->pTsSBFs, &pSBf);
}
@@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
- _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
+ _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
uint64_t uid = 0;
ts = INT64_MIN;
- for(int32_t i = 0; i < mapSize; i++) {
+ for (int32_t i = 0; i < mapSize; i++) {
if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeI64(&decoder, &ts) < 0) return -1;
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));