diff --git a/Jenkinsfile2 b/Jenkinsfile2 index c9c9c3a7ca..49e13b7831 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -8,135 +8,202 @@ def skipbuild=0 def win_stop=0 def abortPreviousBuilds() { - def currentJobName = env.JOB_NAME - def currentBuildNumber = env.BUILD_NUMBER.toInteger() - def jobs = Jenkins.instance.getItemByFullName(currentJobName) - def builds = jobs.getBuilds() + def currentJobName = env.JOB_NAME + def currentBuildNumber = env.BUILD_NUMBER.toInteger() + def jobs = Jenkins.instance.getItemByFullName(currentJobName) + def builds = jobs.getBuilds() - for (build in builds) { - if (!build.isBuilding()) { - continue; + for (build in builds) { + if (!build.isBuilding()) { + continue; + } + + if (currentBuildNumber == build.getNumber().toInteger()) { + continue; + } + + build.doKill() //doTerm(),doKill(),doTerm() } - - if (currentBuildNumber == build.getNumber().toInteger()) { - continue; - } - - build.doKill() //doTerm(),doKill(),doTerm() - } } // abort previous build abortPreviousBuilds() def abort_previous(){ - def buildNumber = env.BUILD_NUMBER as int - if (buildNumber > 1) milestone(buildNumber - 1) - milestone(buildNumber) + def buildNumber = env.BUILD_NUMBER as int + if (buildNumber > 1) milestone(buildNumber - 1) + milestone(buildNumber) } def pre_test(){ - sh'hostname' + sh 'hostname' sh ''' - date - sudo rmtaos || echo "taosd has not installed" + date + sudo rmtaos || echo "taosd has not installed" ''' sh ''' - killall -9 taosd ||echo "no taosd running" - killall -9 gdb || echo "no gdb running" - killall -9 python3.8 || echo "no python program running" - cd ${WKC} + killall -9 taosd ||echo "no taosd running" + killall -9 gdb || echo "no gdb running" + killall -9 python3.8 || echo "no python program running" + cd ${WKC} ''' script { - if (env.CHANGE_TARGET == 'master') { - sh ''' - cd ${WKC} - git checkout master - ''' + if (env.CHANGE_TARGET == 'master') { + sh ''' + cd ${WKC} + git checkout master + ''' + } else if(env.CHANGE_TARGET == '2.0') { + sh ''' + cd ${WKC} + git checkout 2.0 + ''' + } else if(env.CHANGE_TARGET == '3.0') { + sh ''' + cd ${WKC} + git checkout 3.0 + [ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../.. + ''' + } else { + sh ''' + cd ${WKC} + git checkout develop + ''' } - else if(env.CHANGE_TARGET == '2.0'){ - sh ''' - cd ${WKC} - git checkout 2.0 - ''' - } - else if(env.CHANGE_TARGET == '3.0'){ - sh ''' - cd ${WKC} - git checkout 3.0 - [ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../.. - ''' - } - else{ - sh ''' - cd ${WKC} - git checkout develop - ''' - } } - sh''' - cd ${WKC} - git pull >/dev/null - git fetch origin +refs/pull/${CHANGE_ID}/merge - git checkout -qf FETCH_HEAD - git submodule update --init --recursive + sh ''' + cd ${WKC} + git pull >/dev/null + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git submodule update --init --recursive ''' - sh''' - cd ${WKC} - export TZ=Asia/Harbin - date - rm -rf debug - mkdir debug - cd debug - cmake .. > /dev/null - make -j4> /dev/null + sh ''' + cd ${WKC} + export TZ=Asia/Harbin + date + rm -rf debug + mkdir debug + cd debug + cmake .. > /dev/null + make -j4> /dev/null ''' - sh''' - cd ${WKPY} - git reset --hard - git pull - pip3 install . + sh ''' + cd ${WKPY} + git reset --hard + git pull + pip3 install . + ''' + return 1 +} +def pre_test_win(){ + bat ''' + hostname + date /t + time /t + taskkill /f /t /im python.exe + taskkill /f /t /im bash.exe + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug + exit 0 + ''' + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + git reset --hard + git fetch || git fetch + git checkout -f + ''' + script { + if (env.CHANGE_TARGET == 'master') { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + git checkout master + ''' + } else if(env.CHANGE_TARGET == '2.0') { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + git checkout 2.0 + ''' + } else if(env.CHANGE_TARGET == '3.0') { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + git checkout 3.0 + ''' + } else { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + git checkout develop + ''' + } + } + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + git branch + git pull || git pull + git fetch origin +refs/pull/%CHANGE_ID%/merge + git checkout -qf FETCH_HEAD + ''' +} +def pre_test_build_win() { + bat ''' + echo "building ..." + time /t + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + mkdir debug + cd debug + call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64 + set CL=/MP8 + cmake .. -G "NMake Makefiles JOM" + jom -j 4 || exit 8 + time /t ''' return 1 } pipeline { - agent none - options { skipDefaultCheckout() } - environment{ - WK = '/var/lib/jenkins/workspace/TDinternal' - WKC= '/var/lib/jenkins/workspace/TDengine' - WKPY= '/var/lib/jenkins/workspace/taos-connector-python' - } - stages { - stage('pre_build'){ - agent{label " slave3_0 || slave15 || slave16 || slave17 "} - options { skipDefaultCheckout() } - when { - changeRequest() - } - steps { - script{ - abort_previous() - abortPreviousBuilds() + agent none + options { skipDefaultCheckout() } + environment{ + WK = '/var/lib/jenkins/workspace/TDinternal' + WKC= '/var/lib/jenkins/workspace/TDengine' + WKPY= '/var/lib/jenkins/workspace/taos-connector-python' + } + stages { + stage('run test') { + parallel { + stage('windows test') { + agent {label " windows11 "} + steps { + pre_test_win() + pre_test_build_win() + } + } + stage('linux test') { + agent{label " slave3_0 || slave15 || slave16 || slave17 "} + options { skipDefaultCheckout() } + when { + changeRequest() + } + steps { + timeout(time: 45, unit: 'MINUTES'){ + pre_test() + sh ''' + cd ${WKC}/debug + ctest -VV + ''' + sh ''' + export LD_LIBRARY_PATH=${WKC}/debug/build/lib + cd ${WKC}/tests/system-test + ./fulltest.sh + ''' + sh ''' + cd ${WKC}/tests + ./test-all.sh b1fq + ''' + } + } + } } - timeout(time: 45, unit: 'MINUTES'){ - pre_test() - sh''' - cd ${WKC}/debug - ctest -VV - ''' - sh''' - export LD_LIBRARY_PATH=${WKC}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - sh''' - cd ${WKC}/tests - ./test-all.sh b1fq - ''' - } - } - } - } - post { + } + } + post { success { emailext ( subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' SUCCESS", diff --git a/cmake/cmake.define b/cmake/cmake.define index 1655154506..d1d9266bca 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 3.16) set(CMAKE_VERBOSE_MAKEFILE OFF) +SET(BUILD_SHARED_LIBS "OFF") + #set output directory SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib) SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 4d3ad591ce..19b108dcbf 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -54,8 +54,13 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) -#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) -#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) +#define colDataSetNotNull_f(bm_, r_) \ + do { \ + BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ + } while (0) + +#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) +#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index e49f5cac45..0000972f5e 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -22,6 +22,7 @@ #include "tmsg.h" #include "tcommon.h" #include "function.h" +#include "tdatablock.h" #ifdef __cplusplus extern "C" { @@ -54,14 +55,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle); typedef struct SUdfColumnMeta { int16_t type; - int32_t bytes; // <0 var length, others fixed length bytes + int32_t bytes; uint8_t precision; uint8_t scale; } SUdfColumnMeta; typedef struct SUdfColumnData { int32_t numOfRows; - bool varLengthColumn; + int32_t rowsAlloc; union { struct { int32_t nullBitmapLen; @@ -72,9 +73,10 @@ typedef struct SUdfColumnData { struct { int32_t varOffsetsLen; - char *varOffsets; + int32_t *varOffsets; int32_t payloadLen; char *payload; + int32_t payloadAllocLen; } varLenCol; }; } SUdfColumnData; @@ -131,10 +133,114 @@ typedef int32_t (*TUdfSetupFunc)(); typedef int32_t (*TUdfTeardownFunc)(); //TODO: add API to check function arguments type, number etc. -//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory. -// then UDF framework will free the memory -//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); -//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); + +#define UDF_MEMORY_EXP_GROWTH 1.5 + +static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) { + SUdfColumnMeta *meta = &pColumn->colMeta; + SUdfColumnData *data = &pColumn->colData; + + if (newCapacity== 0 || newCapacity <= data->rowsAlloc) { + return TSDB_CODE_SUCCESS; + } + + int allocCapacity = TMAX(data->rowsAlloc, 8); + while (allocCapacity < newCapacity) { + allocCapacity *= UDF_MEMORY_EXP_GROWTH; + } + + if (IS_VAR_DATA_TYPE(meta->type)) { + char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + data->varLenCol.varOffsets = (int32_t*)tmp; + data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity; + // for payload, add data in udfColDataAppend + } else { + char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + data->fixLenCol.nullBitmap = tmp; + data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity); + if (meta->type == TSDB_DATA_TYPE_NULL) { + return TSDB_CODE_SUCCESS; + } + + tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + data->fixLenCol.data = tmp; + data->fixLenCol.dataLen = allocCapacity* meta->bytes; + } + + data->rowsAlloc = allocCapacity; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { + SUdfColumnMeta *meta = &pColumn->colMeta; + SUdfColumnData *data = &pColumn->colData; + udfColEnsureCapacity(pColumn, currentRow+1); + bool isVarCol = IS_VAR_DATA_TYPE(meta->type); + if (isNull) { + if (isVarCol) { + data->varLenCol.varOffsets[currentRow] = -1; + } else { + colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow); + } + } else { + if (!isVarCol) { + colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow); + memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); + } else { + int32_t dataLen = varDataTLen(pData); + if (meta->type == TSDB_DATA_TYPE_JSON) { + if (*pData == TSDB_DATA_TYPE_NULL) { + dataLen = 0; + } else if (*pData == TSDB_DATA_TYPE_NCHAR) { + dataLen = varDataTLen(pData + CHAR_BYTES); + } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { + dataLen = LONG_BYTES; + } else if (*pData == TSDB_DATA_TYPE_BOOL) { + dataLen = CHAR_BYTES; + } + dataLen += CHAR_BYTES; + } + + if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) { + uint32_t newSize = data->varLenCol.payloadAllocLen; + if (newSize <= 1) { + newSize = 8; + } + + while (newSize < data->varLenCol.payloadLen + dataLen) { + newSize = newSize * UDF_MEMORY_EXP_GROWTH; + } + + char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + data->varLenCol.payload = buf; + data->varLenCol.payloadAllocLen = newSize; + } + + uint32_t len = data->varLenCol.payloadLen; + data->varLenCol.varOffsets[currentRow] = len; + + memcpy(data->varLenCol.payload + len, pData, dataLen); + data->varLenCol.payloadLen += dataLen; + } + } + data->numOfRows = TMAX(currentRow + 1, data->numOfRows); + return 0; +} typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); diff --git a/include/os/osString.h b/include/os/osString.h index a4100652c3..026cb33ad9 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -59,6 +59,8 @@ bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes); TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4); bool taosValidateEncodec(const char *encodec); +int32_t taosHexEncode(const char *src, char *dst, int32_t len); +int32_t taosHexDecode(const char *src, char *dst, int32_t len); int32_t taosWcharWidth(TdWchar wchar); int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 875ae15071..4f4b8b196f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -491,7 +491,12 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { - bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); + bool isNull = false; + if (pBlock->pBlockAgg == NULL) { + isNull = colDataIsNull(pColData, pBlock->info.rows, j, NULL); + } else { + isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); + } char* p = colDataGetData(pColData, j); colDataAppend(pDstCol, j - startIndex, p, isNull); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b6dcc52b5e..0da18ebdeb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3308,7 +3308,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; - pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); +// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); } /* diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0404b4d447..bc6139c304 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -277,7 +277,7 @@ typedef struct SOperatorInfo { uint8_t operatorType; bool blocking; // block operator or not uint8_t status; // denote if current operator is completed - int32_t numOfOutput; // number of columns of the current operator results + int32_t numOfExprs; // number of columns of the current operator results char* name; // name, used to show the query execution plan void* info; // extension attribution SExprInfo* pExpr; @@ -415,8 +415,6 @@ typedef struct SOptrBasicInfo { // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { SHashObj* pResultRowHashTable; // quick locate the window object for each result -// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not -// SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row @@ -577,13 +575,13 @@ typedef struct SSortedMergeOperatorInfo { } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { + SOptrBasicInfo binfo; uint32_t sortBufSize; // max buffer size for in-memory sort - SSDataBlock* pDataBlock; SArray* pSortInfo; SSortHandle* pSortHandle; SArray* inputSlotMap; // for index map from table scan output int32_t bufPageSize; - int32_t numOfRowsInRes; +// int32_t numOfRowsInRes; // TODO extact struct int64_t startTs; // sort start time @@ -645,10 +643,13 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void cleanupAggSup(SAggSupporter* pAggSup); -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, - int32_t* rowCellInfoOffset); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); +SSDataBlock* loadNextDataBlock(void* param); + +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, @@ -663,7 +664,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); -SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index c584df05dd..2072707b30 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -89,7 +89,7 @@ int32_t tsortClose(SSortHandle* pHandle); * * @return */ -int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp); +int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param); /** * diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a91d204efa..a237eb0e7d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -202,9 +202,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); - if (!pDescNode->output) { - continue; - } +// if (!pDescNode->output) { // todo disable it temporarily +// continue; +// } idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; @@ -651,7 +651,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); @@ -713,7 +713,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; @@ -798,7 +798,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt } static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; // this can be set during create the struct pCtx[k].fpSet.process(&pCtx[k]); @@ -2815,7 +2815,6 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf // NOTE: sources columns are more than the destination SSDatablock columns. void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) { size_t numOfSrcCols = taosArrayGetSize(pCols); - ASSERT(numOfSrcCols >= pBlock->info.numOfCols); int32_t i = 0, j = 0; while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { @@ -3287,7 +3286,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pBlock->info.numOfCols; + pOperator->numOfExprs = pBlock->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, @@ -3345,49 +3344,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { cleanupAggSup(&pInfo->aggSup); } -// TODO merge aggregate super table -static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - bool isNull = tsortIsNullVal(pTupleHandle, i); - if (isNull) { - colDataAppend(pColInfo, pBlock->info.rows, NULL, true); - } else { - char* pData = tsortGetValue(pTupleHandle, i); - colDataAppend(pColInfo, pBlock->info.rows, pData, false); - } - } - - pBlock->info.rows += 1; -} - -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { - blockDataCleanup(pDataBlock); - blockDataEnsureCapacity(pDataBlock, capacity); - - blockDataEnsureCapacity(pDataBlock, capacity); - - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= capacity) { - return pDataBlock; - } - } - - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - -SSDataBlock* loadNextDataBlock(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - return pOperator->fpSet.getNextFn(pOperator); -} - static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { size_t size = taosArrayGetSize(groupInfo); if (size == 0) { @@ -3490,8 +3446,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock doMergeResultImpl(pInfo, pCtx, numOfExpr, i); } else { doFinalizeResultImpl(pCtx, numOfExpr); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL); + // setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows); // TODO check for available buffer; @@ -3541,13 +3497,13 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true); // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, // pOperator->pRuntimeEnv, true); - doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock); + doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock); // flush to tuple store, and after all data have been handled, return to upstream node or sink node } - doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL); + // setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows); // TODO check for available buffer; @@ -3571,7 +3527,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); @@ -3678,7 +3634,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = num; + pOperator->numOfExprs = num; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; @@ -3703,79 +3659,6 @@ _error: return NULL; } -static SSDataBlock* doSort(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; - - if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); - } - - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, - pInfo->bufPageSize, numOfBufPage, pInfo->pDataBlock, pTaskInfo->id.str); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); - - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = pOperator->pDownstream[0]; - tsortAddSource(pInfo->pSortHandle, ps); - - int32_t code = tsortOpen(pInfo->pSortHandle); - taosMemoryFreeClear(ps); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); - } - - pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); -} - -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, - SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - int32_t rowSize = pResBlock->info.rowSize; - - if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header - - pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResBlock; - pInfo->pSortInfo = pSortInfo; - pInfo->inputSlotMap = pIndexMap; - - pOperator->name = "SortOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - - pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); - - int32_t code = appendDownstream(pOperator, &downstream, 1); - return pOperator; - -_error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - return NULL; -} - int32_t getTableScanOrder(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { @@ -3813,7 +3696,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { break; } // if (pAggInfo->current != NULL) { - // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // there is an scalar expression that needs to be calculated before apply the group aggregation. @@ -3827,7 +3710,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo); + setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true); doAggregateImpl(pOperator, 0, pInfo->pCtx); @@ -3848,7 +3731,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); @@ -4092,17 +3975,17 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // todo dynamic set tags // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs); if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) { - copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); - resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); + copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); + resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs); return pRes; } } @@ -4127,14 +4010,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->existDataBlock = pBlock; break; } else { // init output buffer for a new group data - initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput); + initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs); } } // todo dynamic set tags // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // the pDataBlock are always the same one, no need to call this again @@ -4143,7 +4026,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); int32_t status = handleLimitOffset(pOperator, pBlock); @@ -4156,7 +4039,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->curOutput += pInfo->pRes->info.rows; - // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); + // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; } @@ -4289,7 +4172,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput); + pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs); } if (pOperator->pDownstream != NULL) { @@ -4425,7 +4308,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, @@ -4477,14 +4360,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; - pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); - - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->inputSlotMap); -} - void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); @@ -4538,7 +4413,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = num; + pOperator->numOfExprs = num; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); @@ -4621,7 +4496,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = @@ -4979,7 +4854,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + if (pSortPhyNode->pExprs != NULL) { + pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + } + + pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -5566,7 +5448,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { // only the timestamp match support for ordinary table ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); SExprInfo* pExprInfo = &pOperator->pExpr[i]; @@ -5633,7 +5515,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f5dd67217e..58efd75a0b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -227,16 +227,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); - int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); num = 1; } @@ -244,15 +244,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = - setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, + setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); } } @@ -291,19 +291,19 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashGroupbyAgg(pOperator, pBlock); } pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); // if (!stableQuery) { // finalize include the update of result rows - // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // } else { - // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, + // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, // pInfo->binfo.rowCellInfoOffset); // } @@ -357,7 +357,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -392,7 +392,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t* rows = (int32_t*) pPage; - size_t numOfCols = pOperator->numOfOutput; + size_t numOfCols = pOperator->numOfExprs; for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = &pOperator->pExpr[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; @@ -565,7 +565,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashPartition(pOperator, pBlock); } @@ -616,7 +616,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pInfo->binfo.pRes = pResultBlock; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7e721455c3..8c9fdfe4e6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -386,7 +386,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfExprs = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -646,7 +646,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; pOperator->fpSet.closeFn = operatorDummyCloseFn; @@ -1020,7 +1020,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); + pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info doFilterResult(pInfo); @@ -1150,7 +1150,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; @@ -1247,7 +1247,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char *data = NULL, *dst = NULL; int16_t type = 0, bytes = 0; - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + for(int32_t j = 0; j < pOperator->numOfExprs; ++j) { // not assign value in case of user defined constant output column if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) { continue; @@ -1308,7 +1308,7 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfExprs = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c new file mode 100644 index 0000000000..0f973b0cf0 --- /dev/null +++ b/source/libs/executor/src/sortoperator.c @@ -0,0 +1,139 @@ +#include "tdatablock.h" +#include "executorimpl.h" + +static SSDataBlock* doSort(SOperatorInfo* pOperator); +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); + +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + int32_t rowSize = pResBlock->info.rowSize; + + if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { + goto _error; + } + + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = pResBlock; + + initResultSizeInfo(pOperator, 1024); + pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header + + pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer + pInfo->pSortInfo = pSortInfo; + pInfo->inputSlotMap = pIndexMap; + pOperator->name = "SortOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + + pOperator->pTaskInfo = pTaskInfo; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +// TODO merge aggregate super table +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + bool isNull = tsortIsNullVal(pTupleHandle, i); + if (isNull) { + colDataAppend(pColInfo, pBlock->info.rows, NULL, true); + } else { + char* pData = tsortGetValue(pTupleHandle, i); + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } + } + + pBlock->info.rows += 1; +} + +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { + blockDataCleanup(pDataBlock); + blockDataEnsureCapacity(pDataBlock, capacity); + + blockDataEnsureCapacity(pDataBlock, capacity); + + while (1) { + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(pDataBlock, pTupleHandle); + if (pDataBlock->info.rows >= capacity) { + return pDataBlock; + } + } + + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + +SSDataBlock* loadNextDataBlock(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + return pOperator->fpSet.getNextFn(pOperator); +} + +// todo refactor: merged with fetch fp +void applyScalarFunction(SSDataBlock* pBlock, void* param) { + SOperatorInfo* pOperator = param; + SSortOperatorInfo* pSort = pOperator->info; + if (pOperator->pExpr != NULL) { + projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); + } +} + +SSDataBlock* doSort(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortOperatorInfo* pInfo = pOperator->info; + + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); + } + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, + pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); + + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = pOperator->pDownstream[0]; + tsortAddSource(pInfo->pSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pSortHandle); + taosMemoryFreeClear(ps); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); +} + +void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { + SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->inputSlotMap); +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 580614dc02..224f3db912 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -325,7 +325,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SqlFunctionCtx* pCtx = pInfo->pCtx; - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { int32_t functionId = pCtx[k].functionId; if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) { pCtx[k].start.key = INT64_MIN; @@ -406,12 +406,12 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF // start exactly from this point, no need to do interpolation TSKEY key = ascQuery ? win->skey : win->ekey; if (key == curTs) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); return true; } if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); return true; } @@ -427,7 +427,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { int32_t order = TSDB_ORDER_ASC; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t numOfOutput = pOperatorInfo->numOfExprs; TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY key = order ? win->ekey : win->skey; @@ -572,7 +572,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); } // point interpolation does not require the end key time window interpolation. @@ -592,7 +592,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_END_INTERP); } } @@ -612,7 +612,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t numOfOutput = pOperatorInfo->numOfExprs; SArray* pUpdated = NULL; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { @@ -683,7 +683,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -773,7 +773,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; @@ -798,7 +798,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -813,7 +813,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI int64_t gid = pBlock->info.groupId; bool masterScan = true; - int32_t numOfOutput = pOperator->numOfOutput; + int32_t numOfOutput = pOperator->numOfExprs; int16_t bytes = pStateColInfoData->info.bytes; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); @@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1013,13 +1013,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } - finalizeUpdatedResult(pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -1082,7 +1082,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, @@ -1141,7 +1141,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, @@ -1169,7 +1169,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); bool masterScan = true; - int32_t numOfOutput = pOperator->numOfOutput; + int32_t numOfOutput = pOperator->numOfExprs; int64_t gid = pBlock->info.groupId; int64_t gap = pInfo->gap; @@ -1270,7 +1270,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1309,7 +1309,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); @@ -1319,7 +1319,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); + // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs); // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); @@ -1346,7 +1346,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -1388,7 +1388,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; @@ -1440,7 +1440,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 46fc8da00c..50aa4cfc01 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -42,11 +42,7 @@ struct SSortHandle { _sort_fetch_block_fn_t fetchfp; _sort_merge_compar_fn_t comparFn; - - void *pParam; SMultiwayMergeTreeInfo *pMergeTree; - int32_t numOfCols; - int64_t startTs; uint64_t sortElapsed; uint64_t totalElapsed; @@ -61,6 +57,9 @@ struct SSortHandle { bool inMemSort; bool needAdjust; STupleHandle tupleHandle; + + void *param; + void (*beforeFp)(SSDataBlock* pBlock, void* param); }; static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); @@ -533,6 +532,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { pHandle->pDataBlock = createOneDataBlock(pBlock, false); } + // perform the scalar function calculation before apply the sort + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + + // todo relocate the columns + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); if (code != 0) { return code; @@ -623,8 +629,10 @@ int32_t tsortClose(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) { - pHandle->fetchfp = fp; +int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) { + pHandle->fetchfp = fetchFp; + pHandle->beforeFp = fp; + pHandle->param = param; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index e646387856..c037fae75f 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -210,7 +210,7 @@ TEST(testCase, inMem_sort_Test) { taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); _info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info)); pInfo->startVal = 0; @@ -299,7 +299,7 @@ TEST(testCase, external_mem_sort_Test) { taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); SSortSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SSortSource))); ps->param = &pInfo[i]; @@ -366,7 +366,7 @@ TEST(testCase, ordered_merge_sort_Test) { } SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); tsortSetComparFp(phandle, docomp); SSortSource* p[10] = {0}; diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 15813b3cb0..c31cabda19 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -48,8 +48,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - udf1 PUBLIC os -) + udf1 PUBLIC os) add_library(udf2 MODULE test/udf2.c) target_include_directories( diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 8aedd59c33..6f82542aee 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -19,9 +19,6 @@ extern "C" { #endif -//TODO replaces them with fnDebug -//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__) -#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");} enum { UDF_TASK_SETUP = 0, UDF_TASK_CALL = 1, @@ -107,7 +104,7 @@ void* decodeUdfRequest(const void *buf, SUdfRequest* request); int32_t encodeUdfResponse(void **buf, const SUdfResponse *response); void* decodeUdfResponse(const void* buf, SUdfResponse *response); -void freeUdfColumnData(SUdfColumnData *data); +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); void freeUdfColumn(SUdfColumn* col); void freeUdfDataDataBlock(SUdfDataBlock *block); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 9aac2bfe0c..5b1573c88f 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -481,8 +481,8 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { return (void*)buf; } -void freeUdfColumnData(SUdfColumnData *data) { - if (data->varLengthColumn) { +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { + if (IS_VAR_DATA_TYPE(meta->type)) { taosMemoryFree(data->varLenCol.varOffsets); data->varLenCol.varOffsets = NULL; taosMemoryFree(data->varLenCol.payload); @@ -496,7 +496,7 @@ void freeUdfColumnData(SUdfColumnData *data) { } void freeUdfColumn(SUdfColumn* col) { - freeUdfColumnData(&col->colData); + freeUdfColumnData(&col->colData, &col->colMeta); } void freeUdfDataDataBlock(SUdfDataBlock *block) { @@ -528,8 +528,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colMeta.scale = col->info.scale; udfCol->colMeta.precision = col->info.precision; udfCol->colData.numOfRows = udfBlock->numOfRows; - udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type); - if (udfCol->colData.varLengthColumn) { + if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); @@ -555,7 +554,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { block->info.numOfCols = 1; block->info.rows = udfCol->colData.numOfRows; - block->info.hasVarCol = udfCol->colData.varLengthColumn; + block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); taosArraySetSize(block->pDataBlock, 1); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 06fa49e1c2..ba9fca2969 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -75,8 +75,8 @@ typedef struct SUdf { char path[PATH_MAX]; uv_lib_t lib; + TUdfScalarProcFunc scalarProcFunc; - TUdfFreeUdfColumnFunc freeUdfColumn; TUdfAggStartFunc aggStartFunc; TUdfAggProcessFunc aggProcFunc; @@ -106,11 +106,6 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *freeSuffix = "_free"; - strncpy(freeFuncName, processFuncName, strlen(processFuncName)); - strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); - uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); @@ -215,7 +210,7 @@ void udfdProcessRequest(uv_work_t *req) { udf->scalarProcFunc(&input, &output); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); - udf->freeUdfColumn(&output); + freeUdfColumn(&output); break; } case TSDB_UDF_CALL_AGG_INIT: { diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index b2367313ae..4384d326cb 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -18,52 +18,20 @@ int32_t udf1_destroy() { } int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { - SUdfColumnData *resultData = &resultCol->colData; - resultData->numOfRows = block->numOfRows; - SUdfColumnData *srcData = &block->udfCols[0]->colData; - resultData->varLengthColumn = srcData->varLengthColumn; - - if (resultData->varLengthColumn) { - resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen; - resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen); - memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen); - - resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen; - resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen); - memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen); - } else { - resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen; - resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen); - memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen); - - resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen; - resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen); - memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen); - for (int32_t i = 0; i < resultData->numOfRows; ++i) { - *(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88; - } - } - SUdfColumnMeta *meta = &resultCol->colMeta; meta->bytes = 4; meta->type = TSDB_DATA_TYPE_INT; meta->scale = 0; meta->precision = 0; - return 0; -} -int32_t udf1_free(SUdfColumn *col) { - SUdfColumnData *data = &col->colData; - if (data->varLengthColumn) { - free(data->varLenCol.varOffsets); - data->varLenCol.varOffsets = NULL; - free(data->varLenCol.payload); - data->varLenCol.payload = NULL; - } else { - free(data->fixLenCol.nullBitmap); - data->fixLenCol.nullBitmap = NULL; - free(data->fixLenCol.data); - data->fixLenCol.data = NULL; + SUdfColumnData *resultData = &resultCol->colData; + resultData->numOfRows = block->numOfRows; + SUdfColumnData *srcData = &block->udfCols[0]->colData; + + for (int32_t i = 0; i < resultData->numOfRows; ++i) { + int32_t luckyNum = 88; + udfColSetRow(resultCol, i, (char*)&luckyNum, false); } + return 0; } \ No newline at end of file diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6c85d9dff5..5dcacc4354 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1971,7 +1971,18 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) { case TSDB_DATA_TYPE_DOUBLE: code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); break; - case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_NCHAR: { + //cJSON only support utf-8 encoding. Convert memory content to hex string. + char *buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char)); + code = taosHexEncode(varDataVal(pNode->datum.p), buf, varDataLen(pNode->datum.p)); + if(code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + return TSDB_CODE_TSC_INVALID_VALUE; + } + code = tjsonAddStringToObject(pJson, jkValueDatum, buf); + taosMemoryFree(buf); + break; + } case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p)); @@ -2074,7 +2085,26 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { break; } varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); - code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p)); + if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) { + char *buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1); + if (NULL == buf) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + code = tjsonGetStringValue(pJson, jkValueDatum, buf); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + break; + } + code = taosHexDecode(buf, varDataVal(pNode->datum.p), pNode->node.resType.bytes - VARSTR_HEADER_SIZE); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + break; + } + taosMemoryFree(buf); + } else { + code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p)); + } break; } case TSDB_DATA_TYPE_JSON: diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index c021f65090..82c5d17f10 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -363,10 +363,8 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; - if (TSDB_DATA_TYPE_BINARY == dt.type) { - func->node.resType.bytes += 2; - } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + 2; + if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE; } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index f452838921..152aa6a01e 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1043,6 +1043,7 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) { static void destroyInsertParseContext(SInsertParseContext* pCxt) { destroyInsertParseContextForTable(pCxt); taosHashCleanup(pCxt->pVgroupsHashObj); + taosHashCleanup(pCxt->pSubTableHashObj); destroyBlockHashmap(pCxt->pTableBlockHashObj); destroyBlockArrayList(pCxt->pVgDataBlocks); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 30e55420d4..fbb1f34217 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4062,7 +4062,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { STranslateContext cxt = {0}; - int32_t code = initTranslateContext(pParseCxt, &cxt); + + int32_t code = initTranslateContext(pParseCxt, &cxt); if (TSDB_CODE_SUCCESS == code) { code = fmFuncMgtInit(); } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a507b41342..51ac48adaf 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,6 +709,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); + if (IS_VAR_DATA_TYPE(outputType)) { + outputLen += VARSTR_HEADER_SIZE; + } + char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; @@ -790,29 +794,30 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_NCHAR: { int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; + int32_t len; if (inputType == TSDB_DATA_TYPE_BOOL) { char tmp[8] = {0}; - int32_t len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" ); + len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" ); bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { return TSDB_CODE_FAILED; } varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_BINARY) { - int32_t len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen; + len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen; bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { return TSDB_CODE_FAILED; } varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - int32_t len = TMIN(outputLen, varDataLen(input) + VARSTR_HEADER_SIZE); - memcpy(output, input, len); - varDataSetLen(output, len - VARSTR_HEADER_SIZE); + len = TMIN(outputLen - VARSTR_HEADER_SIZE, varDataLen(input)); + memcpy(output, input, len + VARSTR_HEADER_SIZE); + varDataSetLen(output, len); } else { char tmp[400] = {0}; NUM_TO_STRING(inputType, input, sizeof(tmp), tmp); - int32_t len = (int32_t)strlen(tmp); + len = (int32_t)strlen(tmp); len = outputCharLen > len ? len : outputCharLen; bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { @@ -820,6 +825,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } varDataSetLen(output, len); } + //for constant conversion, need to set proper length of pOutput description + if (len < outputLen - VARSTR_HEADER_SIZE) { + pOutput->columnData->info.bytes = len; + } break; } default: { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1bd7d0857e..98e9e67ede 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -435,7 +435,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ if (minNode) { SDelayTask* minTask = container_of(minNode, SDelayTask, node); if (minTask->execTime < task->execTime) { - timeoutMs = minTask->execTime <= now ? 0 : now - minTask->execTime; + timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now; } } diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 6d90773d1b..19e4defafc 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -44,7 +44,7 @@ int wordexp(char *words, wordexp_t *pwordexp, int flags) { return -1; } - printf("parse relative path:%s to abs path:%s\n", words, pwordexp->wordPos); + // printf("parse relative path:%s to abs path:%s\n", words, pwordexp->wordPos); return 0; } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index ed596a051d..375c5001f4 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -195,6 +195,36 @@ int32_t taosUcs4len(TdUcs4 *ucs4) { return n; } +//dst buffer size should be at least 2*len + 1 +int32_t taosHexEncode(const char *src, char *dst, int32_t len) { + if (!dst) { + return -1; + } + + for (int32_t i = 0; i < len; ++i) { + sprintf(dst + i * 2, "%02x", src[i] & 0xff); + } + + return 0; +} + +int32_t taosHexDecode(const char *src, char *dst, int32_t len) { + if (!dst) { + return -1; + } + + uint16_t hn, ln, out; + for (int i = 0, j = 0; i < len * 2; i += 2, ++j ) { + hn = src[i] > '9' ? src[i] - 'A' + 10 : src[i] - '0'; + ln = src[i + 1] > '9' ? src[i + 1] - 'A' + 10 : src[i + 1] - '0'; + + out = (hn << 4) | ln; + memcpy(dst + j, &out, 1); + } + + return 0; +} + int32_t taosWcharWidth(TdWchar wchar) { return wcwidth(wchar); } int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size) { return wcswidth(pWchar, size); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index d1c5e83975..b95e822df3 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -39,6 +39,7 @@ ./test.sh -f tsim/query/explain.sim ./test.sh -f tsim/query/session.sim ./test.sh -f tsim/query/scalarNull.sim +./test.sh -f tsim/query/udf.sim # ---- qnode ./test.sh -f tsim/qnode/basic1.sim diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh new file mode 100755 index 0000000000..e1d9ff53d2 --- /dev/null +++ b/tests/script/sh/copy_udf.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +set +e +#set -x + +echo "Executing copy_udf.sh" + +SCRIPT_DIR=`dirname $0` +cd $SCRIPT_DIR/../ + +IN_TDINTERNAL="community" +if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then + cd ../../.. +else + cd ../../ +fi + +TAOS_DIR=`pwd` +UDF1_DIR=`find $TAOS_DIR -name "libudf1.so"|grep lib|head -n1` +UDF2_DIR=`find $TAOS_DIR -name "libudf2.so"|grep lib|head -n1` + +echo $UDF1_DIR +echo $UDF2_DIR + +UDF_TMP=/tmp/udf +mkdir $UDF_TMP +rm $UDF_TMP/libudf1.so +rm $UDF_TMP/libudf2.so + +echo "Copy udf shared library files to $UDF_TMP" + +cp $UDF1_DIR $UDF_TMP +cp $UDF2_DIR $UDF_TMP diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim index 417e53daff..40882306c8 100644 --- a/tests/script/tsim/db/alter_option.sim +++ b/tests/script/tsim/db/alter_option.sim @@ -66,7 +66,7 @@ print ============= create database # | REPLICA value [1 | 3] # | WAL value [1 | 2] -sql create database db CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 1 WAL 2 VGROUPS 6 SINGLE_STABLE 1 +sql create database db CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 3 WAL 2 VGROUPS 6 SINGLE_STABLE 1 sql show databases print rows: $rows print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 @@ -86,7 +86,7 @@ endi if $data3_db != 0 then # ntables return -1 endi -if $data4_db != 1 then # replica +if $data4_db != 3 then # replica return -1 endi if $data5_db != nostrict then # strict diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim new file mode 100644 index 0000000000..8acd07cfe4 --- /dev/null +++ b/tests/script/tsim/query/udf.sim @@ -0,0 +1,48 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c wallevel -v 2 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 + +print ========= start dnode1 as LEADER +system sh/exec.sh -n dnode1 -s start +sleep 2000 +sql connect + +print ======== step1 udf +system sh/copy_udf.sh +sql create database udf vgroups 3; +sql use udf; +sql show databases; + +sql create table t (ts timestamp, f int); +sql insert into t values(now, 1)(now+1s, 2); + +sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8; +sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8; +sql show functions; +if $rows != 2 then + return -1 +endi +sql select udf1(f) from t; +if $rows != 2 then + return -1 +endi +if $data00 != 88 then + return -1 +endi +if $data10 != 88 then + return -1 +endi + +sql select udf2(f) from t; +if $rows != 1 then + return -1 +endi +if $data00 != 2.236067977 then + return -1 +endi + +#sql drop function udf1; +#sql drop function udf2; +system sh/exec.sh -n dnode1 -s stop -x SIGKILL diff --git a/tests/system-test/2-query/log.py b/tests/system-test/2-query/log.py index e6762b2d61..6e4c292183 100644 --- a/tests/system-test/2-query/log.py +++ b/tests/system-test/2-query/log.py @@ -191,13 +191,13 @@ class TDTestCase: def support_types(self): type_error_sql_lists = [ "select log(ts ,2 ) from t1" , - "select log(c7,2 ) from t1", - "select log(c8,2 ) from t1", - "select log(c9,2 ) from t1", - "select log(ts,2 ) from ct1" , - "select log(c7,2 ) from ct1", - "select log(c8,2 ) from ct1", - "select log(c9,2 ) from ct1", + "select log(c7,c2 ) from t1", + "select log(c8,c1 ) from t1", + "select log(c9,c2 ) from t1", + "select log(ts,c7 ) from ct1" , + "select log(c7,c9 ) from ct1", + "select log(c8,c2 ) from ct1", + "select log(c9,c1 ) from ct1", "select log(ts,2 ) from ct3" , "select log(c7,2 ) from ct3", "select log(c8,2 ) from ct3", diff --git a/tests/system-test/2-query/pow.py b/tests/system-test/2-query/pow.py new file mode 100644 index 0000000000..8b0137b411 --- /dev/null +++ b/tests/system-test/2-query/pow.py @@ -0,0 +1,652 @@ +import taos +import sys +import datetime +import inspect +import math +from util.log import * +from util.sql import * +from util.cases import * + + +class TDTestCase: + updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 , + "jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143, + "wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143} + def init(self, conn, powSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def prepare_datas(self): + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + def check_result_auto_pow2(self ,origin_query , pow_query): + + pow_result = tdSql.getResult(pow_query) + origin_result = tdSql.getResult(origin_query) + + auto_result =[] + + for row in origin_result: + row_check = [] + for elem in row: + if elem == None: + elem = None + else: + elem = math.pow(elem,2) + row_check.append(elem) + auto_result.append(row_check) + + check_status = True + + for row_index , row in enumerate(pow_result): + for col_index , elem in enumerate(row): + if auto_result[row_index][col_index] == None and not (auto_result[row_index][col_index] == None and elem == None): + check_status = False + elif auto_result[row_index][col_index] != None and (auto_result[row_index][col_index] - elem > 0.00000001): + check_status = False + else: + pass + if not check_status: + tdLog.notice("pow function value has not as expected , sql is \"%s\" "%pow_query ) + sys.exit(1) + else: + tdLog.info("pow value check pass , it work as expected ,sql is \"%s\" "%pow_query ) + + def check_result_auto_pow1(self ,origin_query , pow_query): + pow_result = tdSql.getResult(pow_query) + origin_result = tdSql.getResult(origin_query) + + auto_result =[] + + for row in origin_result: + row_check = [] + for elem in row: + if elem == None: + elem = None + else : + elem = pow(elem ,1) + row_check.append(elem) + auto_result.append(row_check) + + check_status = True + for row_index , row in enumerate(pow_result): + for col_index , elem in enumerate(row): + if auto_result[row_index][col_index] == None and not (auto_result[row_index][col_index] == None and elem == None): + check_status = False + elif auto_result[row_index][col_index] != None and (auto_result[row_index][col_index] - elem > 0.00000001): + check_status = False + else: + pass + if not check_status: + tdLog.notice("pow function value has not as expected , sql is \"%s\" "%pow_query ) + sys.exit(1) + else: + tdLog.info("pow value check pass , it work as expected ,sql is \"%s\" "%pow_query ) + + def check_result_auto_pow__10(self ,origin_query , pow_query): + pow_result = tdSql.getResult(pow_query) + origin_result = tdSql.getResult(origin_query) + + auto_result =[] + + for row in origin_result: + row_check = [] + for elem in row: + if elem == None: + elem = None + elif elem == 0: + elem = None + else: + elem = pow(elem ,-10) + row_check.append(elem) + auto_result.append(row_check) + + check_status = True + for row_index , row in enumerate(pow_result): + for col_index , elem in enumerate(row): + if auto_result[row_index][col_index] == None and not (auto_result[row_index][col_index] == None and elem == None): + check_status = False + elif auto_result[row_index][col_index] != None and (auto_result[row_index][col_index] - elem > 0.00000001): + check_status = False + else: + pass + if not check_status: + tdLog.notice("pow function value has not as expected , sql is \"%s\" "%pow_query ) + sys.exit(1) + else: + tdLog.info("pow value check pass , it work as expected ,sql is \"%s\" "%pow_query ) + + def test_errors(self): + error_sql_lists = [ + "select pow from t1", + # "select pow(-+--+c1 ,2) from t1", + # "select +-pow(c1,2) from t1", + # "select ++-pow(c1,2) from t1", + # "select ++--pow(c1,2) from t1", + # "select - -pow(c1,2)*0 from t1", + # "select pow(tbname+1,2) from t1 ", + "select pow(123--123,2)==1 from t1", + "select pow(c1,2) as 'd1' from t1", + "select pow(c1 ,c2 ,2) from t1", + "select pow(c1 ,NULL ,2) from t1", + "select pow(, 2) from t1;", + "select pow(pow(c1, 2) ab from t1)", + "select pow(c1 ,2 ) as int from t1", + "select pow from stb1", + # "select pow(-+--+c1) from stb1", + # "select +-pow(c1) from stb1", + # "select ++-pow(c1) from stb1", + # "select ++--pow(c1) from stb1", + # "select - -pow(c1)*0 from stb1", + # "select pow(tbname+1) from stb1 ", + "select pow(123--123 ,2)==1 from stb1", + "select pow(c1 ,2) as 'd1' from stb1", + "select pow(c1 ,c2 ,2 ) from stb1", + "select pow(c1 ,NULL,2) from stb1", + "select pow(,) from stb1;", + "select pow(pow(c1 , 2) ab from stb1)", + "select pow(c1 , 2) as int from stb1" + ] + for error_sql in error_sql_lists: + tdSql.error(error_sql) + + def support_types(self): + type_error_sql_lists = [ + "select pow(ts ,2 ) from t1" , + "select pow(c7,c1 ) from t1", + "select pow(c8,c2) from t1", + "select pow(c9,c3 ) from t1", + "select pow(ts,c4 ) from ct1" , + "select pow(c7,c5 ) from ct1", + "select pow(c8,c6 ) from ct1", + "select pow(c9,c8 ) from ct1", + "select pow(ts,2 ) from ct3" , + "select pow(c7,2 ) from ct3", + "select pow(c8,2 ) from ct3", + "select pow(c9,2 ) from ct3", + "select pow(ts,2 ) from ct4" , + "select pow(c7,2 ) from ct4", + "select pow(c8,2 ) from ct4", + "select pow(c9,2 ) from ct4", + "select pow(ts,2 ) from stb1" , + "select pow(c7,2 ) from stb1", + "select pow(c8,2 ) from stb1", + "select pow(c9,2 ) from stb1" , + + "select pow(ts,2 ) from stbbb1" , + "select pow(c7,2 ) from stbbb1", + + "select pow(ts,2 ) from tbname", + "select pow(c9,2 ) from tbname" + + ] + + for type_sql in type_error_sql_lists: + tdSql.error(type_sql) + + + type_sql_lists = [ + "select pow(c1,2 ) from t1", + "select pow(c2,2 ) from t1", + "select pow(c3,2 ) from t1", + "select pow(c4,2 ) from t1", + "select pow(c5,2 ) from t1", + "select pow(c6,2 ) from t1", + + "select pow(c1,2 ) from ct1", + "select pow(c2,2 ) from ct1", + "select pow(c3,2 ) from ct1", + "select pow(c4,2 ) from ct1", + "select pow(c5,2 ) from ct1", + "select pow(c6,2 ) from ct1", + + "select pow(c1,2 ) from ct3", + "select pow(c2,2 ) from ct3", + "select pow(c3,2 ) from ct3", + "select pow(c4,2 ) from ct3", + "select pow(c5,2 ) from ct3", + "select pow(c6,2 ) from ct3", + + "select pow(c1,2 ) from stb1", + "select pow(c2,2 ) from stb1", + "select pow(c3,2 ) from stb1", + "select pow(c4,2 ) from stb1", + "select pow(c5,2 ) from stb1", + "select pow(c6,2 ) from stb1", + + "select pow(c6,2) as alisb from stb1", + "select pow(c6,2) alisb from stb1", + ] + + for type_sql in type_sql_lists: + tdSql.query(type_sql) + + def basic_pow_function(self): + + # basic query + tdSql.query("select c1 from ct3") + tdSql.checkRows(0) + tdSql.query("select c1 from t1") + tdSql.checkRows(12) + tdSql.query("select c1 from stb1") + tdSql.checkRows(25) + + # used for empty table , ct3 is empty + tdSql.query("select pow(c1 ,2) from ct3") + tdSql.checkRows(0) + tdSql.query("select pow(c2 ,2) from ct3") + tdSql.checkRows(0) + tdSql.query("select pow(c3 ,2) from ct3") + tdSql.checkRows(0) + tdSql.query("select pow(c4 ,2) from ct3") + tdSql.checkRows(0) + tdSql.query("select pow(c5 ,2) from ct3") + tdSql.checkRows(0) + tdSql.query("select pow(c6 ,2) from ct3") + tdSql.checkRows(0) + + + # # used for regular table + tdSql.query("select pow(c1 ,2) from t1") + tdSql.checkData(0, 0, None) + tdSql.checkData(1 , 0, 1.000000000) + tdSql.checkData(3 , 0, 9.000000000) + tdSql.checkData(5 , 0, None) + + tdSql.query("select c1, c2, c3 , c4, c5 from t1") + tdSql.checkData(1, 4, 1.11000) + tdSql.checkData(3, 3, 33) + tdSql.checkData(5, 4, None) + + tdSql.query("select ts,c1, c2, c3 , c4, c5 from t1") + tdSql.checkData(1, 5, 1.11000) + tdSql.checkData(3, 4, 33) + tdSql.checkData(5, 5, None) + + self.check_result_auto_pow2( "select c1, c2, c3 , c4, c5 from t1", "select pow(c1 ,2), pow(c2 ,2) ,pow(c3, 2), pow(c4 ,2), pow(c5 ,2) from t1") + self.check_result_auto_pow1( "select c1, c2, c3 , c4, c5 from t1", "select pow(c1 ,1), pow(c2 ,1) ,pow(c3, 1), pow(c4 ,1), pow(c5 ,1) from t1") + self.check_result_auto_pow__10( "select c1, c2, c3 , c4, c5 from t1", "select pow(c1 ,-10), pow(c2 ,-10) ,pow(c3, -10), pow(c4 ,-10), pow(c5 ,-10) from t1") + + # used for sub table + tdSql.query("select c1 ,pow(c1 ,2) from ct1") + tdSql.checkData(0, 1, 64.000000000) + tdSql.checkData(1 , 1, 49.000000000) + tdSql.checkData(3 , 1, 25.000000000) + tdSql.checkData(4 , 1, 0) + + # # test bug fix for pow(c1,c2) + + tdSql.query("select c1, c5 ,pow(c1,c5) from ct4") + tdSql.checkData(0 , 2, None) + tdSql.checkData(1 , 2, 104577724.506799981) + tdSql.checkData(2 , 2, 3684781.623933245) + tdSql.checkData(3 , 2, 152225.429759376) + tdSql.checkData(4 , 2, 7573.273783071) + + + self.check_result_auto_pow2( "select c1, c2, c3 , c4, c5 from ct1", "select pow(c1,2), pow(c2,2) ,pow(c3,2), pow(c4,2), pow(c5,2) from ct1") + self.check_result_auto_pow__10( "select c1, c2, c3 , c4, c5 from ct1", "select pow(c1,-10), pow(c2,-10) ,pow(c3,-10), pow(c4,-10), pow(c5,-10) from ct1") + + # nest query for pow functions + tdSql.query("select c1 , pow(c1,2) ,pow(pow(c1,2),2) , pow(pow(pow(c1,2),2),2) from ct1;") + tdSql.checkData(0 , 0 , 8) + tdSql.checkData(0 , 1 , 64.000000000) + tdSql.checkData(0 , 2 , 4096.000000000) + tdSql.checkData(0 , 3 , 16777216.000000000) + + tdSql.checkData(1 , 0 , 7) + tdSql.checkData(1 , 1 , 49.000000000) + tdSql.checkData(1 , 2 , 2401.000000000) + tdSql.checkData(1 , 3 , 5764801.000000000) + + tdSql.checkData(4 , 0 , 0) + tdSql.checkData(4 , 1 , 0.000000000) + tdSql.checkData(4 , 2 , 0.000000000) + tdSql.checkData(4 , 3 , 0.000000000) + + # # used for stable table + + tdSql.query("select pow(c1, 2) from stb1") + tdSql.checkRows(25) + + + # used for not exists table + tdSql.error("select pow(c1, 2) from stbbb1") + tdSql.error("select pow(c1, 2) from tbname") + tdSql.error("select pow(c1, 2) from ct5") + + # mix with common col + tdSql.query("select c1, pow(c1 ,2) from ct1") + tdSql.checkData(0 , 0 ,8) + tdSql.checkData(0 , 1 ,64.000000000) + tdSql.checkData(4 , 0 ,0) + tdSql.checkData(4 , 1 ,0.000000000) + tdSql.query("select c1, pow(c1,2) from ct4") + tdSql.checkData(0 , 0 , None) + tdSql.checkData(0 , 1 ,None) + tdSql.checkData(4 , 0 ,5) + tdSql.checkData(4 , 1 ,25.000000000) + tdSql.checkData(5 , 0 ,None) + tdSql.checkData(5 , 1 ,None) + + # mix with common functions + tdSql.query("select c1, pow(c1 ,2),pow(c1,2), log(pow(c1,2) ,2) from ct4 ") + tdSql.checkData(0 , 0 ,None) + tdSql.checkData(0 , 1 ,None) + tdSql.checkData(0 , 2 ,None) + tdSql.checkData(0 , 3 ,None) + + tdSql.checkData(3 , 0 , 6) + tdSql.checkData(3 , 1 ,36.000000000) + tdSql.checkData(3 , 2 ,36.000000000) + tdSql.checkData(3 , 3 ,5.169925001) + + tdSql.query("select c1, pow(c1,1),c5, floor(c5 ) from stb1 ") + + # # mix with agg functions , not support + tdSql.error("select c1, pow(c1 ,2),c5, count(c5) from stb1 ") + tdSql.error("select c1, pow(c1 ,2),c5, count(c5) from ct1 ") + tdSql.error("select pow(c1 ,2), count(c5) from stb1 ") + tdSql.error("select pow(c1 ,2), count(c5) from ct1 ") + tdSql.error("select c1, count(c5) from ct1 ") + tdSql.error("select c1, count(c5) from stb1 ") + + # agg functions mix with agg functions + + tdSql.query("select max(c5), count(c5) from stb1") + tdSql.query("select max(c5), count(c5) from ct1") + + + # bug fix for count + tdSql.query("select count(c1) from ct4 ") + tdSql.checkData(0,0,9) + tdSql.query("select count(*) from ct4 ") + tdSql.checkData(0,0,12) + tdSql.query("select count(c1) from stb1 ") + tdSql.checkData(0,0,22) + tdSql.query("select count(*) from stb1 ") + tdSql.checkData(0,0,25) + + # # bug fix for compute + tdSql.query("select c1, pow(c1 ,2) -0 ,pow(c1-4 ,2)-0 from ct4 ") + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 0, 8) + tdSql.checkData(1, 1, 64.000000000) + tdSql.checkData(1, 2, 16.000000000) + + tdSql.query(" select c1, pow(c1 ,2) -0 ,pow(c1-0.1 ,2)-0.1 from ct4") + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 0, 8) + tdSql.checkData(1, 1, 64.000000000) + tdSql.checkData(1, 2, 62.310000000) + + tdSql.query("select c1, pow(c1, -10), c2, pow(c2, -10), c3, pow(c3, -10) from ct1") + + def test_big_number(self): + + tdSql.query("select c1, pow(c1, 100000000) from ct1") # bigint to double data overflow + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, None) + tdSql.checkData(4, 1, 0.000000000) + + + tdSql.query("select c1, pow(c1, 10000000000000) from ct1") # bigint to double data overflow + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, None) + tdSql.checkData(4, 1, 0.000000000) + + tdSql.query("select c1, pow(c1, 10000000000000000000000000) from ct1") # bigint to double data overflow + tdSql.query("select c1, pow(c1, 10000000000000000000000000.0) from ct1") # 10000000000000000000000000.0 is a double value + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, None) + tdSql.checkData(4, 1, 0.000000000) + + tdSql.query("select c1, pow(c1, 10000000000000000000000000000000000) from ct1") # bigint to double data overflow + tdSql.query("select c1, pow(c1, 10000000000000000000000000000000000.0) from ct1") # 10000000000000000000000000.0 is a double value + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, None) + tdSql.checkData(4, 1, 0.000000000) + + tdSql.query("select c1, pow(c1, 10000000000000000000000000000000000000000) from ct1") # bigint to double data overflow + tdSql.query("select c1, pow(c1, 10000000000000000000000000000000000000000.0) from ct1") # 10000000000000000000000000.0 is a double value + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, None) + tdSql.checkData(4, 1, 0.000000000) + + tdSql.query("select c1, pow(c1, 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000) from ct1") # bigint to double data overflow + + def pow_base_test(self): + + # base is an regular number ,int or double + tdSql.query("select c1, pow(c1, 2) from ct1") + tdSql.checkData(0, 1,64.000000000) + tdSql.query("select c1, pow(c1, 2.0) from ct1") + tdSql.checkData(0, 1, 64.000000000) + + tdSql.query("select c1, pow(1, 2.0) from ct1") + tdSql.checkData(0, 1, 1.000000000) + tdSql.checkRows(13) + + + # # bug for compute in functions + # tdSql.query("select c1, abs(1/0) from ct1") + # tdSql.checkData(0, 0, 8) + # tdSql.checkData(0, 1, 1) + + tdSql.query("select c1, pow(1, 2.0) from ct1") + tdSql.checkData(0, 1, 1.000000000) + tdSql.checkRows(13) + + # two cols start pow(x,y) + tdSql.query("select c1,c2, pow(c1,c2) from ct1") + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 2, None) + tdSql.checkData(4, 2, 1.000000000) + + tdSql.query("select c1,c2, pow(c2,c1) from ct1") + tdSql.checkData(0, 2, 3897131646727578700481513520437089271808.000000000) + tdSql.checkData(1, 2, 17217033054561120738612297152331776.000000000) + tdSql.checkData(4, 2, 1.000000000) + + tdSql.query("select c1, pow(2.0 , c1) from ct1") + tdSql.checkData(0, 1, 256.000000000) + tdSql.checkData(1, 1, 128.000000000) + tdSql.checkData(4, 1, 1.000000000) + + tdSql.query("select c1, pow(2.0 , c1) from ct1") + tdSql.checkData(0, 1, 256.000000000) + tdSql.checkData(1, 1, 128.000000000) + tdSql.checkData(4, 1, 1.000000000) + + def abs_func_filter(self): + tdSql.execute("use db") + tdSql.query("select c1, abs(c1) -0 ,ceil(c1-0.1)-0 ,floor(c1+0.1)-0.1 ,ceil(pow(c1,2)-0.5) from ct4 where c1>5 ") + tdSql.checkRows(3) + tdSql.checkData(0,0,8) + tdSql.checkData(0,1,8.000000000) + tdSql.checkData(0,2,8.000000000) + tdSql.checkData(0,3,7.900000000) + tdSql.checkData(0,4,64.000000000) + + tdSql.query("select c1, abs(c1) -0 ,ceil(c1-0.1)-0 ,floor(c1+0.1)-0.1 ,ceil(pow(c1,2)-0.5) from ct4 where c1=5 ") + tdSql.checkRows(1) + tdSql.checkData(0,0,5) + tdSql.checkData(0,1,5.000000000) + tdSql.checkData(0,2,5.000000000) + tdSql.checkData(0,3,4.900000000) + tdSql.checkData(0,4,25.000000000) + + tdSql.query("select c1, abs(c1) -0 ,ceil(c1-0.1)-0 ,floor(c1+0.1)-0.1 ,ceil(pow(c1,2)-0.5) from ct4 where c1=5 ") + tdSql.checkRows(1) + tdSql.checkData(0,0,5) + tdSql.checkData(0,1,5.000000000) + tdSql.checkData(0,2,5.000000000) + tdSql.checkData(0,3,4.900000000) + tdSql.checkData(0,4,25.000000000) + + tdSql.query("select c1,c2 , abs(c1) -0 ,ceil(c1-0.1)-0 ,floor(c1+0.1)-0.1 ,ceil(pow(c1,2)-0.5) from ct4 where c1