diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index a20d0e4718..b75b52f5b3 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -76,6 +76,11 @@ int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t uniqueFunction(SqlFunctionCtx *pCtx); +int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5358930df0..85c69d028b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -493,6 +493,21 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return TSDB_CODE_SUCCESS; + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The parameters of UNIQUE can only be columns"); + } + + pFunc->node.resType = ((SExprNode*)pPara)->resType; + return TSDB_CODE_SUCCESS; +} + static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t paraLen = LIST_LENGTH(pFunc->pParameterList); if (paraLen == 0 || paraLen > 2) { @@ -878,14 +893,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = lastFinalize }, { - .name = "diff", - .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, - .translateFunc = translateDiff, - .getEnvFunc = getDiffFuncEnv, - .initFunc = diffFunctionSetup, - .processFunc = diffFunction, - .finalizeFunc = functionFinalize + .name = "unique", + .type = FUNCTION_TYPE_UNIQUE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateUnique, + .getEnvFunc = getUniqueFuncEnv, + .initFunc = uniqueFunctionSetup, + .processFunc = uniqueFunction, + .finalizeFunc = uniqueFinalize }, { .name = "histogram", @@ -907,6 +922,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = hllFunction, .finalizeFunc = hllFinalize }, + { + .name = "diff", + .type = FUNCTION_TYPE_DIFF, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateDiff, + .getEnvFunc = getDiffFuncEnv, + .initFunc = diffFunctionSetup, + .processFunc = diffFunction, + .finalizeFunc = functionFinalize + }, { .name = "state_count", .type = FUNCTION_TYPE_STATE_COUNT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d54cc96611..79d4e4f225 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -28,12 +28,15 @@ #define TAIL_MAX_POINTS_NUM 100 #define TAIL_MAX_OFFSET 100 +#define UNIQUE_MAX_RESULT_SIZE (1024*1024*10) + #define HLL_BUCKET_BITS 14 // The bits of the bucket #define HLL_DATA_BITS (64-HLL_BUCKET_BITS) #define HLL_BUCKETS (1<subsidiaries.num; ++_i) { \ + SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0) + #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ do { \ if (((left) < (right)) ^ (sign)) { \ @@ -748,50 +777,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) -#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) - -#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ - SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0); - -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \ - SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0) - -#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = (right); \ - DO_UPDATE_SUBSID_RES(ctx, _ts); \ - (num) += 1; \ - } \ - } while (0) - -#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ - do { \ - _t* d = (_t*)((_col)->pData); \ - for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ - if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ - continue; \ - } \ - TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \ - UPDATE_DATA(ctx, val, d[i], num, sign, ts); \ - } \ - } while (0) - static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); @@ -1994,6 +1979,99 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE; + return true; +} + +bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; + } + + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + pInfo->numOfPoints = 0; + pInfo->colType = pCtx->resDataInfo.type; + pInfo->colBytes = pCtx->resDataInfo.bytes; + if (pInfo->pHash != NULL) { + taosHashClear(pInfo->pHash); + } else { + pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + return true; +} + +static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) { + int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; + + SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); + if (pHashItem == NULL) { + int32_t size = sizeof(SUniqueItem) + pInfo->colBytes; + SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size); + pItem->timestamp = ts; + memcpy(pItem->data, data, pInfo->colBytes); + + taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*)); + pInfo->numOfPoints++; + } else if (pHashItem->timestamp > ts) { + pHashItem->timestamp = ts; + } + +} + +int32_t uniqueFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + char* data = colDataGetData(pInputCol, i); + doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i)); + + if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) { + taosHashCleanup(pInfo->pHash); + return 0; + } + } + + //taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn); + + //for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + // int32_t pos = startOffset + i; + // STailItem *pItem = pInfo->pItems[i]; + // if (pItem->isNull) { + // colDataAppendNULL(pOutput, pos); + // } else { + // colDataAppend(pOutput, pos, pItem->data, false); + // } + //} + + pResInfo->numOfRes = pInfo->numOfPoints; + return TSDB_CODE_SUCCESS; +} + +int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + for (int32_t i = 0; i < pResInfo->numOfRes; ++i) { + SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes)); + colDataAppend(pCol, i, pItem->data, false); + //TODO: handle ts output + } + + return pResInfo->numOfRes; +} + bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SDiffInfo); return true; @@ -2106,7 +2184,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo default: ASSERT(0); } - } +} int32_t diffFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);