From 8345a5429bf30c1745cf3e1c4b117dd8c7e99e9d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Mar 2022 11:12:44 +0800 Subject: [PATCH] [td-13039] add min/max/systable-scanner. --- include/libs/function/function.h | 12 +- source/libs/executor/inc/executorimpl.h | 32 ++- source/libs/executor/src/executorimpl.c | 159 +++++++------ source/libs/function/inc/builtinsimpl.h | 6 + source/libs/function/src/builtins.c | 20 ++ source/libs/function/src/builtinsimpl.c | 280 +++++++++++++++++++++- source/libs/function/src/taggfunction.c | 296 +++++------------------- 7 files changed, 463 insertions(+), 342 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 21013ef906..c01e267c42 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -132,11 +132,11 @@ struct SqlFunctionCtx; struct SResultRowEntryInfo; //for selectivity query, the corresponding tag value is assigned if the data is qualified -typedef struct SExtTagsInfo { - int16_t tagsLen; // keep the tags data for top/bottom query result - int16_t numOfTagCols; - struct SqlFunctionCtx **pTagCtxList; -} SExtTagsInfo; +typedef struct SSubsidiaryResInfo { + int16_t bufLen; // keep the tags data for top/bottom query result + int16_t numOfCols; + struct SqlFunctionCtx **pCtx; +} SSubsidiaryResInfo; typedef struct SResultDataInfo { int16_t precision; @@ -187,7 +187,7 @@ typedef struct SqlFunctionCtx { void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SVariant tag; struct SResultRowEntryInfo *resultInfo; - SExtTagsInfo tagInfo; + SSubsidiaryResInfo subsidiaryRes; SPoint1 start; SPoint1 end; SFuncExecFuncs fpSet; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d46978b26d..c8b8feb3af 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -441,17 +441,24 @@ typedef struct SStreamBlockScanInfo { } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { - void *pTransporter; - SEpSet epSet; - int32_t type; // show type - tsem_t ready; - void *readHandle; - SSchema *pSchema; - SSDataBlock *pRes; - int64_t numOfBlocks; // extract basic running information. - int64_t totalRows; - int64_t elapsedTime; - int64_t totalBytes; + union { + void* pTransporter; + void* readHandle; + }; + + void *pCur; // cursor + SRetrieveTableReq* pReq; + SEpSet epSet; + int32_t type; // show type + tsem_t ready; + SSchema* pSchema; + SSDataBlock* pRes; + + int32_t capacity; + int64_t numOfBlocks; // extract basic running information. + int64_t totalRows; + int64_t elapsedTime; + int64_t totalBytes; } SSysTableScanInfo; typedef struct SOptrBasicInfo { @@ -630,7 +637,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSystemScanOperatorInfo(void* pSystemTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, + int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6c710a5932..ab4eee4b86 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -212,6 +212,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyOperatorInfo(SOperatorInfo* pOperator); +static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); static void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -1920,9 +1921,9 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx *pCtx, int32_t numOfOutput) { } } if (p != NULL) { - p->tagInfo.pTagCtxList = pTagCtx; - p->tagInfo.numOfTagCols = num; - p->tagInfo.tagsLen = tagLen; + p->subsidiaryRes.pCtx = pTagCtx; + p->subsidiaryRes.numOfCols = num; + p->subsidiaryRes.bufLen = tagLen; } else { tfree(pTagCtx); } @@ -2127,7 +2128,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC return pFuncCtx; } -static void* destroySQLFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; } @@ -2138,7 +2139,7 @@ static void* destroySQLFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } taosVariantDestroy(&pCtx[i].tag); - tfree(pCtx[i].tagInfo.pTagCtxList); + tfree(pCtx[i].subsidiaryRes.pCtx); } tfree(pCtx); @@ -2222,46 +2223,6 @@ static void destroyTsComp(STaskRuntimeEnv *pRuntimeEnv, STaskAttr *pQueryAttr) { } } -static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) { - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo; - - //qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId); - - //destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput); -// destroyUdfInfo(pRuntimeEnv->pUdfInfo); - destroyDiskbasedBuf(pRuntimeEnv->pResultBuf); - doFreeQueryHandle(pRuntimeEnv); - - destroyTsComp(pRuntimeEnv, pQueryAttr); - - pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf); - - tfree(pRuntimeEnv->keyBuf); - tfree(pRuntimeEnv->prevRow); - tfree(pRuntimeEnv->tagVal); - - taosHashCleanup(pRuntimeEnv->pResultRowHashTable); - pRuntimeEnv->pResultRowHashTable = NULL; - - taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap); - pRuntimeEnv->pTableRetrieveTsMap = NULL; - - taosHashCleanup(pRuntimeEnv->pResultRowListSet); - pRuntimeEnv->pResultRowListSet = NULL; - - destroyOperatorInfo(pRuntimeEnv->proot); - - pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); - taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); - taosArrayDestroy(pRuntimeEnv->pResultRowArrayList); - pRuntimeEnv->prevResult = NULL; -} - -static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { - return pQInfo->rspContext != NULL; -} - bool isTaskKilled(SExecTaskInfo *pTaskInfo) { // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived // abort current query execution. @@ -5475,38 +5436,67 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSysTableScanInfo* pInfo = pOperator->info; - SRetrieveTableReq* req = calloc(1, sizeof(SRetrieveTableReq)); - if (req == NULL) { - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + // retrieve local table list info from vnode + if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + if (pInfo->pCur == NULL) { + pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); + } + + SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 0); + + char * name = NULL; + int32_t numOfRows = 0; + while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { + colDataAppend(pTableNameCol, numOfRows, name, false); + numOfRows += 1; + if (numOfRows >= pInfo->capacity) { + break; + } + } + + pInfo->totalRows += numOfRows; + pInfo->pRes->info.rows = numOfRows; + +// pInfo->elapsedTime; +// pInfo->totalBytes; + return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes; + } else { // load the meta from mnode of the given epset + if (pInfo->pReq == NULL) { + pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq)); + if (pInfo->pReq == NULL) { + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pInfo->pReq->type = pInfo->type; + } + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pMsgSendInfo->param = NULL; + pMsgSendInfo->msgInfo.pData = pInfo->pReq; + pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq); + pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; + pMsgSendInfo->fp = loadRemoteDataCallback; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); + + tsem_wait(&pInfo->ready); + // handle the response and return to the caller } - req->type = pInfo->type; - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pMsgSendInfo->param = NULL; - pMsgSendInfo->msgInfo.pData = req; - pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq); - pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; - pMsgSendInfo->fp = loadRemoteDataCallback; - - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); - - tsem_wait(&pInfo->ready); - // handle the response and return to the caller - return NULL; } -SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, + int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) { SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5516,6 +5506,17 @@ SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SAr return NULL; } + // todo: create the schema of result data block + pInfo->capacity = 4096; + pInfo->type = tableType; + if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + pInfo->readHandle = pSysTableReadHandle; + blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); + } else { + tsem_init(&pInfo->ready, 0, 0); + pInfo->epSet = epset; + } + pInfo->readHandle = pSysTableReadHandle; pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; @@ -5524,6 +5525,7 @@ SOperatorInfo* createSystemScanOperatorInfo(void* pSysTableReadHandle, const SAr pOperator->info = pInfo; pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->nextDataFn = doSysTableScan; + pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -7165,7 +7167,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { assert(pInfo != NULL); - destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); + destroySqlFunctionCtx(pInfo->pCtx, numOfOutput); tfree(pInfo->rowCellInfoOffset); cleanupResultRowInfo(&pInfo->resultRowInfo); @@ -7185,6 +7187,7 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } + static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -7233,6 +7236,16 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } +static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) { + SSysTableScanInfo* pInfo = (SSysTableScanInfo*) param; + tsem_destroy(&pInfo->ready); + blockDataDestroy(pInfo->pRes); + + if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + metaCloseTbCursor(pInfo->pCur); + } +} + SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 18d1ff41e2..7ba7d7bdcc 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -31,6 +31,12 @@ void countFunction(SqlFunctionCtx *pCtx); bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void sumFunction(SqlFunctionCtx *pCtx); +bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void minFunction(SqlFunctionCtx* pCtx); +void maxFunction(SqlFunctionCtx *pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 7a73bdb4e8..cc2f3c94f9 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -41,6 +41,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = sumFunction, .finalizeFunc = functionFinalizer }, + { + .name = "min", + .type = FUNCTION_TYPE_MIN, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = minFunctionSetup, + .processFunc = minFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "max", + .type = FUNCTION_TYPE_MAX, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalizer + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fb30cce6a9..e514943f47 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -14,6 +14,7 @@ */ #include "builtinsimpl.h" +#include #include "taggfunction.h" #include "tdatablock.h" @@ -27,7 +28,6 @@ } while (0) typedef struct SSumRes { -// int8_t hasResult; union { int64_t isum; uint64_t usum; @@ -115,7 +115,7 @@ void countFunction(SqlFunctionCtx *pCtx) { } \ } while (0) -static void do_sum(SqlFunctionCtx *pCtx) { +void sumFunction(SqlFunctionCtx *pCtx) { int32_t numOfElem = 0; // Only the pre-computing information loaded and actual data does not loaded @@ -179,14 +179,272 @@ bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -void sumFunction(SqlFunctionCtx *pCtx) { - do_sum(pCtx); - // keep the result data in output buffer, not in the intermediate buffer -// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); -// if (pResInfo->hasResult == DATA_SET_FLAG) { - // set the flag for super table query -// SSumRes *pSum = (SSumRes *)pCtx->pOutput; -// pSum->hasResult = DATA_SET_FLAG; -// } +bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + char* buf = GET_ROWCELL_INTERBUF(pResultInfo); + switch (pCtx->input.pData[0]->info.type) { + case TSDB_DATA_TYPE_INT: + *((int32_t *)buf) = INT32_MIN; + break; + case TSDB_DATA_TYPE_UINT: + *((uint32_t *)buf) = 0; + break; + case TSDB_DATA_TYPE_FLOAT: + *((float *)buf) = -FLT_MAX; + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_VAL(((double *)buf), -DBL_MAX); + break; + case TSDB_DATA_TYPE_BIGINT: + *((int64_t *)buf) = INT64_MIN; + break; + case TSDB_DATA_TYPE_UBIGINT: + *((uint64_t *)buf) = 0; + break; + case TSDB_DATA_TYPE_SMALLINT: + *((int16_t *)buf) = INT16_MIN; + break; + case TSDB_DATA_TYPE_USMALLINT: + *((uint16_t *)buf) = 0; + break; + case TSDB_DATA_TYPE_TINYINT: + *((int8_t *)buf) = INT8_MIN; + break; + case TSDB_DATA_TYPE_UTINYINT: + *((uint8_t *)buf) = 0; + break; + default: + assert(0); + } + return true; } + +bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; // not initialized since it has been initialized + } + + char* buf = GET_ROWCELL_INTERBUF(pResultInfo); + switch (pCtx->input.pData[0]->info.type) { + case TSDB_DATA_TYPE_TINYINT: + *((int8_t *)buf) = INT8_MAX; + break; + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t *) buf = UINT8_MAX; + break; + case TSDB_DATA_TYPE_SMALLINT: + *((int16_t *)buf) = INT16_MAX; + break; + case TSDB_DATA_TYPE_USMALLINT: + *((uint16_t *)buf) = UINT16_MAX; + break; + case TSDB_DATA_TYPE_INT: + *((int32_t *)buf) = INT32_MAX; + break; + case TSDB_DATA_TYPE_UINT: + *((uint32_t *)buf) = UINT32_MAX; + break; + case TSDB_DATA_TYPE_BIGINT: + *((int64_t *)buf) = INT64_MAX; + break; + case TSDB_DATA_TYPE_UBIGINT: + *((uint64_t *)buf) = UINT64_MAX; + break; + case TSDB_DATA_TYPE_FLOAT: + *((float *)buf) = FLT_MAX; + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_VAL(((double *)buf), DBL_MAX); + break; + default: + assert(0); + } + + return true; +} + +bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = sizeof(int64_t); + return true; +} + +#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) +#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) + +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ + SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0); + +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ + SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0) + +#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_SUBSID_RES(ctx, _ts); \ + (num) += 1; \ + } \ + } while (0) + +#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ + do { \ + _t* d = (_t*)((_col)->pData); \ + for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ + if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ + continue; \ + } \ + TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \ + UPDATE_DATA(ctx, val, d[i], num, sign, ts); \ + } \ + } while (0) + +int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { + int32_t numOfElems = 0; + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0]; + + SColumnInfoData* pCol = pInput->pData[0]; + int32_t type = pCol->info.type; + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + // data in current data block are qualified to the query + if (pInput->colDataAggIsSet) { + numOfElems = pInput->numOfRows - pAgg->numOfNull; + ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0); + + if (numOfElems == 0) { + return numOfElems; + } + + void* tval = NULL; + int16_t index = 0; + + if (isMinFunc) { + tval = &pInput->pColumnDataAgg[0]->min; + index = pInput->pColumnDataAgg[0]->minIndex; + } else { + tval = &pInput->pColumnDataAgg[0]->max; + index = pInput->pColumnDataAgg[0]->maxIndex; + } + + TSKEY key = TSKEY_INITIAL_VAL; + if (pCtx->ptsList != NULL) { + // the index is the original position, not the relative position + key = pCtx->ptsList[index]; + } + + if (IS_SIGNED_NUMERIC_TYPE(type)) { + int64_t val = GET_INT64_VAL(tval); + +#if defined(_DEBUG_VIEW) + qDebug("max value updated according to pre-cal:%d", *data); +#endif + + if ((*(int64_t*)buf < val) ^ isMinFunc) { + *(int64_t*) buf = val; + for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) { + SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i]; + if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor + __ctx->tag.i = key; + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + } + + __ctx->fpSet.process(__ctx); + } + } + } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { + uint64_t val = GET_UINT64_VAL(tval); + UPDATE_DATA(pCtx, *(uint64_t*)buf, val, numOfElems, isMinFunc, key); + } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { + double val = GET_DOUBLE_VAL(tval); + UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key); + } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { + double val = GET_DOUBLE_VAL(tval); + UPDATE_DATA(pCtx, *(float*)buf, (float)val, numOfElems, isMinFunc, key); + } + + return numOfElems; + } + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { + if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { + LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems); + } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { + LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems); + } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { + int32_t *pData = (int32_t*)pCol->pData; + int64_t *val = (int64_t*) buf; + + for (int32_t i = 0; i < pCtx->size; ++i) { + if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + if ((*val < pData[i]) ^ isMinFunc) { + *val = pData[i]; + TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i) : 0; + DO_UPDATE_SUBSID_RES(pCtx, ts); + } + + numOfElems += 1; + } + +#if defined(_DEBUG_VIEW) + qDebug("max value updated:%d", *retVal); +#endif + } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { + LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems); + } + } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { + if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { + LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems); + } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { + LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems); + } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { + LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems); + } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { + LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems); + } + } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { + LOOPCHECK_N(*(double*) buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems); + } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { + LOOPCHECK_N(*(float*) buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems); + } + + return numOfElems; +} + +void minFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElems = doMinMaxHelper(pCtx, 1); + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); +} + +void maxFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElems = doMinMaxHelper(pCtx, 0); + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); +} \ No newline at end of file diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 47d09ec2dc..4360515328 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -958,157 +958,6 @@ static void avg_finalizer(SqlFunctionCtx *pCtx) { ///////////////////////////////////////////////////////////////////////////////////////////// -static void minMax_function(SqlFunctionCtx *pCtx, char *pOutput, int32_t isMin, int32_t *notNullElems) { - // data in current data block are qualified to the query - if (pCtx->isAggSet) { - *notNullElems = pCtx->size - pCtx->agg.numOfNull; - assert(*notNullElems >= 0); - - if (*notNullElems == 0) { - return; - } - - void* tval = NULL; - int16_t index = 0; - - if (isMin) { - tval = &pCtx->agg.min; - index = pCtx->agg.minIndex; - } else { - tval = &pCtx->agg.max; - index = pCtx->agg.maxIndex; - } - - TSKEY key = TSKEY_INITIAL_VAL; - if (pCtx->ptsList != NULL) { - /** - * NOTE: work around the bug caused by invalid pre-calculated function. - * Here the selectivity + ts will not return correct value. - * - * The following codes of 3 lines will be removed later. - */ -// if (index < 0 || index >= pCtx->size + pCtx->startOffset) { -// index = 0; -// } - - // the index is the original position, not the relative position - key = pCtx->ptsList[index]; - } - - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { - int64_t val = GET_INT64_VAL(tval); - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - int8_t *data = (int8_t *)pOutput; - - UPDATE_DATA(pCtx, *data, (int8_t)val, notNullElems, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - int16_t *data = (int16_t *)pOutput; - - UPDATE_DATA(pCtx, *data, (int16_t)val, notNullElems, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - int32_t *data = (int32_t *)pOutput; -#if defined(_DEBUG_VIEW) - qDebug("max value updated according to pre-cal:%d", *data); -#endif - - if ((*data < val) ^ isMin) { - *data = (int32_t)val; - for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) { - SqlFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i]; - if (__ctx->functionId == FUNCTION_TS_DUMMY) { - __ctx->tag.i = key; - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - } - - aggFunc[FUNCTION_TAG].addInput(__ctx); - } - } - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - int64_t *data = (int64_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); - } - } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { - uint64_t val = GET_UINT64_VAL(tval); - if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { - uint8_t *data = (uint8_t *)pOutput; - - UPDATE_DATA(pCtx, *data, (uint8_t)val, notNullElems, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { - uint16_t *data = (uint16_t *)pOutput; - UPDATE_DATA(pCtx, *data, (uint16_t)val, notNullElems, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { - uint32_t *data = (uint32_t *)pOutput; - UPDATE_DATA(pCtx, *data, (uint32_t)val, notNullElems, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { - uint64_t *data = (uint64_t *)pOutput; - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); - } - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - double *data = (double *)pOutput; - double val = GET_DOUBLE_VAL(tval); - - UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - float *data = (float *)pOutput; - double val = GET_DOUBLE_VAL(tval); - - UPDATE_DATA(pCtx, *data, (float)val, notNullElems, isMin, key); - } - - return; - } - - void *p = GET_INPUT_DATA_LIST(pCtx); - TSKEY *tsList = GET_TS_LIST(pCtx); - - *notNullElems = 0; - - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - TYPED_LOOPCHECK_N(int8_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - TYPED_LOOPCHECK_N(int16_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - int32_t *pData = p; - int32_t *retVal = (int32_t*) pOutput; - - for (int32_t i = 0; i < pCtx->size; ++i) { - if (pCtx->hasNull && isNull((const char*)&pData[i], pCtx->inputType)) { - continue; - } - - if ((*retVal < pData[i]) ^ isMin) { - *retVal = pData[i]; - TSKEY k = tsList[i]; - - DO_UPDATE_TAG_COLUMNS(pCtx, k); - } - - *notNullElems += 1; - } -#if defined(_DEBUG_VIEW) - qDebug("max value updated:%d", *retVal); -#endif - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - TYPED_LOOPCHECK_N(int64_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } - } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { - if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { - TYPED_LOOPCHECK_N(uint8_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { - TYPED_LOOPCHECK_N(uint16_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { - TYPED_LOOPCHECK_N(uint32_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { - TYPED_LOOPCHECK_N(uint64_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - TYPED_LOOPCHECK_N(double, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - TYPED_LOOPCHECK_N(float, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); - } -} - static bool min_func_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (!function_setup(pCtx, pResultInfo)) { return false; // not initialized since it has been initialized @@ -1204,43 +1053,9 @@ static bool max_func_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf /* * the output result of min/max function is the final output buffer, not the intermediate result buffer */ -static void min_function(SqlFunctionCtx *pCtx) { - int32_t notNullElems = 0; - minMax_function(pCtx, pCtx->pOutput, 1, ¬NullElems); - - SET_VAL(pCtx, notNullElems, 1); - - if (notNullElems > 0) { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - //pResInfo->hasResult = DATA_SET_FLAG; - - // set the flag for super table query - if (pCtx->stableQuery) { - *(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG; - } - } -} - -static void max_function(SqlFunctionCtx *pCtx) { - int32_t notNullElems = 0; - minMax_function(pCtx, pCtx->pOutput, 0, ¬NullElems); - - SET_VAL(pCtx, notNullElems, 1); - - if (notNullElems > 0) { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - //pResInfo->hasResult = DATA_SET_FLAG; - - // set the flag for super table query - if (pCtx->stableQuery) { - *(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG; - } - } -} - static int32_t minmax_merge_impl(SqlFunctionCtx *pCtx, int32_t bytes, char *output, bool isMin) { int32_t notNullElems = 0; - +#if 0 GET_TRUE_DATA_TYPE(); assert(pCtx->stableQuery); @@ -1319,7 +1134,8 @@ static int32_t minmax_merge_impl(SqlFunctionCtx *pCtx, int32_t bytes, char *outp break; } } - +#endif + return notNullElems; } @@ -1618,7 +1434,7 @@ static void first_function(SqlFunctionCtx *pCtx) { memcpy(pCtx->pOutput, data, pCtx->inputBytes); if (pCtx->ptsList != NULL) { TSKEY k = GET_TS_DATA(pCtx, i); - DO_UPDATE_TAG_COLUMNS(pCtx, k); +// DO_UPDATE_TAG_COLUMNS(pCtx, k); } SResultRowEntryInfo *pInfo = GET_RES_INFO(pCtx); @@ -1642,7 +1458,7 @@ static void first_data_assign_impl(SqlFunctionCtx *pCtx, char *pData, int32_t in pInfo->hasResult = DATA_SET_FLAG; pInfo->ts = timestamp[index]; - DO_UPDATE_TAG_COLUMNS(pCtx, pInfo->ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, pInfo->ts); } } @@ -1696,7 +1512,7 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) { pCtx->param[1].i = pInput->ts; pCtx->param[1].nType = pCtx->resDataInfo.type; - DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); } SET_VAL(pCtx, 1, 1); @@ -1730,7 +1546,7 @@ static void last_function(SqlFunctionCtx *pCtx) { memcpy(pCtx->pOutput, data, pCtx->inputBytes); TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; - DO_UPDATE_TAG_COLUMNS(pCtx, ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); //pResInfo->hasResult = DATA_SET_FLAG; pResInfo->complete = true; // set query completed on this column @@ -1777,7 +1593,7 @@ static void last_data_assign_impl(SqlFunctionCtx *pCtx, char *pData, int32_t ind pInfo->hasResult = DATA_SET_FLAG; pInfo->ts = timestamp[index]; - DO_UPDATE_TAG_COLUMNS(pCtx, pInfo->ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, pInfo->ts); } } @@ -1833,7 +1649,7 @@ static void last_dist_func_merge(SqlFunctionCtx *pCtx) { pCtx->param[1].i = pInput->ts; pCtx->param[1].nType = pCtx->resDataInfo.type; - DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); } SET_VAL(pCtx, 1, 1); @@ -1860,10 +1676,10 @@ static void last_row_function(SqlFunctionCtx *pCtx) { pInfo1->ts = GET_TS_DATA(pCtx, pCtx->size - 1); pInfo1->hasResult = DATA_SET_FLAG; - DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts); } else { TSKEY ts = GET_TS_DATA(pCtx, pCtx->size - 1); - DO_UPDATE_TAG_COLUMNS(pCtx, ts); +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); } SET_VAL(pCtx, pCtx->size, 1); @@ -1883,25 +1699,25 @@ static void last_row_finalizer(SqlFunctionCtx *pCtx) { ////////////////////////////////////////////////////////////////////////////////// static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int64_t tsKey, char *pTags, - SExtTagsInfo *pTagInfo, int16_t stage) { + SSubsidiaryResInfo *pTagInfo, int16_t stage) { dst->v.nType = type; dst->v.i = *(int64_t *)val; dst->timestamp = tsKey; int32_t size = 0; if (stage == MERGE_STAGE) { - memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen); +// memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen); } else { // the tags are dumped from the ctx tag fields - for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { - SqlFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; - if (ctx->functionId == FUNCTION_TS_DUMMY) { - ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - ctx->tag.i = tsKey; - } - - taosVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true); - size += pTagInfo->pTagCtxList[i]->resDataInfo.bytes; - } +// for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { +// SqlFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; +// if (ctx->functionId == FUNCTION_TS_DUMMY) { +// ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; +// ctx->tag.i = tsKey; +// } +// +// taosVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true); +// size += pTagInfo->pTagCtxList[i]->resDataInfo.bytes; +// } } } @@ -1956,7 +1772,7 @@ static void topBotSwapFn(void *dst, void *src, const void *param) } static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type, - SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) { + SSubsidiaryResInfo *pTagInfo, char *pTags, int16_t stage) { SVariant val = {0}; taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); @@ -1966,7 +1782,7 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, if (pInfo->num < maxLen) { valuePairAssign(pList[pInfo->num], type, (const char *)&val.i, ts, pTags, pTagInfo, stage); - taosheapsort((void *) pList, sizeof(tValuePair **), pInfo->num + 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 0); +// taosheapsort((void *) pList, sizeof(tValuePair **), pInfo->num + 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 0); pInfo->num++; } else { @@ -1974,13 +1790,13 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pList[0]->v.u) || (IS_FLOAT_TYPE(type) && val.d > pList[0]->v.d)) { valuePairAssign(pList[0], type, (const char *)&val.i, ts, pTags, pTagInfo, stage); - taosheapadjust((void *) pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 0); +// taosheapadjust((void *) pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 0); } } } static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type, - SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) { + SSubsidiaryResInfo *pTagInfo, char *pTags, int16_t stage) { SVariant val = {0}; taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); @@ -1990,7 +1806,7 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa if (pInfo->num < maxLen) { valuePairAssign(pList[pInfo->num], type, (const char *)&val.i, ts, pTags, pTagInfo, stage); - taosheapsort((void *) pList, sizeof(tValuePair **), pInfo->num + 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 1); +// taosheapsort((void *) pList, sizeof(tValuePair **), pInfo->num + 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 1); pInfo->num++; } else { @@ -1998,7 +1814,7 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u < pList[0]->v.u) || (IS_FLOAT_TYPE(type) && val.d < pList[0]->v.d)) { valuePairAssign(pList[0], type, (const char *)&val.i, ts, pTags, pTagInfo, stage); - taosheapadjust((void *) pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 1); +// taosheapadjust((void *) pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 1); } } } @@ -2113,21 +1929,21 @@ static void copyTopBotRes(SqlFunctionCtx *pCtx, int32_t type) { // set the corresponding tag data for each record // todo check malloc failure - char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); - for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { - pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; - } +// char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); +// for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { +// pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; +// } - for (int32_t i = 0; i < len; ++i, output += step) { - int16_t offset = 0; - for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { - memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes); - offset += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes; - pData[j] += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes; - } - } +// for (int32_t i = 0; i < len; ++i, output += step) { +// int16_t offset = 0; +// for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { +// memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes); +// offset += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes; +// pData[j] += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes; +// } +// } - tfree(pData); +// tfree(pData); } /* @@ -2161,13 +1977,13 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SqlFunctionCtx *pCtx) { pTopBotInfo->res = (tValuePair**) tmp; tmp += POINTER_BYTES * pCtx->param[0].i; - size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; +// size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; - for (int32_t i = 0; i < pCtx->param[0].i; ++i) { - pTopBotInfo->res[i] = (tValuePair*) tmp; - pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); - tmp += size; - } +// for (int32_t i = 0; i < pCtx->param[0].i; ++i) { +// pTopBotInfo->res[i] = (tValuePair*) tmp; +// pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); +// tmp += size; +// } } bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval) { @@ -2256,7 +2072,7 @@ static void top_function(SqlFunctionCtx *pCtx) { // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; - do_top_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); +// do_top_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2283,8 +2099,8 @@ static void top_func_merge(SqlFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type; - do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, - type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); +// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, +// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } SET_VAL(pCtx, pInput->num, pOutput->num); @@ -2313,7 +2129,7 @@ static void bottom_function(SqlFunctionCtx *pCtx) { notNullElems++; // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; - do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); +// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2340,8 +2156,8 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type; - do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, - &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); +// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, +// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } SET_VAL(pCtx, pInput->num, pOutput->num); @@ -4448,7 +4264,7 @@ SAggFunctionInfo aggFunc[35] = {{ FUNCTION_MIN, BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, min_func_setup, - min_function, + NULL, function_finalizer, min_func_merge, statisRequired, @@ -4461,7 +4277,7 @@ SAggFunctionInfo aggFunc[35] = {{ FUNCTION_MAX, BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, max_func_setup, - max_function, + NULL, function_finalizer, max_func_merge, statisRequired,