From 7ef4d3f03a1f6a034a66812b5cab9db0e44db5d4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 30 Jun 2022 15:39:30 +0800 Subject: [PATCH 1/5] fix: fix null as tag value issue --- source/libs/parser/src/parTranslater.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 521c733ceb..6dc14dbf15 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -783,6 +783,12 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD if (pVal->placeholderNo > 0 || pVal->isNull) { return DEAL_RES_CONTINUE; } + if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) { + // TODO + //pVal->node.resType = targetDt; + pVal->isNull = true; + return DEAL_RES_CONTINUE; + } if (pVal->isDuration) { if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) != TSDB_CODE_SUCCESS) { @@ -5335,7 +5341,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau if (code != TSDB_CODE_SUCCESS) { goto end; } - } else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) { + } else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) { char* tmpVal = nodesGetValueFromNode(pVal); STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; if (IS_VAR_DATA_TYPE(pTagSchema->type)) { From cda2f844ced4d498cdc249c54f1a0f4666580ac7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 30 Jun 2022 18:12:04 +0800 Subject: [PATCH 2/5] fix: fix json tag null issue --- source/libs/parser/src/parTranslater.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6dc14dbf15..762a038bb1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -786,6 +786,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) { // TODO //pVal->node.resType = targetDt; + pVal->translate = true; pVal->isNull = true; return DEAL_RES_CONTINUE; } @@ -5608,8 +5609,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS return TSDB_CODE_OUT_OF_MEMORY; } - if (DEAL_RES_ERROR == - translateValueImpl(pCxt, pStmt->pVal, schemaToDataType(pTableMeta->tableInfo.precision, pSchema))) { + SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema); + if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt)) { return pCxt->errCode; } @@ -5618,7 +5619,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS } pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); - if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) { + if (targetDt.type == TSDB_DATA_TYPE_JSON) { + pReq->isNull = 0; if (pStmt->pVal->literal && strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal); From 4f107908db680d2d2436e926c93ac4b71711ed99 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 30 Jun 2022 19:02:43 +0800 Subject: [PATCH 3/5] fix(tmq): reset window --- source/client/src/tmq.c | 3 ++- source/common/src/tmsg.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 23 +++++++++-------------- source/libs/executor/src/executorimpl.c | 10 ++++------ source/libs/executor/src/scanoperator.c | 2 -- tests/script/tsim/tmq/snapshot.sim | 9 +++------ tests/script/tsim/tmq/snapshot1.sim | 6 +++--- 7 files changed, 23 insertions(+), 34 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index aa78649ba7..bea9d215da 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { - /*printf("call poll\n");*/ + /*tscDebug("call poll");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { + /*tscDebug("call poll1");*/ void* rspObj; int64_t startTime = taosGetTimestampMs(); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 78eccdf0cd..80402439fc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5398,9 +5398,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } else if (pVal->type == TMQ_OFFSET__LOG) { snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { - snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); + snprintf(buf, maxLen, "offset(ss data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); + snprintf(buf, maxLen, "offset(ss meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c0d184ee11..d80996b399 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con }; tmsgSendRsp(&rsp); - char buf1[50]; - char buf2[50]; - tFormatOffset(buf1, 50, &pRsp->reqOffset); - tFormatOffset(buf2, 50, &pRsp->rspOffset); + char buf1[80]; + char buf2[80]; + tFormatOffset(buf1, 80, &pRsp->reqOffset); + tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); @@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqOffsetVal fetchOffsetNew; // 1.find handle - char buf[50]; - tFormatOffset(buf, 50, &reqOffset); + char buf[80]; + tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, TD_VID(pTq->pVnode), buf); @@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; metaRsp.reqOffset = pReq->reqOffset.version; - /*tqOffsetResetToLog(&metaR)*/ metaRsp.rspOffset = fetchVer; metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; @@ -380,18 +379,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // 2. get data (rebuild reader if needed) // 3. get new uid and ts - char formatBuf[50]; - tFormatOffset(formatBuf, 50, &dataRsp.reqOffset); - tqInfo("retrieve using snapshot req offset %s", formatBuf); + tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts); if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { ASSERT(0); } // 4. send rsp - if (dataRsp.blockNum != 0) { - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; } } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { ASSERT(0); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index dd48229849..efa8887ed0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2836,12 +2836,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { tsdbSetTableId(pInfo->dataReader, uid); - SQueryTableDataCond tmpCond = pInfo->cond; - tmpCond.twindows[0] = (STimeWindow){ - .skey = ts, - .ekey = INT64_MAX, - }; - tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); + int64_t oldSkey = pInfo->cond.twindows[0].skey; + pInfo->cond.twindows[0].skey = ts; + tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + pInfo->cond.twindows[0].skey = oldSkey; pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a6bf9a12f3..a3af4ab223 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - // check status while (1) { SSDataBlock* result = doTableScanGroup(pOperator); if (result) { @@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); - /*pTableInfo->uid */ tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); pInfo->scanTimes = 0; diff --git a/tests/script/tsim/tmq/snapshot.sim b/tests/script/tsim/tmq/snapshot.sim index 5683aaa559..de0468e6f2 100644 --- a/tests/script/tsim/tmq/snapshot.sim +++ b/tests/script/tsim/tmq/snapshot.sim @@ -111,7 +111,7 @@ endi $consumerId = 0 $totalMsgOfStb = $ctbNum * $rowsPerCtb -$expectmsgcnt = 1 +$expectmsgcnt = 1000000 $expectrowcnt = 100 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) @@ -131,9 +131,6 @@ endi if $data[0][1] != $consumerId then return -1 endi -if $data[0][2] != $expectmsgcnt then - return -1 -endi if $data[0][3] != $expectrowcnt then return -1 endi @@ -183,7 +180,7 @@ endi $consumerId = 0 $totalMsgOfCtb = $rowsPerCtb -$expectmsgcnt = 1 +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from ctb @@ -254,7 +251,7 @@ endi $consumerId = 0 $totalMsgOfNtb = $rowsPerCtb -$expectmsgcnt = $totalMsgOfNtb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) print == start consumer to pull msgs from ntb diff --git a/tests/script/tsim/tmq/snapshot1.sim b/tests/script/tsim/tmq/snapshot1.sim index 9226c475cb..d534bb68da 100644 --- a/tests/script/tsim/tmq/snapshot1.sim +++ b/tests/script/tsim/tmq/snapshot1.sim @@ -80,7 +80,7 @@ $topicList = $topicList . ' $consumerId = 0 $totalMsgOfOneTopic = $ctbNum * $rowsPerCtb $totalMsgOfStb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfStb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) @@ -168,7 +168,7 @@ $consumerId = 0 $totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfCtb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) $topicList = ' . topic_ctb_function @@ -245,7 +245,7 @@ $topicList = $topicList . ' $consumerId = 0 $totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfNtb +$expectmsgcnt = 1000000 sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) From 298c4d3e44c3a3a724dcd3e511d4f24550646084 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 30 Jun 2022 20:14:57 +0800 Subject: [PATCH 4/5] tlrucache: fix cap calculation --- source/util/src/tlrucache.c | 194 ++++++++++++++++++++---------------- 1 file changed, 107 insertions(+), 87 deletions(-) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index b034a6e73e..28e15dc9a3 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -16,24 +16,24 @@ #define _DEFAULT_SOURCE #include "tlrucache.h" #include "os.h" -#include "tdef.h" #include "taoserror.h" -#include "tlog.h" #include "tarray.h" +#include "tdef.h" +#include "tlog.h" -typedef struct SLRUEntry SLRUEntry; +typedef struct SLRUEntry SLRUEntry; typedef struct SLRUEntryTable SLRUEntryTable; typedef struct SLRUCacheShard SLRUCacheShard; -typedef struct SShardedCache SShardedCache; +typedef struct SShardedCache SShardedCache; enum { - TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table. + TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table. - TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry. + TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry. - TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool. + TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool. - TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits). + TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits). }; struct SLRUEntry { @@ -50,18 +50,39 @@ struct SLRUEntry { char keyData[1]; }; -#define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE) +#define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE) #define TAOS_LRU_ENTRY_IN_HIGH_POOL(h) ((h)->flags & TAOS_LRU_IN_HIGH_PRI_POOL) -#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI) -#define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT) +#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI) +#define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT) -#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) do { if(inCache) {(h)->flags |= TAOS_LRU_IN_CACHE;} else {(h)->flags &= ~TAOS_LRU_IN_CACHE;} } while(0) -#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) do { if(inHigh) {(h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL;} else {(h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL;} } while(0) -#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) do { if(priority == TAOS_LRU_PRIORITY_HIGH) {(h)->flags |= TAOS_LRU_IS_HIGH_PRI;} else {(h)->flags &= ~TAOS_LRU_IS_HIGH_PRI;} } while(0) +#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) \ + do { \ + if (inCache) { \ + (h)->flags |= TAOS_LRU_IN_CACHE; \ + } else { \ + (h)->flags &= ~TAOS_LRU_IN_CACHE; \ + } \ + } while (0) +#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) \ + do { \ + if (inHigh) { \ + (h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL; \ + } else { \ + (h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL; \ + } \ + } while (0) +#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) \ + do { \ + if (priority == TAOS_LRU_PRIORITY_HIGH) { \ + (h)->flags |= TAOS_LRU_IS_HIGH_PRI; \ + } else { \ + (h)->flags &= ~TAOS_LRU_IS_HIGH_PRI; \ + } \ + } while (0) #define TAOS_LRU_ENTRY_SET_HIT(h) ((h)->flags |= TAOS_LRU_HAS_HIT) #define TAOS_LRU_ENTRY_HAS_REFS(h) ((h)->refs > 0) -#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs) +#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs) static bool taosLRUEntryUnref(SLRUEntry *entry) { assert(entry->refs > 0); @@ -90,7 +111,7 @@ struct SLRUEntryTable { static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { table->lengthBits = 4; - table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry*)); + table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); if (!table->list) { return -1; } @@ -125,7 +146,7 @@ static void taosLRUEntryTableCleanup(SLRUEntryTable *table) { taosMemoryFree(table->list); } -static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { +static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) { SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)]; while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { entry = &(*entry)->nextHash; @@ -134,7 +155,7 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void * return entry; } -static void taosLRUEntryTableResize(SLRUEntryTable * table) { +static void taosLRUEntryTableResize(SLRUEntryTable *table) { int lengthBits = table->lengthBits; if (lengthBits >= table->maxLengthBits) { return; @@ -144,9 +165,9 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) { return; } - uint32_t oldLength = 1 << lengthBits; - int newLengthBits = lengthBits + 1; - SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry*)); + uint32_t oldLength = 1 << lengthBits; + int newLengthBits = lengthBits + 1; + SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry *)); if (!newList) { return; } @@ -154,8 +175,8 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) { for (uint32_t i = 0; i < oldLength; ++i) { SLRUEntry *entry = table->list[i]; while (entry) { - SLRUEntry *next = entry->nextHash; - uint32_t hash = entry->hash; + SLRUEntry *next = entry->nextHash; + uint32_t hash = entry->hash; SLRUEntry **ptr = &newList[hash >> (32 - newLengthBits)]; entry->nextHash = *ptr; *ptr = entry; @@ -170,13 +191,13 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) { table->lengthBits = newLengthBits; } -static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { +static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) { return *taosLRUEntryTableFindPtr(table, key, keyLen, hash); } -static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *entry) { +static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable *table, SLRUEntry *entry) { SLRUEntry **ptr = taosLRUEntryTableFindPtr(table, entry->keyData, entry->keyLength, entry->hash); - SLRUEntry *old = *ptr; + SLRUEntry *old = *ptr; entry->nextHash = (old == NULL) ? NULL : old->nextHash; *ptr = entry; if (old == NULL) { @@ -189,9 +210,9 @@ static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *ent return old; } -static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { +static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) { SLRUEntry **entry = taosLRUEntryTableFindPtr(table, key, keyLen, hash); - SLRUEntry *result = *entry; + SLRUEntry *result = *entry; if (result) { *entry = result->nextHash; --table->elems; @@ -201,17 +222,17 @@ static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *ke } struct SLRUCacheShard { - size_t capacity; - size_t highPriPoolUsage; - bool strictCapacity; - double highPriPoolRatio; - double highPriPoolCapacity; - SLRUEntry lru; - SLRUEntry *lruLowPri; - SLRUEntryTable table; - size_t usage; // Memory size for entries residing in the cache. - size_t lruUsage; // Memory size for entries residing only in the LRU list. - TdThreadMutex mutex; + size_t capacity; + size_t highPriPoolUsage; + bool strictCapacity; + double highPriPoolRatio; + double highPriPoolCapacity; + SLRUEntry lru; + SLRUEntry *lruLowPri; + SLRUEntryTable table; + size_t usage; // Memory size for entries residing in the cache. + size_t lruUsage; // Memory size for entries residing only in the LRU list. + TdThreadMutex mutex; }; #define TAOS_LRU_CACHE_SHARD_HASH32(key, len) (MurmurHash3_32((key), (len))) @@ -231,8 +252,7 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) { assert(e->next == NULL); assert(e->prev == NULL); - if (shard->highPriPoolRatio > 0 - && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) { + if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) { e->next = &shard->lru; e->prev = shard->lru.prev; @@ -248,7 +268,7 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) { e->prev->next = e; e->next->prev = e; - + TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(e, false); shard->lruLowPri = e; } @@ -304,13 +324,13 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity) for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); - taosLRUEntryFree(entry); + taosLRUEntryFree(entry); } taosArrayDestroy(lastReferenceList); } -static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, - double highPriPoolRatio, int maxUpperHashBits) { +static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio, + int maxUpperHashBits) { if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) { return -1; } @@ -341,23 +361,24 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { taosLRUEntryTableCleanup(&shard->table); } -static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { +static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, + bool freeOnFail) { LRUStatus status = TAOS_LRU_STATUS_OK; - SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); + SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); taosThreadMutexLock(&shard->mutex); taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList); - + if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { taosArrayPush(lastReferenceList, &e); } else { if (freeOnFail) { - taosMemoryFree(e); + taosMemoryFree(e); - *handle = NULL; + *handle = NULL; } status = TAOS_LRU_STATUS_INCOMPLETE; @@ -371,21 +392,21 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * assert(TAOS_LRU_ENTRY_IN_CACHE(old)); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { - taosLRUCacheShardLRURemove(shard, old); - assert(shard->usage >= old->totalCharge); - shard->usage -= old->totalCharge; + taosLRUCacheShardLRURemove(shard, old); + assert(shard->usage >= old->totalCharge); + shard->usage -= old->totalCharge; - taosArrayPush(lastReferenceList, &old); + taosArrayPush(lastReferenceList, &old); } } if (handle == NULL) { taosLRUCacheShardLRUInsert(shard, e); } else { if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { - TAOS_LRU_ENTRY_REF(e); + TAOS_LRU_ENTRY_REF(e); } - *handle = (LRUHandle*) e; + *handle = (LRUHandle *)e; } } @@ -394,7 +415,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); - taosLRUEntryFree(entry); + taosLRUEntryFree(entry); } taosArrayDestroy(lastReferenceList); @@ -402,8 +423,8 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * } static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash, - void *value, size_t charge, _taos_lru_deleter_t deleter, - LRUHandle **handle, LRUPriority priority) { + void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle, + LRUPriority priority) { SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen); if (!e) { return TAOS_LRU_STATUS_FAIL; @@ -442,7 +463,7 @@ static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key taosThreadMutexUnlock(&shard->mutex); - return (LRUHandle *) e; + return (LRUHandle *)e; } static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) { @@ -482,7 +503,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); assert(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - + taosArrayPush(lastReferenceList, &old); } @@ -491,14 +512,14 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { SLRUEntry *entry = taosArrayGetP(lastReferenceList, i); - taosLRUEntryFree(entry); + taosLRUEntryFree(entry); } taosArrayDestroy(lastReferenceList); } static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) { - SLRUEntry *e = (SLRUEntry *) handle; + SLRUEntry *e = (SLRUEntry *)handle; taosThreadMutexLock(&shard->mutex); assert(TAOS_LRU_ENTRY_HAS_REFS(e)); @@ -514,8 +535,8 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b return false; } - SLRUEntry *e = (SLRUEntry *) handle; - bool lastReference = false; + SLRUEntry *e = (SLRUEntry *)handle; + bool lastReference = false; taosThreadMutexLock(&shard->mutex); @@ -537,7 +558,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b assert(shard->usage >= e->totalCharge); shard->usage -= e->totalCharge; } - + taosThreadMutexUnlock(&shard->mutex); if (lastReference) { @@ -549,7 +570,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) { size_t usage = 0; - + taosThreadMutexLock(&shard->mutex); usage = shard->usage; taosThreadMutexUnlock(&shard->mutex); @@ -559,7 +580,7 @@ static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) { static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) { size_t usage = 0; - + taosThreadMutexLock(&shard->mutex); assert(shard->usage >= shard->lruUsage); @@ -579,11 +600,11 @@ static void taosLRUCacheShardSetStrictCapacity(SLRUCacheShard *shard, bool stric } struct SShardedCache { - uint32_t shardMask; - TdThreadMutex capacityMutex; - size_t capacity; - bool strictCapacity; - uint64_t lastId; // atomic var for last id + uint32_t shardMask; + TdThreadMutex capacityMutex; + size_t capacity; + bool strictCapacity; + uint64_t lastId; // atomic var for last id }; struct SLRUCache { @@ -593,7 +614,7 @@ struct SLRUCache { }; static int getDefaultCacheShardBits(size_t capacity) { - int numShardBits = 0; + int numShardBits = 0; size_t minShardSize = 512 * 1024; size_t numShards = capacity / minShardSize; while (numShards >>= 1) { @@ -621,7 +642,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo numShardBits = getDefaultCacheShardBits(capacity); } - int numShards = 1 << numShardBits; + int numShards = 1 << numShardBits; cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); if (!cache->shards) { taosMemoryFree(cache); @@ -629,7 +650,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo return NULL; } - bool strictCapacity = 1; + bool strictCapacity = 1; size_t perShard = (capacity + (numShards - 1)) / numShards; for (int i = 0; i < numShards; ++i) { taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits); @@ -653,7 +674,7 @@ void taosLRUCacheCleanup(SLRUCache *cache) { int numShards = cache->numShards; assert(numShards > 0); for (int i = 0; i < numShards; ++i) { - taosLRUCacheShardCleanup(&cache->shards[i]); + taosLRUCacheShardCleanup(&cache->shards[i]); } taosMemoryFree(cache->shards); cache->shards = 0; @@ -666,11 +687,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) { } LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, - _taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) { + _taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) { uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t shardIndex = hash & cache->shardedCache.shardMask; - return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle, priority); + return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle, + priority); } LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) { @@ -699,7 +721,7 @@ bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle) { return false; } - uint32_t hash = ((SLRUEntry *) handle)->hash; + uint32_t hash = ((SLRUEntry *)handle)->hash; uint32_t shardIndex = hash & cache->shardedCache.shardMask; return taosLRUCacheShardRef(&cache->shards[shardIndex], handle); @@ -710,15 +732,13 @@ bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRe return false; } - uint32_t hash = ((SLRUEntry *) handle)->hash; + uint32_t hash = ((SLRUEntry *)handle)->hash; uint32_t shardIndex = hash & cache->shardedCache.shardMask; return taosLRUCacheShardRelease(&cache->shards[shardIndex], handle, eraseIfLastRef); } -void* taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { - return ((SLRUEntry*) handle)->value; -} +void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { return ((SLRUEntry *)handle)->value; } size_t taosLRUCacheGetUsage(SLRUCache *cache) { size_t usage = 0; @@ -742,7 +762,7 @@ size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) { void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) { uint32_t numShards = cache->numShards; - size_t perShard = (capacity + (numShards = 1)) / numShards; + size_t perShard = (capacity + (numShards - 1)) / numShards; taosThreadMutexLock(&cache->shardedCache.capacityMutex); @@ -751,7 +771,7 @@ void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) { } cache->shardedCache.capacity = capacity; - + taosThreadMutexUnlock(&cache->shardedCache.capacityMutex); } @@ -777,7 +797,7 @@ void taosLRUCacheSetStrictCapacity(SLRUCache *cache, bool strict) { } cache->shardedCache.strictCapacity = strict; - + taosThreadMutexUnlock(&cache->shardedCache.capacityMutex); } From 68cfa54e26f0eb2fdec8fd0201fa839d6efd8c43 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 30 Jun 2022 20:21:24 +0800 Subject: [PATCH 5/5] feat(stream): ignore close window --- source/libs/executor/src/timewindowoperator.c | 63 +++++++++++++++---- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 95b16589ee..66900fb7aa 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1246,6 +1246,9 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS pCtx[i].fpSet.init(&pCtx[i], pResInfo); } } + SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId); + setBufPageDirty(bufPage, true); + releaseBufPage(pResultBuf, bufPage); } bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, @@ -3171,6 +3174,10 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); ASSERT(isInWindow(pCurWin, tsCols[i], gap)); + if (pCurWin->pos.pageId == -1) { + // window has been closed. + continue; + } doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); if (result) { taosArrayPush(result, pCurWin); @@ -3246,12 +3253,12 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo); - SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); - setBufPageDirty(bufPage, true); - releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); + SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); + releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage); continue; + } else if (!pChWin->isClosed) { + break; } - break; } } SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId); @@ -3265,7 +3272,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } -int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) { +int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, + SArray* pClosed, __get_win_info_ fn, bool delete) { // Todo(liuyao) save window to tdb void** pIte = NULL; size_t keyLen = 0; @@ -3279,10 +3287,18 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra if (isCloseWindow(&pSeWin->win, pTwSup)) { if (!pSeWin->isClosed) { pSeWin->isClosed = true; - if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) { int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pSeWin->isOutput = true; } + if (delete) { + taosArrayRemove(pWins, i); + i--; + size = taosArrayGetSize(pWins); + } } continue; } @@ -3292,6 +3308,16 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra return TSDB_CODE_SUCCESS; } +static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) { + int32_t size = taosArrayGetSize(pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); + SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; + pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); + closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete); + } +} + int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) { void** pIte = NULL; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { @@ -3339,6 +3365,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv"); if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); @@ -3385,7 +3412,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, + getResWinForSession, pInfo->ignoreCloseWindow); + closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow); copyUpdateResult(pStUpdated, pUpdated); taosHashCleanup(pStUpdated); @@ -3437,10 +3466,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "Semi Session"); return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0) { + if (pBInfo->pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; if (pInfo->pUpdateRes->info.rows == 0) { // semi interval operator clear disk buffer @@ -3449,9 +3479,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } // process the rest of the data pOperator->status = OP_OPENED; + printDataBlock(pInfo->pUpdateRes, "Semi Session"); return pInfo->pUpdateRes; } - return pInfo->binfo.pRes; + printDataBlock(pBInfo->pRes, "Semi Session"); + return pBInfo->pRes; } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3495,21 +3527,24 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); - blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "Semi Session"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0) { + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + if (pBInfo->pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; if (pInfo->pUpdateRes->info.rows == 0) { return NULL; } // process the rest of the data pOperator->status = OP_OPENED; + printDataBlock(pInfo->pUpdateRes, "Semi Session"); return pInfo->pUpdateRes; } + printDataBlock(pBInfo->pRes, "Semi Session"); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -3867,7 +3902,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, + getResWinForState, pInfo->ignoreCloseWindow); + closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow); copyUpdateResult(pSeUpdated, pUpdated); taosHashCleanup(pSeUpdated);