From 0ea8f0fbc63d0820f8637e8c10daa5caa4e73c9f Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 28 Jun 2022 14:08:34 +0800 Subject: [PATCH 1/4] enh(query): fisrt function support selectivity --- source/libs/function/src/builtins.c | 4 ++-- source/libs/function/src/builtinsimpl.c | 27 +++++++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 699a050aea..a6fa8ea721 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1890,7 +1890,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "first", .type = FUNCTION_TYPE_FIRST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -1903,7 +1903,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_first_partial", .type = FUNCTION_TYPE_FIRST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLastPartial, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 06f24f39e7..d5e5b38558 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2413,7 +2413,7 @@ int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t); + pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t) + sizeof(STuplePos); return true; } @@ -2491,9 +2491,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; - pInfo->hasResult = true; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); - + //handle selectivity + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + pInfo->hasResult = true; + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } + //DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; } @@ -2525,8 +2531,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; - pInfo->hasResult = true; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + //handle selectivity + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + pInfo->hasResult = true; + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } + //DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; } @@ -2669,6 +2682,8 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes); + STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); + setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); return pResInfo->numOfRes; } From 9ca819aa69330d6d368912a4ed29ea8e7c8ca6b0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 28 Jun 2022 14:08:34 +0800 Subject: [PATCH 2/4] enh(query): last function support selectivity --- source/libs/function/src/builtins.c | 8 ++++---- source/libs/function/src/builtinsimpl.c | 22 ++++++++++++++++++---- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a6fa8ea721..e1e8a686aa 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1914,7 +1914,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_first_merge", .type = FUNCTION_TYPE_FIRST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -1925,7 +1925,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last", .type = FUNCTION_TYPE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -1938,7 +1938,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLastPartial, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -1949,7 +1949,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_merge", .type = FUNCTION_TYPE_LAST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d5e5b38558..eb0c444776 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2593,8 +2593,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); - pInfo->hasResult = true; + //handle selectivity + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + pInfo->hasResult = true; + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } + //DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; } break; @@ -2616,9 +2623,16 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; - pInfo->hasResult = true; + //handle selectivity + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + pInfo->hasResult = true; + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } pResInfo->numOfRes = 1; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + //DO_UPDATE_TAG_COLUMNS(pCtx, ts); } break; } From c3d26dc73eb2e4700c1584c2cdb40ad9e9790dfc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 28 Jun 2022 20:46:45 +0800 Subject: [PATCH 3/4] feat(query): enable selectivity with first/last for stable --- source/libs/function/src/builtinsimpl.c | 24 +++++++++++++++++++----- tests/system-test/2-query/json_tag.py | 8 ++++---- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index eb0c444776..fd57aff721 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2409,11 +2409,11 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } -int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); } +int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t) + sizeof(STuplePos); } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t) + sizeof(STuplePos); + pEnv->calcMemSize = getFirstLastInfoSize(pNode->node.resType.bytes); return true; } @@ -2642,7 +2642,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { +static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { + SInputColumnInfoData* pColInfo = &pCtx->input; + int32_t start = pColInfo->startRowIndex; + pOutput->bytes = pInput->bytes; TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes); TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); @@ -2659,7 +2662,14 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, } *tsOut = *tsIn; memcpy(pOutput->buf, pInput->buf, pOutput->bytes); - pOutput->hasResult = true; + //handle selectivity + STuplePos* pTuplePos = (STuplePos*)(pOutput->buf + pOutput->bytes + sizeof(TSKEY)); + if (!pOutput->hasResult) { + saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); + pOutput->hasResult = true; + } else { + copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); + } return; } @@ -2674,7 +2684,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer char* data = colDataGetData(pCol, start); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); - firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery); + firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery); int32_t numOfElems = pInputInfo->hasResult ? 1 : 0; @@ -2696,6 +2706,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes); + //handle selectivity STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); @@ -2716,6 +2727,9 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); colDataAppend(pCol, pBlock->info.rows, res, false); + //handle selectivity + STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); + setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); taosMemoryFree(res); return 1; diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 9b96b6ebd0..6fb839fe2d 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -562,7 +562,7 @@ class TDTestCase: tdSql.checkRows(3) tdSql.query("select round(dataint) from jsons1 where jtag->'tag1'>1") tdSql.checkRows(3) - + #math function tdSql.query("select sin(dataint) from jsons1 where jtag->'tag1'>1;") tdSql.checkRows(3) @@ -606,7 +606,7 @@ class TDTestCase: tdSql.checkRows(1) tdSql.query("select twa(dataint) from jsons1 where jtag->'tag1'>1;") tdSql.checkRows(1) - + # function not ready # tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;") # tdSql.checkRows(3) @@ -616,7 +616,7 @@ class TDTestCase: # tdSql.checkRows(3) # tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;") # tdSql.checkRows(1) - + #str function tdSql.query("select upper(dataStr) from jsons1 where jtag->'tag1'>1;") tdSql.checkRows(3) @@ -658,7 +658,7 @@ class TDTestCase: tdSql.checkRows(3) tdSql.query("select ELAPSED(ts,1h) from jsons1 where jtag->'tag1'>1;") tdSql.checkRows(1) - + # # #test TD-12077 tdSql.execute("insert into jsons1_16 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":-2.111}') values(1591062628000, 2, NULL, '你就会', 'dws')") From e9fba684de55bd5fd1fc9b643be0a2afa3dd7c63 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 29 Jun 2022 10:17:30 +0800 Subject: [PATCH 4/4] fix mem leak --- source/libs/function/src/builtinsimpl.c | 78 +++++++++++++++---------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fd57aff721..7376715968 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2492,13 +2492,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; //handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - pInfo->hasResult = true; - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + if (pCtx->subsidiaries.num > 0) { + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } } + pInfo->hasResult = true; //DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; @@ -2532,13 +2534,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; //handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - pInfo->hasResult = true; - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + if (pCtx->subsidiaries.num > 0) { + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } } + pInfo->hasResult = true; //DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; @@ -2594,13 +2598,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; //handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - pInfo->hasResult = true; - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + if (pCtx->subsidiaries.num > 0) { + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } } + pInfo->hasResult = true; //DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; } @@ -2624,13 +2630,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; //handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - pInfo->hasResult = true; - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + if (pCtx->subsidiaries.num > 0) { + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } } + pInfo->hasResult = true; pResInfo->numOfRes = 1; //DO_UPDATE_TAG_COLUMNS(pCtx, ts); } @@ -2664,12 +2672,15 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S memcpy(pOutput->buf, pInput->buf, pOutput->bytes); //handle selectivity STuplePos* pTuplePos = (STuplePos*)(pOutput->buf + pOutput->bytes + sizeof(TSKEY)); - if (!pOutput->hasResult) { - saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); - pOutput->hasResult = true; - } else { - copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); + if (pCtx->subsidiaries.num > 0) { + if (!pOutput->hasResult) { + saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); + } else { + copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); + } } + pOutput->hasResult = true; + return; } @@ -3086,7 +3097,9 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData pItem->uid = uid; // save the data of this tuple - saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + if (pCtx->subsidiaries.num > 0) { + saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + } // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; @@ -3105,7 +3118,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData pItem->uid = uid; // save the data of this tuple by over writing the old data - copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + if (pCtx->subsidiaries.num > 0) { + copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + } + taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, topBotResComparFn, NULL, !isTopQuery); }