feat(query): add unique function
This commit is contained in:
parent
8daaea58de
commit
bf4826d9a8
|
@ -76,6 +76,11 @@ int32_t firstFunction(SqlFunctionCtx *pCtx);
|
|||
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||
|
|
|
@ -493,6 +493,21 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"The parameters of UNIQUE can only be columns");
|
||||
}
|
||||
|
||||
pFunc->node.resType = ((SExprNode*)pPara)->resType;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t paraLen = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (paraLen == 0 || paraLen > 2) {
|
||||
|
@ -878,14 +893,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.finalizeFunc = lastFinalize
|
||||
},
|
||||
{
|
||||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
.processFunc = diffFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
.name = "unique",
|
||||
.type = FUNCTION_TYPE_UNIQUE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateUnique,
|
||||
.getEnvFunc = getUniqueFuncEnv,
|
||||
.initFunc = uniqueFunctionSetup,
|
||||
.processFunc = uniqueFunction,
|
||||
.finalizeFunc = uniqueFinalize
|
||||
},
|
||||
{
|
||||
.name = "histogram",
|
||||
|
@ -907,6 +922,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = hllFunction,
|
||||
.finalizeFunc = hllFinalize
|
||||
},
|
||||
{
|
||||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
.processFunc = diffFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
},
|
||||
{
|
||||
.name = "state_count",
|
||||
.type = FUNCTION_TYPE_STATE_COUNT,
|
||||
|
|
|
@ -28,12 +28,15 @@
|
|||
#define TAIL_MAX_POINTS_NUM 100
|
||||
#define TAIL_MAX_OFFSET 100
|
||||
|
||||
#define UNIQUE_MAX_RESULT_SIZE (1024*1024*10)
|
||||
|
||||
#define HLL_BUCKET_BITS 14 // The bits of the bucket
|
||||
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
|
||||
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS)
|
||||
#define HLL_BUCKET_MASK (HLL_BUCKETS-1)
|
||||
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
|
||||
|
||||
|
||||
typedef struct SSumRes {
|
||||
union {
|
||||
int64_t isum;
|
||||
|
@ -197,6 +200,20 @@ typedef struct STailInfo {
|
|||
STailItem **pItems;
|
||||
} STailInfo;
|
||||
|
||||
typedef struct SUniqueItem {
|
||||
int64_t timestamp;
|
||||
bool isNull;
|
||||
char data[];
|
||||
} SUniqueItem;
|
||||
|
||||
typedef struct SUniqueInfo {
|
||||
int32_t numOfPoints;
|
||||
uint8_t colType;
|
||||
int16_t colBytes;
|
||||
SHashObj *pHash;
|
||||
char pItems[];
|
||||
} SUniqueInfo;
|
||||
|
||||
#define SET_VAL(_info, numOfElem, res) \
|
||||
do { \
|
||||
if ((numOfElem) <= 0) { \
|
||||
|
@ -216,6 +233,18 @@ typedef struct STailInfo {
|
|||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
|
||||
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
|
@ -748,50 +777,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||
|
||||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
|
||||
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t* d = (_t*)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
} \
|
||||
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
|
||||
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
|
||||
|
@ -1994,6 +1979,99 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pInfo->numOfPoints = 0;
|
||||
pInfo->colType = pCtx->resDataInfo.type;
|
||||
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||
if (pInfo->pHash != NULL) {
|
||||
taosHashClear(pInfo->pHash);
|
||||
} else {
|
||||
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
|
||||
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
|
||||
|
||||
SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
|
||||
if (pHashItem == NULL) {
|
||||
int32_t size = sizeof(SUniqueItem) + pInfo->colBytes;
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size);
|
||||
pItem->timestamp = ts;
|
||||
memcpy(pItem->data, data, pInfo->colBytes);
|
||||
|
||||
taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*));
|
||||
pInfo->numOfPoints++;
|
||||
} else if (pHashItem->timestamp > ts) {
|
||||
pHashItem->timestamp = ts;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
|
||||
|
||||
if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) {
|
||||
taosHashCleanup(pInfo->pHash);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
//taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
|
||||
|
||||
//for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||
// int32_t pos = startOffset + i;
|
||||
// STailItem *pItem = pInfo->pItems[i];
|
||||
// if (pItem->isNull) {
|
||||
// colDataAppendNULL(pOutput, pos);
|
||||
// } else {
|
||||
// colDataAppend(pOutput, pos, pItem->data, false);
|
||||
// }
|
||||
//}
|
||||
|
||||
pResInfo->numOfRes = pInfo->numOfPoints;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
|
||||
colDataAppend(pCol, i, pItem->data, false);
|
||||
//TODO: handle ts output
|
||||
}
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||
return true;
|
||||
|
@ -2106,7 +2184,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo
|
|||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
|
Loading…
Reference in New Issue