diff --git a/src/os/src/detail/osString.c b/src/os/src/detail/osString.c index 2c49797d83..fdc70b667e 100644 --- a/src/os/src/detail/osString.c +++ b/src/os/src/detail/osString.c @@ -59,6 +59,9 @@ bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len iconv_close(cd); if (len != NULL) { *len = (int32_t)(ucs4_max_len - outLeft); + if (*len < 0) { + return false; + } } return true; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 24547c1e0b..955dd734cf 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -464,6 +464,7 @@ typedef struct SSWindowOperatorInfo { TSKEY prevTs; // previous timestamp int32_t numOfRows; // number of rows int32_t start; // start row index + bool reptScan; // next round scan } SSWindowOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -473,7 +474,7 @@ typedef struct SStateWindowOperatorInfo { int32_t colIndex; // start row index int32_t start; char* prevData; // previous data - + bool reptScan; } SStateWindowOperatorInfo ; typedef struct SDistinctOperatorInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8cc25f1fe6..9e78354c19 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -30,6 +30,7 @@ #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) +#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) @@ -1337,6 +1338,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; pInfo->numOfRows = 0; + if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { + pInfo->reptScan = true; + pInfo->prevTs = INT64_MIN; + } TSKEY* tsList = (TSKEY*)pColInfoData->pData; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { @@ -1346,7 +1351,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf pInfo->prevTs = tsList[j]; pInfo->numOfRows = 1; pInfo->start = j; - } else if (tsList[j] - pInfo->prevTs <= gap) { + } else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) { pInfo->curWindow.ekey = tsList[j]; pInfo->prevTs = tsList[j]; pInfo->numOfRows += 1; @@ -5178,6 +5183,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; + if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { + pInfo->reptScan = true; + tfree(pInfo->prevData); + } pInfo->numOfRows = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { @@ -5764,6 +5773,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; + pInfo->reptScan = false; pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -5791,7 +5801,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); - pInfo->prevTs = INT64_MIN; + pInfo->prevTs = INT64_MIN; + pInfo->reptScan = false; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SessionWindowAggOperator";