fix issue

This commit is contained in:
54liuyao 2024-10-31 15:50:56 +08:00
parent 5a4c7c2e2a
commit 5158bf6b96
6 changed files with 51 additions and 12 deletions

View File

@ -856,6 +856,7 @@ typedef struct SStreamTimeSliceOperatorInfo {
SGroupResInfo groupResInfo;
bool ignoreNull;
bool isHistoryOp;
SArray* pCloseTs;
struct SOperatorInfo* pOperator;
} SStreamTimeSliceOperatorInfo;

View File

@ -100,6 +100,8 @@ int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY*
int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
struct SOperatorInfo** ppOptInfo);
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated);
void removeDuplicateTs(SArray* pTsArrray);
#ifdef __cplusplus
}

View File

@ -1272,7 +1272,7 @@ _end:
return code;
}
static int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) {
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int64_t groupId = 0;
@ -1367,6 +1367,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
}
removeDuplicateTs(pInfo->pCloseTs);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated);

View File

@ -19,6 +19,7 @@
#include "storageapi.h"
#include "streamexecutorInt.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
@ -627,3 +628,9 @@ _error:
(*ppOptInfo) = NULL;
return code;
}
void removeDuplicateTs(SArray* pTsArrray) {
__compar_fn_t fn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, TSDB_ORDER_ASC);
taosArraySort(pTsArrray, fn);
taosArrayRemoveDuplicate(pTsArrray, fn, NULL);
}

View File

@ -177,6 +177,8 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
taosArrayDestroy(pInfo->pCloseTs);
taosMemoryFreeClear(param);
}
@ -1449,11 +1451,15 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRId64, pKey->ts, pKey->groupId);
qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRIu64, pKey->ts, pKey->groupId);
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
} else if (pBlock->info.id.groupId != pKey->groupId) {
break;
if (pBlock->info.rows > 0) {
break;
} else {
pBlock->info.id.groupId = pKey->groupId;
}
}
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
SSlicePoint prevPoint = {0};
@ -1813,8 +1819,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
goto _end;
} break;
case STREAM_GET_RESULT: {
code = setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
continue;
}
default:
@ -1831,12 +1837,29 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
}
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
SWinKey* pKey = (SWinKey*)tSimpleHashGetKey(pIte, NULL);
void* tmp = taosArrayPush(pInfo->pUpdated, pKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
if (taosArrayGetSize(pInfo->pCloseTs) > 0) {
removeDuplicateTs(pInfo->pCloseTs);
int32_t size = taosArrayGetSize(pInfo->pCloseTs);
qDebug("build stream result, ts count:%d", size);
for (int32_t i = 0; i < size; i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(&pInfo->streamAggSup, ts, pInfo->pUpdated);
QUERY_CHECK_CODE(code, lino, _end);
}
qDebug("build stream result, ts count:%d", taosArrayGetSize(pInfo->pUpdated));
taosArrayClear(pInfo->pCloseTs);
if (size > 1024) {
taosArrayDestroy(pInfo->pCloseTs);
pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
}
} else {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
SWinKey* pKey = (SWinKey*)tSimpleHashGetKey(pIte, NULL);
void* tmp = taosArrayPush(pInfo->pUpdated, pKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
}
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
@ -2088,6 +2111,10 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;
}
pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
pInfo->pOperator = pOperator;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;

View File

@ -275,8 +275,9 @@ class TDTestCase:
time.sleep(self.tdCom.dataDict["interval"])
tdSql.query("show streams")
tdLog.info(f"tdSql.queryResult:{tdSql.queryResult},tdSql.queryRows:{tdSql.queryRows}")
localQueryResult = tdSql.queryResult
for stream_number in range(tdSql.queryRows):
stream_name = tdSql.queryResult[stream_number][0]
stream_name = localQueryResult[stream_number][0]
tdCom.check_stream_task_status(
stream_name=stream_name, vgroups=2, stream_timeout=20,check_wal_info=False
)