diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 15f3246013..043f2d1d12 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -196,7 +196,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 14b7fe5d0e..975d097205 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -203,6 +203,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co if (pSource->hasNull) { pColumnInfoData->hasNull = pSource->hasNull; } + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { // Handle the bitmap char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); @@ -1075,8 +1076,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } } -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { - if (0 == numOfRows) { +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) { + if (0 == numOfRows || numOfRows <= existRows) { return TSDB_CODE_SUCCESS; } @@ -1087,19 +1088,16 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) } pColumn->varmeta.offset = (int32_t*)tmp; - memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); - - pColumn->varmeta.length = 0; - pColumn->varmeta.allocLen = 0; - taosMemoryFreeClear(pColumn->pData); + memset(&pColumn->varmeta.offset[existRows], 0, sizeof(int32_t) * (numOfRows - existRows)); } else { char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + int32_t oldLen = BitmapLen(existRows); pColumn->nullbitmap = tmp; - memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); if (pColumn->info.type == TSDB_DATA_TYPE_NULL) { return TSDB_CODE_SUCCESS; @@ -1135,7 +1133,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - code = colInfoDataEnsureCapacity(p, numOfRows); + code = colInfoDataEnsureCapacity(p, pDataBlock->info.rows, numOfRows); if (code) { return code; } @@ -1180,7 +1178,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows); + int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 30524471a9..f4ef55f865 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -52,6 +52,10 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { if (pIter->len == 0) { pIter->len += sizeof(SSubmitReq); } else { + if (pIter->len >= pIter->totalLen) { + ASSERT(0); + } + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); ASSERT(pIter->len > 0); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9282f7197e..cf62ea8714 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -141,7 +141,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { colInfo.info.colId = pColSchema->colId; colInfo.info.type = pColSchema->type; - if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) { + if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ab933f840a..2ad34d1561 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -392,7 +392,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, SColumnInfoData colInfo = {{0}, 0}; colInfo.info = pCond->colList[i]; - int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity); + int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5ef1754913..230ccf6ace 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -342,7 +342,7 @@ typedef struct STableScanInfo { int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - double sampleRatio; // data block sample ratio + double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. } STableScanInfo; @@ -395,7 +395,6 @@ typedef struct SOptrBasicInfo { int32_t* rowCellInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; - int32_t capacity; // TODO remove it } SOptrBasicInfo; // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset @@ -405,23 +404,29 @@ typedef struct SAggSupporter { SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; -typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindow win; // query time range - bool timeWindowInterpo; // interpolation needed or not - char** pRow; // previous row/tuple of already processed datablock - SAggSupporter aggSup; // aggregate supporter - STableQueryInfo* pCurrent; // current tableQueryInfo struct - int32_t order; // current SSDataBlock scan order - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. +typedef struct STimeWindowSupp { + int8_t calTrigger; + int64_t waterMark; SColumnInfoData timeWindowData; // query time window info for scalar function execution. +} STimeWindowAggSupp; + +typedef struct STableIntervalOperatorInfo { + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char** pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo* pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + STimeWindowAggSupp twAggSup; } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -439,19 +444,19 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; typedef struct SProjectOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SSDataBlock* existDataBlock; - SArray* pPseudoColInfo; - SLimit limit; - SLimit slimit; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SSDataBlock* existDataBlock; + SArray* pPseudoColInfo; + SLimit limit; + SLimit slimit; - uint64_t groupId; - int64_t curSOffset; - int64_t curGroupOutput; + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; - int64_t curOffset; - int64_t curOutput; + int64_t curOffset; + int64_t curOutput; } SProjectOperatorInfo; typedef struct SFillOperatorInfo { @@ -466,10 +471,10 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char* pData; - bool isNull; - int16_t type; - int32_t bytes; + char* pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { @@ -488,9 +493,9 @@ typedef struct SGroupbyOperatorInfo { } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { - uint64_t groupId; - int64_t numOfRows; - SArray* pPageList; + uint64_t groupId; + int64_t numOfRows; + SArray* pPageList; } SDataGroupInfo; // The sort in partition may be needed later. @@ -505,9 +510,8 @@ typedef struct SPartitionOperatorInfo { SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -518,13 +522,13 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + STimeWindowAggSupp twAggSup; } SSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -534,14 +538,14 @@ typedef struct STimeSliceOperatorInfo { } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - int32_t colIndex; // start row index - bool hasKey; - SStateKeys stateKey; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + int32_t colIndex; // start row index + bool hasKey; + SStateKeys stateKey; + STimeWindowAggSupp twAggSup; // bool reptScan; } SStateWindowOperatorInfo; @@ -602,7 +606,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); + SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); +void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, @@ -638,10 +643,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, - const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, @@ -654,7 +660,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4af7e563e6..4863b03fb9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -34,6 +34,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { + pOperator->status = OP_NOT_OPENED; + SStreamBlockScanInfo* pInfo = pOperator->info; // the block type can not be changed in the streamscan operators diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ed2454faae..87ecdde3ab 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -20,23 +20,19 @@ #include "tname.h" #include "os.h" -#include "parser.h" #include "tdatablock.h" -#include "texception.h" #include "tglobal.h" #include "tmsg.h" #include "tsort.h" #include "ttime.h" #include "executorimpl.h" -#include "function.h" #include "query.h" #include "tcompare.h" #include "tcompression.h" #include "thash.h" #include "vnode.h" #include "ttypes.h" -#include "vnode.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -836,7 +832,7 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; pColData->info.bytes = sizeof(int64_t); - colInfoDataEnsureCapacity(pColData, 5); + colInfoDataEnsureCapacity(pColData, 0, 5); colDataAppendInt64(pColData, 0, &pQueryWindow->skey); colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); @@ -1065,7 +1061,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } ASSERT(!IS_VAR_DATA_TYPE(type)); - colInfoDataEnsureCapacity(pColInfo, numOfRows); + colInfoDataEnsureCapacity(pColInfo, 0, numOfRows); if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { int64_t v = pFuncParam->param.i; @@ -1101,6 +1097,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->totalRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows; pInput->startRowIndex = 0; + + pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { if (createDummyCol) { @@ -1175,31 +1173,43 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; + int32_t numOfRows = 0; + for (int32_t k = 0; k < numOfOutput; ++k) { int32_t outputSlotId = pExpr[k].base.resSchema.slotId; SqlFunctionCtx* pfCtx = &pCtx[k]; if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); + if (pResult->info.rows > 0) { + colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); + } else { + colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); + } - pResult->info.rows = pSrcBlock->info.rows; + numOfRows = pfCtx->input.numOfRows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t offset = pResult->info.rows; for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); + colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } - pResult->info.rows = pSrcBlock->info.rows; + + numOfRows = pSrcBlock->info.rows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info}; + SScalarParam dest = {.columnData = &idata}; scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { ASSERT(!fmIsAggFunc(pfCtx->functionId)); @@ -1216,28 +1226,33 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); pfCtx->offset = pResult->info.rows; // set the start offset + // set the timestamp(_rowts) output buffer if (taosArrayGetSize(pPseudoList) > 0) { int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; } - int32_t numOfRows = pfCtx->fpSet.process(pfCtx); - pResult->info.rows += numOfRows; + numOfRows = pfCtx->fpSet.process(pfCtx); } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info}; + SScalarParam dest = {.columnData = &idata}; scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } } else { ASSERT(0); } } + + pResult->info.rows += numOfRows; } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, @@ -1503,8 +1518,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); - doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { @@ -1535,8 +1550,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } if (pInfo->timeWindowInterpo) { @@ -1607,8 +1622,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); @@ -1624,8 +1639,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -4453,7 +4468,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { SSortHandle* pHandle = pInfo->pSortHandle; SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); - blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); + blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity); while (1) { blockDataCleanup(pDataBlock); @@ -4465,7 +4480,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { // build datablock for merge for one group appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= pInfo->binfo.capacity) { + if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) { break; } } @@ -4500,7 +4515,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -4607,7 +4622,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pInfo->bufPageSize = 1024; pInfo->pSortInfo = pSortInfo; - pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); + pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; @@ -4879,6 +4894,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi if (!resultRow) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } + // add a new result set for a new group SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); @@ -4915,6 +4931,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } #if 0 if (pProjectInfo->existDataBlock) { // TODO refactor @@ -5106,8 +5126,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return NULL; } - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5127,7 +5147,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup } if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5161,8 +5181,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); ASSERT(pInfo->binfo.pRes->info.rows > 0); pOperator->status = OP_RES_TO_RETURN; @@ -5263,8 +5283,8 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5324,8 +5344,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j); @@ -5341,8 +5361,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { @@ -5355,7 +5375,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5386,8 +5406,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5404,7 +5424,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5435,8 +5455,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5614,15 +5634,23 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { } int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { + SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; - pBasicInfo->capacity = numOfRows; doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); return TSDB_CODE_SUCCESS; } +void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { + pOperator->resultInfo.capacity = numOfRows; + pOperator->resultInfo.threshold = numOfRows * 0.75; + + if (pOperator->resultInfo.threshold == 0) { + pOperator->resultInfo.capacity = numOfRows; + } +} + static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pTableQueryInfo == NULL) { @@ -5658,7 +5686,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -5808,7 +5838,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p int32_t numOfCols = num; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); @@ -5837,28 +5869,30 @@ _error: } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, - const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->execModel = pTaskInfo->execModel; - - pInfo->win = pTaskInfo->window; - pInfo->win.skey = 0; - pInfo->win.ekey = INT64_MAX; - - pInfo->primaryTsIndex = primaryTsSlot; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = pTaskInfo->execModel; + pInfo->win = pTaskInfo->window; + pInfo->win.skey = 0; + pInfo->win.ekey = INT64_MAX; + pInfo->primaryTsIndex = primaryTsSlotId; + pInfo->twAggSup = *pTwAggSupp; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { @@ -5927,7 +5961,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* return NULL; } -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSup, + SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5936,9 +5971,14 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pInfo->colIndex = -1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, 4096); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->twAggSup = *pTwAggSup; + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blockingOptr = true; @@ -5962,7 +6002,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf } SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5971,13 +6011,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } + pInfo->twAggSup = *pTwAggSupp; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - initExecTimeWindowInfo(&pInfo->timeWindowData, &pTaskInfo->window); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->gap = gap; pInfo->binfo.pRes = pResBlock; @@ -6527,8 +6570,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision }; + STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType}; + int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -6539,9 +6584,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType}; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); @@ -6552,9 +6599,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; + STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8739371dd9..8018a8dd31 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -307,11 +307,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // pInfo->binfo.rowCellInfoOffset); // } - blockDataEnsureCapacity(pRes, pInfo->binfo.capacity); + blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); while(1) { - toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); @@ -348,7 +348,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx goto _error; } - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); + initResultSizeInfo(pOperator, 4096); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pOperator->name = "GroupbyAggOperator"; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3050006b7..6b06f3e89b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -539,7 +539,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SStreamBlockScanInfo* pInfo = pOperator->info; pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -547,6 +547,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -560,11 +561,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { terrno = pTaskInfo->code; + pOperator->status = OP_EXEC_DONE; return NULL; } if (pBlockInfo->rows == 0) { - return NULL; + break; } SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); @@ -583,6 +585,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) if (pInfo->pRes->pDataBlock == NULL) { // TODO add log + pOperator->status = OP_EXEC_DONE; pTaskInfo->code = terrno; return NULL; } @@ -594,6 +597,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) pInfo->numOfExec++; pInfo->numOfRows += pBlockInfo->rows; + if (pBlockInfo->rows == 0) { + pOperator->status = OP_EXEC_DONE; + } + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0d7e984ae7..2b1e4b9406 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -767,17 +767,105 @@ void percentileFinalize(SqlFunctionCtx* pCtx) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes; + pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); return true; } -// TODO fix this -// This ordinary first function only handle the data block in ascending order -int32_t firstFunction(SqlFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_DESC) { +static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { + if (pTsColInfo == NULL) { return 0; } + return *(TSKEY*) colDataGetData(pTsColInfo, rowIndex); +} + +// This ordinary first function does not care if current scan is ascending order or descending order scan +// the OPTIMIZED version of first function will only handle the ascending order scan +int32_t firstFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElems = 0; + + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + int32_t bytes = pInputCol->info.bytes; + + // All null data column, return directly. + if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { + ASSERT(pInputCol->hasNull == true); + return 0; + } + + SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL; + + TSKEY startKey = getRowPTs(pInput->pPTS, 0); + TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); + + int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + + if (blockDataOrder == TSDB_ORDER_ASC) { + // filter according to current result firstly + if (pResInfo->numOfRes > 0) { + TSKEY ts = *(TSKEY*)(buf + bytes); + if (ts < startKey) { + return TSDB_CODE_SUCCESS; + } + } + + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + memcpy(buf, data, bytes); + *(TSKEY*)(buf + bytes) = cts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->numOfRes = 1; + } + + numOfElems++; + } + } else { + // in case of descending order time stamp serial, which usually happens as the results of the nest query, + // all data needs to be check. + if (pResInfo->numOfRes > 0) { + TSKEY ts = *(TSKEY*)(buf + bytes); + if (ts < endKey) { + return TSDB_CODE_SUCCESS; + } + } + + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + memcpy(buf, data, bytes); + *(TSKEY*)(buf + bytes) = cts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + pResInfo->numOfRes = 1; + } + + numOfElems++; + } + } + + SET_VAL(pResInfo, numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t lastFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); @@ -792,48 +880,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { return 0; } - // Check for the first not null data - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { - continue; - } - - char* data = colDataGetData(pInputCol, i); - memcpy(buf, data, pInputCol->info.bytes); - // TODO handle the subsidary value -// if (pCtx->ptsList != NULL) { -// TSKEY k = GET_TS_DATA(pCtx, i); -// DO_UPDATE_TAG_COLUMNS(pCtx, k); -// } - - pResInfo->complete = true; - numOfElems++; - break; - } - - SET_VAL(pResInfo, numOfElems, 1); - return TSDB_CODE_SUCCESS; -} - -int32_t lastFunction(SqlFunctionCtx *pCtx) { - if (pCtx->order != TSDB_ORDER_DESC) { - return 0; - } - - int32_t numOfElems = 0; - - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); - - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; - - // All null data column, return directly. - if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { - ASSERT(pInputCol->hasNull == true); - return 0; - } - if (pCtx->order == TSDB_ORDER_DESC) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 85a42387c4..1e8b732b6a 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -461,8 +461,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int if (isNullStr(pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - int64_t tmpVal = 0; - return func(pMsgBuf, &tmpVal, pSchema->bytes, param); + return buildSyntaxErrMsg(pMsgBuf, "primary timestamp can not be null", pToken->z); } return func(pMsgBuf, NULL, 0, param); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0432ae1df8..337a773d5c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -30,7 +30,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) { pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pColumnData); @@ -45,7 +45,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) { in.columnData = createColumnInfoData(&pValueNode->node.resType, 1); colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); - colInfoDataEnsureCapacity(out->columnData, 1); + colInfoDataEnsureCapacity(out->columnData, 0, 1); int32_t code = vectorConvertImpl(&in, out); sclFreeParam(&in); diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index 26ef5dbd44..42998aba00 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in res->info.numOfCols++; SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock); - colInfoDataEnsureCapacity(pColumn, rowNum); + colInfoDataEnsureCapacity(pColumn, 0, rowNum); for (int32_t i = 0; i < rowNum; ++i) { colDataAppend(pColumn, i, (const char *)value, false); diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 61ef2fdce2..09d6528dd8 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s SColumnInfoData idata = {0}; idata.info = *colInfo; - colInfoDataEnsureCapacity(&idata, rows); + colInfoDataEnsureCapacity(&idata, 0, rows); taosArrayPush(res->pDataBlock, &idata); @@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in res->info.numOfCols++; SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock); - colInfoDataEnsureCapacity(pColumn, rowNum); + colInfoDataEnsureCapacity(pColumn, 0, rowNum); for (int32_t i = 0; i < rowNum; ++i) { colDataAppend(pColumn, i, (const char *)value, false); @@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t input->numOfRows = num; input->columnData->info = createColumnInfo(0, type, bytes); - colInfoDataEnsureCapacity(input->columnData, num); + colInfoDataEnsureCapacity(input->columnData, 0, num); if (setVal) { for (int32_t i = 0; i < num; ++i) { diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 809d008aa7..6d3a259079 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -31,9 +31,12 @@ #define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) #define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode))) -#define FREE_HASH_NODE(_n) \ - do { \ - taosMemoryFreeClear(_n); \ +#define FREE_HASH_NODE(_fp, _n) \ + do { \ +/* if (_fp != NULL) { \ + (_fp)(_n); \ + }*/ \ + taosMemoryFreeClear(_n); \ } while (0); struct SHashNode { @@ -195,7 +198,7 @@ static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SH if (pNode->refCount <= 0) { pNewNode->next = pNode->next; - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pHashObj->freeFp, pNode); } else { pNewNode->next = pNode; pe->num++; @@ -310,7 +313,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo return -1; } - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // need the resize process, write lock applied if (HASH_NEED_RESIZE(pHashObj)) { @@ -523,7 +526,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { pe->num--; atomic_sub_fetch_64(&pHashObj->size, 1); - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pHashObj->freeFp, pNode); } } else { prevNode = pNode; @@ -558,7 +561,7 @@ void taosHashClear(SHashObj *pHashObj) { while (pNode) { pNext = pNode->next; - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pHashObj->freeFp, pNode); pNode = pNext; } @@ -769,7 +772,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { pe->num--; atomic_sub_fetch_64(&pHashObj->size, 1); - FREE_HASH_NODE(pOld); + FREE_HASH_NODE(pHashObj->freeFp, pOld); } } else { // uError("pNode:%p data:%p is not there!!!", pNode, p);