diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 230ccf6ace..4c9ed78769 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -490,6 +490,7 @@ typedef struct SGroupbyOperatorInfo { SExprInfo* pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression in group operator SqlFunctionCtx* pScalarFuncCtx; + int32_t* rowCellInfoOffset; // offset value for each row result cell info } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 87ecdde3ab..cd129b54fb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1173,6 +1173,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; + // if the source equals to the destination, it is to create a new column as the result of scalar function or some operators. + bool createNewColModel = (pResult == pSrcBlock); + int32_t numOfRows = 0; for (int32_t k = 0; k < numOfOutput; ++k) { @@ -1181,7 +1184,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - if (pResult->info.rows > 0) { + if (pResult->info.rows > 0 && !createNewColModel) { colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); } else { colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); @@ -1191,7 +1194,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t offset = pResult->info.rows; + int32_t offset = createNewColModel? 0: pResult->info.rows; for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } @@ -1207,7 +1210,8 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* SScalarParam dest = {.columnData = &idata}; scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + int32_t startOffset = createNewColModel? 0:pResult->info.rows; + colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1224,7 +1228,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pfCtx->fpSet.init(&pCtx[k], pResInfo); pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - pfCtx->offset = pResult->info.rows; // set the start offset + pfCtx->offset = createNewColModel? 0:pResult->info.rows; // set the start offset // set the timestamp(_rowts) output buffer if (taosArrayGetSize(pPseudoList) > 0) { @@ -1242,7 +1246,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* SScalarParam dest = {.columnData = &idata}; scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + int32_t startOffset = createNewColModel? 0:pResult->info.rows; + colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1252,7 +1258,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* } } - pResult->info.rows += numOfRows; + if (!createNewColModel) { + pResult->info.rows += numOfRows; + } } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8018a8dd31..d610880e30 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -341,7 +341,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->numOfScalarExpr = numOfScalarExpr; - pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7699219f52..3d796515a0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -901,7 +901,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); @@ -919,7 +919,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; pResInfo->numOfRes = 1; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 1e8b732b6a..da759bc15a 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -888,10 +888,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { TSKEY tsKey = TD_ROW_KEY(row); - if (checkTimestamp(pDataBlocks, (const char*)&tsKey) != TSDB_CODE_SUCCESS) { - buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } + checkTimestamp(pDataBlocks, (const char*)&tsKey); } }