diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 22aac46560..7839859e8b 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u int32_t getJsonValueLen(const char* data); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dc0b915d77..2a8f80f030 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -118,6 +118,76 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con return 0; } +int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { + if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return TSDB_CODE_SUCCESS; + } + + if (pColumnInfoData->varmeta.allocLen >= newSize) { + return TSDB_CODE_SUCCESS; + } + + if (pColumnInfoData->varmeta.allocLen < newSize) { + char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->pData = buf; + pColumnInfoData->varmeta.allocLen = newSize; + } + + return TSDB_CODE_SUCCESS; +} + +static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) { + ASSERT(pColumnInfoData->info.bytes >= itemLen); + size_t start = 1; + + // the first item + memcpy(pColumnInfoData->pData, pData, itemLen); + + int32_t t = 0; + int32_t count = log(numOfRows)/log(2); + while(t < count) { + int32_t xlen = 1 << t; + memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen); + t += 1; + start += xlen; + } + + // the tail part + if (numOfRows > start) { + memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen); + } + + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + for(int32_t i = 0; i < numOfRows; ++i) { + pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen; + } + + pColumnInfoData->varmeta.length += numOfRows * itemLen; + } +} + +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) { + ASSERT(pData != NULL && pColumnInfoData != NULL); + + int32_t len = pColumnInfoData->info.bytes; + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + len = varDataTLen(pData); + if (pColumnInfoData->varmeta.allocLen < (numOfRows + currentRow) * len) { + int32_t code = colDataReserve(pColumnInfoData, (numOfRows + currentRow) * len); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + } + + doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows); + return TSDB_CODE_SUCCESS; +} + static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) { if (numOfRow2 <= 0) return; @@ -1113,15 +1183,12 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; - if (pColumn->varmeta.offset > 0) { + if (pColumn->varmeta.offset != NULL) { memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); } } else { if (pColumn->nullbitmap != NULL) { memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); - if (pColumn->pData != NULL) { - memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows); - } } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c003f5a63f..c9ffd33039 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -758,7 +758,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn pReader->cost.blockLoadTime += elapsedTime; int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; - tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 577f9772be..f16b485240 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -917,9 +917,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, - SNode* pCondition, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 96c20d6136..ed225e1dcb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -571,6 +571,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } *numOfExprs = numOfFuncs + numOfGroupKeys; + if (*numOfExprs == 0) { + return NULL; + } + SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); for (int32_t i = 0; i < (*numOfExprs); ++i) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e52cbf40a9..6804a1258c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4171,16 +4171,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - - STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, - .calTrigger = pSessionNode->window.triggerType}; - - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - - pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, - pPhyNode->pConditions, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8821dbd5a1..a1b00fb1fb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -405,9 +405,15 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int data = (char*)p; } - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, data, - (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)); + if (isNullVal) { + colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); + } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { + colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); + } else { // todo opt for json tag + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + colDataAppend(pColInfoData, i, data, false); + } } if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 26dd772292..e371e6d9cf 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -152,7 +152,7 @@ SSDataBlock* loadNextDataBlock(void* param) { void applyScalarFunction(SSDataBlock* pBlock, void* param) { SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; - if (pOperator->exprSupp.pExprInfo != NULL) { + if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) { int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 802e1f2306..a14f554cf5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2439,12 +2439,14 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + + cleanupAggSup(&pInfo->aggSup); + cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, - STimeWindowAggSupp* pTwAggSupp, SNode* pCondition, +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -2455,6 +2457,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -2462,16 +2468,19 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo initBasicInfo(&pInfo->binfo, pResBlock); - pInfo->twAggSup = *pTwAggSupp; + pInfo->twAggSup.waterMark = pSessionNode->window.watermark; + pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType; + pInfo->gap = pSessionNode->gap; + initResultRowInfo(&pInfo->binfo.resultRowInfo); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pInfo->gap = gap; - pInfo->binfo.pRes = pResBlock; + pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; + pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; - pInfo->reptScan = false; - pInfo->pCondition = pCondition; + pInfo->reptScan = false; + pInfo->pCondition = pSessionNode->window.node.pConditions; + pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->blocking = true; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a70afe25d6..2d889dd925 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1727,16 +1727,11 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; metaGetTableNameByUid(pInput->param, uid, str); - - for(int32_t i = 0; i < pInput->numOfRows; ++i) { - colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false); - } - + colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows); pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } - /** Aggregation functions **/ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { SColumnInfoData *pInputData = pInput->columnData; diff --git a/source/os/src/osMath.c b/source/os/src/osMath.c index 98cd63a831..cd2acac261 100644 --- a/source/os/src/osMath.c +++ b/source/os/src/osMath.c @@ -31,6 +31,7 @@ void swapStr(char* j, char* J, int width) { } #endif +// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually. void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) { #ifdef WINDOWS int64_t i, j;