diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 90adba6f4d..06c3b29132 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -420,6 +420,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* setQueryTimewindow(pReadHandle, pCond); if (pCond->numOfCols > 0) { + int32_t rowLen = 0; + for(int32_t i = 0; i < pCond->numOfCols; ++i) { + rowLen += pCond->colList[i].bytes; + } + // allocate buffer in order to load data blocks from file pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); if (pReadHandle->suppInfo.pstatis == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5769e09917..fa840e1cd6 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,8 +14,6 @@ */ #include "executor.h" -#include -#include #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 761834f29c..168589148e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -586,8 +586,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow int32_t numOfRows = pCtx[k].input.numOfRows; int32_t startOffset = pCtx[k].input.startRowIndex; - int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); - pCtx[k].input.startRowIndex = pos; + pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; if (tsCol != NULL) { @@ -758,45 +757,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt } } } - - // setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); - // uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag; - // if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || - // (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) { - - // SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol; - // if (pCtx[i].columnIndex == -1) { - // for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - // SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); - // if (pColData->info.colId == pCol->colId) { - // pCtx[i].columnIndex = j; - // break; - // } - // } - // } - - // uint32_t status = aAggs[pCtx[i].functionId].status; - // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { - // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); - // In case of the top/bottom query again the nest query result, which has no timestamp column - // don't set the ptsList attribute. - // if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - // pCtx[i].ptsList = (int64_t*) tsInfo->pData; - // } else { - // pCtx[i].ptsList = NULL; - // } - // } - // } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { - // SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; - // SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); - // - // pCtx[i].pInput = p->pData; - // assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); - // for(int32_t j = 0; j < pBlock->info.rows; ++j) { - // char* dst = p->pData + j * p->info.bytes; - // taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); - // } - // } } return code; @@ -875,7 +835,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc taosArrayDestroy(pBlockList); return code; } - + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); @@ -2842,9 +2802,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; - idata.info.type = pSchema[i].type; + + idata.info.type = pSchema[i].type; idata.info.bytes = pSchema[i].bytes; idata.info.colId = pSchema[i].colId; + idata.hasNull = true; taosArrayPush(pBlock->pDataBlock, &idata); if (IS_VAR_DATA_TYPE(idata.info.type)) { @@ -5341,4 +5303,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);; return TSDB_CODE_SUCCESS; } - + diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1097e6d84c..f5122a26ef 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -898,11 +898,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan if (upRes) { pInfo->pUpdateRes = upRes; - if (upRes->info.type = STREAM_REPROCESS) { + if (upRes->info.type == STREAM_REPROCESS) { pInfo->updateResIndex = 0; prepareDataScan(pInfo); pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else if (upRes->info.type = STREAM_INVERT) { + } else if (upRes->info.type == STREAM_INVERT) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return upRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 62732aa1f9..3be131e550 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1072,7 +1072,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; - int32_t order = TSDB_ORDER_ASC; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + pInfo->order = TSDB_ORDER_ASC; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1086,11 +1088,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } - // STimeWindow win = {0}; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = NULL; - while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -1103,16 +1103,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (pInfo->invertible) { setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); } + if (pBlock->info.type == STREAM_REPROCESS) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); + qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } - pInfo->order = TSDB_ORDER_ASC; + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index e46cb81561..3b1e66f2ad 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -207,6 +207,9 @@ bool fmIsInvertible(int32_t funcId) { case FUNCTION_TYPE_SUM: case FUNCTION_TYPE_STDDEV: case FUNCTION_TYPE_AVG: + case FUNCTION_TYPE_WSTARTTS: + case FUNCTION_TYPE_WENDTS: + case FUNCTION_TYPE_WDURATION: res = true; break; default: diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 0997e313c8..8e6391eb0b 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -137,7 +137,11 @@ endi sql insert into t1 values(1648791223001,12,14,13,11.1); sleep 500 -sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; +sql select * from streamt; + +print count(*) , count(d) , sum(a) , max(b) , min(c) +print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05 +print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15 if $rows != 4 then print ======$rows diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index 7319fadc80..a00bed30e4 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -354,7 +354,7 @@ class TDTestCase: event.wait() tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 15 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)