fix(query): change unique function type to non-standard sql function
This commit is contained in:
parent
bf4826d9a8
commit
3d34f7850e
|
@ -76,11 +76,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx);
|
|||
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||
|
@ -125,7 +120,13 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
|||
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t tailFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
//int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
|
||||
//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
|
||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
||||
|
|
|
@ -892,16 +892,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = lastFunction,
|
||||
.finalizeFunc = lastFinalize
|
||||
},
|
||||
{
|
||||
.name = "unique",
|
||||
.type = FUNCTION_TYPE_UNIQUE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateUnique,
|
||||
.getEnvFunc = getUniqueFuncEnv,
|
||||
.initFunc = uniqueFunctionSetup,
|
||||
.processFunc = uniqueFunction,
|
||||
.finalizeFunc = uniqueFinalize
|
||||
},
|
||||
{
|
||||
.name = "histogram",
|
||||
.type = FUNCTION_TYPE_HISTOGRAM,
|
||||
|
@ -992,6 +982,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = tailFunction,
|
||||
.finalizeFunc = tailFinalize
|
||||
},
|
||||
{
|
||||
.name = "unique",
|
||||
.type = FUNCTION_TYPE_UNIQUE,
|
||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateUnique,
|
||||
.getEnvFunc = getUniqueFuncEnv,
|
||||
.initFunc = uniqueFunctionSetup,
|
||||
.processFunc = uniqueFunction,
|
||||
.finalizeFunc = uniqueFinalize
|
||||
},
|
||||
{
|
||||
.name = "abs",
|
||||
.type = FUNCTION_TYPE_ABS,
|
||||
|
|
|
@ -1979,99 +1979,6 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pInfo->numOfPoints = 0;
|
||||
pInfo->colType = pCtx->resDataInfo.type;
|
||||
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||
if (pInfo->pHash != NULL) {
|
||||
taosHashClear(pInfo->pHash);
|
||||
} else {
|
||||
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
|
||||
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
|
||||
|
||||
SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
|
||||
if (pHashItem == NULL) {
|
||||
int32_t size = sizeof(SUniqueItem) + pInfo->colBytes;
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size);
|
||||
pItem->timestamp = ts;
|
||||
memcpy(pItem->data, data, pInfo->colBytes);
|
||||
|
||||
taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*));
|
||||
pInfo->numOfPoints++;
|
||||
} else if (pHashItem->timestamp > ts) {
|
||||
pHashItem->timestamp = ts;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
|
||||
|
||||
if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) {
|
||||
taosHashCleanup(pInfo->pHash);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
//taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
|
||||
|
||||
//for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||
// int32_t pos = startOffset + i;
|
||||
// STailItem *pItem = pInfo->pItems[i];
|
||||
// if (pItem->isNull) {
|
||||
// colDataAppendNULL(pOutput, pos);
|
||||
// } else {
|
||||
// colDataAppend(pOutput, pos, pItem->data, false);
|
||||
// }
|
||||
//}
|
||||
|
||||
pResInfo->numOfRes = pInfo->numOfPoints;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
|
||||
colDataAppend(pCol, i, pItem->data, false);
|
||||
//TODO: handle ts output
|
||||
}
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||
return true;
|
||||
|
@ -3659,3 +3566,92 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
return pEntryInfo->numOfRes;
|
||||
}
|
||||
|
||||
bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pInfo->numOfPoints = 0;
|
||||
pInfo->colType = pCtx->resDataInfo.type;
|
||||
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||
if (pInfo->pHash != NULL) {
|
||||
taosHashClear(pInfo->pHash);
|
||||
} else {
|
||||
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
|
||||
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
|
||||
|
||||
SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
|
||||
if (pHashItem == NULL) {
|
||||
int32_t size = sizeof(SUniqueItem) + pInfo->colBytes;
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size);
|
||||
pItem->timestamp = ts;
|
||||
memcpy(pItem->data, data, pInfo->colBytes);
|
||||
|
||||
taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*));
|
||||
pInfo->numOfPoints++;
|
||||
} else if (pHashItem->timestamp > ts) {
|
||||
pHashItem->timestamp = ts;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
|
||||
|
||||
if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) {
|
||||
taosHashCleanup(pInfo->pHash);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
|
||||
colDataAppend(pOutput, i, pItem->data, false);
|
||||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, i, &pItem->timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
return pInfo->numOfPoints;
|
||||
}
|
||||
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
|
||||
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
|
||||
colDataAppend(pCol, i, pItem->data, false);
|
||||
//TODO: handle ts output
|
||||
}
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue