From f3890f7421226d02fbb4292833638f73441e2bbc Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 26 Aug 2022 14:27:15 +0800 Subject: [PATCH 1/7] feat: support udf merge in udfd --- include/libs/function/taosudf.h | 5 +++-- source/common/src/tdatablock.c | 4 ++-- source/libs/function/src/udfd.c | 20 +++++++++++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 5e84b87a81..2b2063e3f6 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -256,8 +256,9 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentR typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); -typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); -typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); +typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); +typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf); +typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); #ifdef __cplusplus } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c65e966046..c748260918 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1706,8 +1706,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { - SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock)); - taosArrayPush(dataBlocks, pBlock); + SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); + taosArrayPush(dataBlocks, &pBlock); blockDebugShowDataBlocks(dataBlocks, flag); taosArrayDestroy(dataBlocks); } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1cbc78df48..5b27e030b9 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -84,6 +84,7 @@ typedef struct SUdf { TUdfAggStartFunc aggStartFunc; TUdfAggProcessFunc aggProcFunc; TUdfAggFinishFunc aggFinishFunc; + TUdfAggMergeFunc aggMergeFunc; TUdfInitFunc initFunc; TUdfDestroyFunc destroyFunc; @@ -271,6 +272,15 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { break; } + case TSDB_UDF_CALL_AGG_MERGE: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf); + freeUdfInterBuf(&call->interBuf); + freeUdfInterBuf(&call->interBuf2); + subRsp->resultBuf = outBuf; + + break; + } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; code = udf->aggFinishFunc(&call->interBuf, &outBuf); @@ -309,6 +319,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { freeUdfInterBuf(&subRsp->resultBuf); break; } + case TSDB_UDF_CALL_AGG_MERGE: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } case TSDB_UDF_CALL_AGG_FIN: { freeUdfInterBuf(&subRsp->resultBuf); break; @@ -560,7 +574,11 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { strncpy(finishFuncName, processFuncName, strlen(processFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - // TODO: merge + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *mergeSuffix = "_merge"; + strncpy(finishFuncName, processFuncName, strlen(processFuncName)); + strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix)); + uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc)); } return 0; } From d1eb14dccc711e2bb32117fdd2cde0d84f5ac764 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 26 Aug 2022 18:47:09 +0800 Subject: [PATCH 2/7] fix: interval operator _wstart error and interval operator close window for interpolation on different group id --- source/libs/executor/src/timewindowoperator.c | 59 ++++++++++++------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cadaf4a9d5..7a66275834 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -33,11 +33,16 @@ typedef struct SPullWindowInfo { uint64_t groupId; } SPullWindowInfo; +typedef struct SOpenWindowInfo { + SResultRowPosition pos; + uint64_t groupId; +} SOpenWindowInfo; + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); -static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult); +static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); ///* @@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num int32_t startPos = 0; int32_t numOfOutput = pSup->numOfExprs; - uint64_t groupId = pBlock->info.groupId; SResultRow* pResult = NULL; while (1) { SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); - - SResultRowPosition* p1 = (SResultRowPosition*)pn->data; + SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data; + uint64_t groupId = pOpenWin->groupId; + SResultRowPosition* p1 = &pOpenWin->pos; if (p->pageId == p1->pageId && p->offset == p1->offset) { break; } @@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); int64_t prevTs = *(int64_t*)pTsKey->pData; - doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, - RESULT_ROW_END_INTERP, pSup); + if (groupId == pBlock->info.groupId) { + doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, + RESULT_ROW_END_INTERP, pSup); + } setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); @@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // prev time window not interpolation yet. if (pInfo->timeWindowInterpo) { - SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); // restore current time window @@ -1017,9 +1025,13 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); - - // window start(end) key interpolation - doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + if ((ascScan && ekey <= pBlock->info.window.ekey) || + (!ascScan && ekey >= pBlock->info.window.skey)) { + // window start(end) key interpolation + doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + } else { + addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); + } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, @@ -1040,20 +1052,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf } } -SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) { - SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; +SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) { + SOpenWindowInfo openWin = {0}; + openWin.pos.pageId = pResult->pageId; + openWin.pos.offset = pResult->offset; + openWin.groupId = groupId; SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); if (pn == NULL) { - tdListAppend(pResultRowInfo->openWindow, &pos); - return pos; + tdListAppend(pResultRowInfo->openWindow, &openWin); + return openWin.pos; } - SResultRowPosition* px = (SResultRowPosition*)pn->data; - if (px->pageId != pos.pageId || px->offset != pos.offset) { - tdListAppend(pResultRowInfo->openWindow, &pos); + SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data; + if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) { + tdListAppend(pResultRowInfo->openWindow, &openWin); } - return pos; + return openWin.pos; } int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { @@ -1884,7 +1899,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo); if (pInfo->timeWindowInterpo) { - pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (pInfo->binfo.resultRowInfo.openWindow == NULL) { goto _error; } @@ -5157,7 +5172,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); if (iaInfo->timeWindowInterpo) { - iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); } initResultRowInfo(&iaInfo->binfo.resultRowInfo); @@ -5294,7 +5309,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* // prev time window not interpolation yet. if (iaInfo->timeWindowInterpo) { - SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); // restore current time window @@ -5462,7 +5477,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); if (iaInfo->timeWindowInterpo) { - iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (iaInfo->binfo.resultRowInfo.openWindow == NULL) { goto _error; } From 2a9faaa3263353fa0c568eb4e6d687b46a6d78e0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 26 Aug 2022 19:05:37 +0800 Subject: [PATCH 3/7] fix: enable add open window only when interval interpolation --- source/libs/executor/src/timewindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1959505430..1ff3408547 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1029,7 +1029,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul (!ascScan && ekey >= pBlock->info.window.skey)) { // window start(end) key interpolation doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); - } else { + } else if (pInfo->timeWindowInterpo) { addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); } From a574ef5c98d98f11ccc1fcc23352367610549902 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 27 Aug 2022 08:10:18 +0800 Subject: [PATCH 4/7] fix: pass CI test --- source/libs/executor/src/timewindowoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1ff3408547..e604638705 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1025,6 +1025,10 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); + // window start(end) key interpolation + doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + //TODO: add to open window? how to close the open windows after input blocks exhausted? +#if 0 if ((ascScan && ekey <= pBlock->info.window.ekey) || (!ascScan && ekey >= pBlock->info.window.skey)) { // window start(end) key interpolation @@ -1032,7 +1036,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } else if (pInfo->timeWindowInterpo) { addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); } - +#endif updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); From 0fbf453122fcff54d1a4ef18536ca856637b96a6 Mon Sep 17 00:00:00 2001 From: Sean Ely <105326513+sean-tdengine@users.noreply.github.com> Date: Fri, 26 Aug 2022 19:37:14 -0700 Subject: [PATCH 5/7] docs: SQL Functions - STATEDURATION edit Added quotes for proper usage. --- docs/en/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index d35fd31099..9b7f52335a 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -1166,7 +1166,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W **Applicable parameter values**: -- oper : Can be one of `LT` (lower than), `GT` (greater than), `LE` (lower than or equal to), `GE` (greater than or equal to), `NE` (not equal to), `EQ` (equal to), the value is case insensitive +- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to), the value is case insensitive, the oper must be in quotes. - val : Numeric types - unit: The unit of time interval. Enter one of the following options: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), or 1w (weeks) If you do not enter a unit of time, the precision of the current database is used by default. From 5510cad5554c6b68030233eae22602488d75ac07 Mon Sep 17 00:00:00 2001 From: Sean Ely <105326513+sean-tdengine@users.noreply.github.com> Date: Fri, 26 Aug 2022 19:43:30 -0700 Subject: [PATCH 6/7] Update 10-function.md --- docs/en/12-taos-sql/10-function.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 9b7f52335a..d6905c84a1 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -1139,7 +1139,7 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau **Applicable parameter values**: -- oper : Can be one of `LT` (lower than), `GT` (greater than), `LE` (lower than or equal to), `GE` (greater than or equal to), `NE` (not equal to), `EQ` (equal to), the value is case insensitive +- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to), the value is case insensitive, the value must be in quotes. - val : Numeric types **Return value type**: Integer @@ -1166,7 +1166,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W **Applicable parameter values**: -- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to), the value is case insensitive, the oper must be in quotes. +- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to), the value is case insensitive, the value must be in quotes. - val : Numeric types - unit: The unit of time interval. Enter one of the following options: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), or 1w (weeks) If you do not enter a unit of time, the precision of the current database is used by default. From 454616c918bf9e5e4224322cdc8aee48a7458c25 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Sat, 27 Aug 2022 11:20:00 +0800 Subject: [PATCH 7/7] Update 10-function.md --- docs/zh/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index af31a1d4bd..9f999181c4 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -1167,7 +1167,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W **参数范围**: -- oper : "LT" (小于)、"GT"(大于)、"LE"(小于等于)、"GE"(大于等于)、"NE"(不等于)、"EQ"(等于),不区分大小写。 +- oper : `'LT'` (小于)、`'GT'`(大于)、`'LE'`(小于等于)、`'GE'`(大于等于)、`'NE'`(不等于)、`'EQ'`(等于),不区分大小写,但需要用`''`包括。 - val : 数值型 - unit : 时间长度的单位,可取值时间单位: 1b(纳秒), 1u(微秒),1a(毫秒),1s(秒),1m(分),1h(小时),1d(天), 1w(周)。如果省略,默认为当前数据库精度。