diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c new file mode 100644 index 0000000000..7267bbbe09 --- /dev/null +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -0,0 +1,609 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "operator.h" +#include "querytask.h" +#include "tanal.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tjson.h" +#include "ttime.h" + +#ifdef USE_ANAL + +typedef struct { + SArray* blocks; // SSDataBlock* + SArray* windows; // STimeWindow + uint64_t groupId; + int64_t numOfRows; + int32_t curWinIndex; + STimeWindow curWin; + SResultRow* pResultRow; +} SAnomalyWindowSupp; + +typedef struct { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + int32_t tsSlotId; + STimeWindowAggSupp twAggSup; + char algoName[TSDB_ANAL_ALGO_NAME_LEN]; + char algoUrl[TSDB_ANAL_ALGO_URL_LEN]; + char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + SAnomalyWindowSupp anomalySup; + SWindowRowsSup anomalyWinRowSup; + SColumn anomalyCol; + SStateKeys anomalyKey; +} SAnomalyWindowOperatorInfo; + +static void anomalyDestroyOperatorInfo(void* param); +static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); +static void anomalyAggregateBlocks(SOperatorInfo* pOperator); +static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pBlock); + +int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + QRY_PARAM_CHECK(pOptrInfo); + + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode; + SColumnNode* pColNode = (SColumnNode*)(pAnomalyNode->pAnomalyKey); + if (pInfo == NULL || pOperator == NULL) { + code = terrno; + goto _error; + } + + if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { + qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); + code = TSDB_CODE_ANAL_ALGO_NOT_FOUND; + goto _error; + } + if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { + qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); + code = TSDB_CODE_ANAL_ALGO_NOT_LOAD; + goto _error; + } + + pOperator->exprSupp.hasWindowOrGroup = true; + pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId; + strncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt)); + + if (pAnomalyNode->window.pExprs != NULL) { + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + } + + size_t keyBufSize = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + + initResultSizeInfo(&pOperator->resultInfo, 4096); + + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, + pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + + code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); + QUERY_CHECK_CODE(code, lino, _error); + + initResultRowInfo(&pInfo->binfo.resultRowInfo); + pInfo->binfo.inputTsOrder = pAnomalyNode->window.node.inputTsOrder; + pInfo->binfo.outputTsOrder = pAnomalyNode->window.node.outputTsOrder; + + pInfo->anomalyCol = extractColumnFromColumnNode(pColNode); + pInfo->anomalyKey.type = pInfo->anomalyCol.type; + pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes; + pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes); + if (pInfo->anomalyKey.pData == NULL) { + goto _error; + } + + int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes; + pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize); + pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); + pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); + + if (pInfo->anomalySup.windows == NULL || pInfo->anomalySup.blocks == NULL || pInfo->anomalySup.pResultRow == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + QUERY_CHECK_CODE(code, lino, _error); + + code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + QUERY_CHECK_CODE(code, lino, _error); + + setOperatorInfo(pOperator, "AnomalyWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY, true, OP_NOT_OPENED, + pInfo, pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, anomalyAggregateNext, NULL, anomalyDestroyOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); + + *pOptrInfo = pOperator; + + qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl, + pInfo->anomalyOpt); + return TSDB_CODE_SUCCESS; + +_error: + if (pInfo != NULL) { + anomalyDestroyOperatorInfo(pInfo); + } + + destroyOperatorAndDownstreams(pOperator, &downstream, 1); + pTaskInfo->code = code; + qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code); + return code; +} + +static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SAnomalyWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SAnomalyWindowSupp* pSupp = &pInfo->anomalySup; + SSDataBlock* pRes = pInfo->binfo.pRes; + int64_t st = taosGetTimestampUs(); + int32_t numOfBlocks = taosArrayGetSize(pSupp->blocks); + + blockDataCleanup(pRes); + + while (1) { + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + if (pBlock == NULL) { + break; + } + + if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) { + pSupp->groupId = pBlock->info.id.groupId; + numOfBlocks++; + qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows); + code = anomalyCacheBlock(pInfo, pBlock); + QUERY_CHECK_CODE(code, lino, _end); + } else { + qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); + anomalyAggregateBlocks(pOperator); + pSupp->groupId = pBlock->info.id.groupId; + numOfBlocks = 1; + qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows); + code = anomalyCacheBlock(pInfo, pBlock); + QUERY_CHECK_CODE(code, lino, _end); + } + + if (pRes->info.rows > 0) { + (*ppRes) = pRes; + qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks); + return code; + } + } + + if (numOfBlocks > 0) { + qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->anomalySup.groupId, numOfBlocks); + anomalyAggregateBlocks(pOperator); + } + + int64_t cost = taosGetTimestampUs() - st; + qDebug("all groups finished, cost:%" PRId64 "us", cost); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; + return code; +} + +static void anomalyDestroyOperatorInfo(void* param) { + SAnomalyWindowOperatorInfo* pInfo = (SAnomalyWindowOperatorInfo*)param; + if (pInfo == NULL) return; + + qDebug("anomaly_window operator is destroyed, algo:%s", pInfo->algoName); + + cleanupBasicInfo(&pInfo->binfo); + cleanupAggSup(&pInfo->aggSup); + cleanupExprSupp(&pInfo->scalarSup); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->anomalySup.blocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(pInfo->anomalySup.blocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(pInfo->anomalySup.blocks); + taosArrayDestroy(pInfo->anomalySup.windows); + taosMemoryFreeClear(pInfo->anomalySup.pResultRow); + taosMemoryFreeClear(pInfo->anomalyKey.pData); + + taosMemoryFreeClear(param); +} + +static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) { + SSDataBlock* pDst = NULL; + int32_t code = createOneDataBlock(pSrc, true, &pDst); + + if (code != 0) return code; + if (pDst == NULL) return TSDB_CODE_OUT_OF_MEMORY; + if (taosArrayPush(pInfo->anomalySup.blocks, &pDst) == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + return 0; +} + +static int32_t anomalyFindWindow(SAnomalyWindowSupp* pSupp, TSKEY key) { + for (int32_t i = pSupp->curWinIndex; i < taosArrayGetSize(pSupp->windows); ++i) { + STimeWindow* pWindow = taosArrayGet(pSupp->windows, i); + if (key >= pWindow->skey && key < pWindow->ekey) { + pSupp->curWin = *pWindow; + pSupp->curWinIndex = i; + return 0; + } + } + return -1; +} + +static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) { + int32_t code = 0; + int32_t rows = 0; + STimeWindow win = {0}; + + taosArrayClear(pWindows); + + tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + if (rows <= 0) return 0; + + SJson* res = tjsonGetObjectItem(pJson, "res"); + if (res == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + + int32_t ressize = tjsonGetArraySize(res); + if (ressize != rows) return TSDB_CODE_INVALID_JSON_FORMAT; + + for (int32_t i = 0; i < rows; ++i) { + SJson* row = tjsonGetArrayItem(res, i); + if (row == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + + int32_t colsize = tjsonGetArraySize(row); + if (colsize != 2) return TSDB_CODE_INVALID_JSON_FORMAT; + + SJson* start = tjsonGetArrayItem(row, 0); + SJson* end = tjsonGetArrayItem(row, 1); + if (start == NULL || end == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + + tjsonGetObjectValueBigInt(start, &win.skey); + tjsonGetObjectValueBigInt(end, &win.ekey); + + if (win.skey >= win.ekey) { + win.ekey = win.skey + 1; + } + + if (taosArrayPush(pWindows, &win) == NULL) return TSDB_CODE_OUT_OF_BUFFER; + } + + int32_t numOfWins = taosArrayGetSize(pWindows); + qDebug("anomaly window recevied, total:%d", numOfWins); + for (int32_t i = 0; i < numOfWins; ++i) { + STimeWindow* pWindow = taosArrayGet(pWindows, i); + qDebug("anomaly win:%d [%" PRId64 ", %" PRId64 ")", i, pWindow->skey, pWindow->ekey); + } + + return 0; +} + +static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) { + SAnomalyWindowOperatorInfo* pInfo = pOperator->info; + SAnomalyWindowSupp* pSupp = &pInfo->anomalySup; + SJson* pJson = NULL; + SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON}; + char dataBuf[64] = {0}; + int32_t code = 0; + + int64_t ts = 0; + // int64_t ts = taosGetTimestampMs(); + snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts, + pSupp->groupId); + code = tsosAnalBufOpen(&analBuf, 2); + if (code != 0) goto _OVER; + + const char* prec = TSDB_TIME_PRECISION_MILLI_STR; + if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; + if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; + + code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteOptStr(&analBuf, "prec", prec); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteColMeta(&analBuf, 1, pInfo->anomalyCol.type, "val"); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteDataBegin(&analBuf); + if (code != 0) goto _OVER; + + int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks); + + // timestamp + code = taosAnalBufWriteColBegin(&analBuf, 0); + if (code != 0) goto _OVER; + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i); + if (pBlock == NULL) break; + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); + if (pTsCol == NULL) break; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + code = taosAnalBufWriteColData(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]); + if (code != 0) goto _OVER; + } + } + code = taosAnalBufWriteColEnd(&analBuf, 0); + if (code != 0) goto _OVER; + + // data + code = taosAnalBufWriteColBegin(&analBuf, 1); + if (code != 0) goto _OVER; + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i); + if (pBlock == NULL) break; + SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pInfo->anomalyCol.slotId); + if (pValCol == NULL) break; + + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + code = taosAnalBufWriteColData(&analBuf, 1, pValCol->info.type, colDataGetData(pValCol, j)); + if (code != 0) goto _OVER; + if (code != 0) goto _OVER; + } + } + code = taosAnalBufWriteColEnd(&analBuf, 1); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteDataEnd(&analBuf); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt); + if (code != 0) goto _OVER; + + code = taosAnalBufClose(&analBuf); + if (code != 0) goto _OVER; + + pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANAL_HTTP_TYPE_POST, &analBuf); + if (pJson == NULL) { + code = terrno; + goto _OVER; + } + + code = anomalyParseJson(pJson, pSupp->windows); + if (code != 0) goto _OVER; + +_OVER: + if (code != 0) { + qError("failed to analysis window since %s", tstrerror(code)); + } + taosAnalBufDestroy(&analBuf); + if (pJson != NULL) tjsonDelete(pJson); + return code; +} + +static void anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SAnomalyWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pExprSup = &pOperator->exprSupp; + SAnomalyWindowSupp* pSupp = &pInfo->anomalySup; + SWindowRowsSup* pRowSup = &pInfo->anomalyWinRowSup; + SResultRow* pResRow = pSupp->pResultRow; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + + if (setResultRowInitCtx(pResRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset) == 0) { + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pSupp->curWin, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + } +} + +static void anomalyBuildResult(SOperatorInfo* pOperator) { + SAnomalyWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pExprSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->binfo.pRes; + SResultRow* pResRow = pInfo->anomalySup.pResultRow; + + doUpdateNumOfRows(pExprSup->pCtx, pResRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); + copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResRow, pExprSup->pCtx, pRes, + pExprSup->rowEntryInfoOffset, pTaskInfo); + pRes->info.rows += pResRow->numOfRows; + clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); +} + +static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SAnomalyWindowOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pExprSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->binfo.pRes; + SAnomalyWindowSupp* pSupp = &pInfo->anomalySup; + SWindowRowsSup* pRowSup = &pInfo->anomalyWinRowSup; + SResultRow* pResRow = pSupp->pResultRow; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + int32_t rowsInWin = 0; + int32_t rowsInBlock = 0; + const int64_t gid = pSupp->groupId; + const int32_t order = pInfo->binfo.inputTsOrder; + + int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks); + if (numOfBlocks == 0) goto _OVER; + + qDebug("group:%" PRId64 ", aggregate blocks, blocks:%d", pSupp->groupId, numOfBlocks); + pRes->info.id.groupId = pSupp->groupId; + + code = anomalyAnalysisWindow(pOperator); + QUERY_CHECK_CODE(code, lino, _OVER); + + int32_t numOfWins = taosArrayGetSize(pSupp->windows); + qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->numOfRows); + for (int32_t w = 0; w < numOfWins; ++w) { + STimeWindow* pWindow = taosArrayGet(pSupp->windows, w); + if (w == 0) { + pSupp->curWin = *pWindow; + pRowSup->win.skey = pSupp->curWin.skey; + } + qDebug("group:%" PRId64 ", win:%d [%" PRId64 ", %" PRId64 ")", pSupp->groupId, w, pWindow->skey, pWindow->ekey); + } + + if (numOfWins <= 0) goto _OVER; + if (numOfWins > pRes->info.capacity) { + code = blockDataEnsureCapacity(pRes, numOfWins); + QUERY_CHECK_CODE(code, lino, _OVER); + } + + for (int32_t b = 0; b < numOfBlocks; ++b) { + SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, b); + if (pBlock == NULL) break; + + pRes->info.scanFlag = pBlock->info.scanFlag; + code = setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true); + if (code != 0) break; + + code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + if (code != 0) break; + + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSup.pExprInfo != NULL) { + code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, + pInfo->scalarSup.numOfExprs, NULL); + if (code != 0) break; + } + + SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pInfo->anomalyCol.slotId); + if (pValCol == NULL) break; + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); + if (pTsCol == NULL) break; + TSKEY* tsList = (TSKEY*)pTsCol->pData; + bool lastBlock = (b == numOfBlocks - 1); + + qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, rows:%" PRId64, pSupp->groupId, b, + pSupp->curWinIndex, rowsInWin, rowsInBlock, pBlock->info.rows); + + for (int32_t r = 0; r < pBlock->info.rows; ++r) { + TSKEY key = tsList[r]; + bool keyInWin = (key >= pSupp->curWin.skey && key < pSupp->curWin.ekey); + bool lastRow = (r == pBlock->info.rows - 1); + + if (keyInWin) { + if (r < 5) { + qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d", pSupp->groupId, b, + pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); + } + if (rowsInBlock == 0) { + doKeepNewWindowStartInfo(pRowSup, tsList, r, gid); + } + doKeepTuple(pRowSup, tsList[r], gid); + rowsInBlock++; + rowsInWin++; + } else { + if (rowsInBlock > 0) { + qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg", pSupp->groupId, + b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); + anomalyAggregateRows(pOperator, pBlock); + rowsInBlock = 0; + } + if (rowsInWin > 0) { + qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, build result", + pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); + anomalyBuildResult(pOperator); + rowsInWin = 0; + } + if (anomalyFindWindow(pSupp, tsList[r]) == 0) { + qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, new window detect", + pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); + doKeepNewWindowStartInfo(pRowSup, tsList, r, gid); + doKeepTuple(pRowSup, tsList[r], gid); + rowsInBlock = 1; + rowsInWin = 1; + } else { + qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, window not found", + pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); + rowsInBlock = 0; + rowsInWin = 0; + } + } + + if (lastRow && rowsInBlock > 0) { + qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg since lastrow", + pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); + anomalyAggregateRows(pOperator, pBlock); + rowsInBlock = 0; + } + } + + if (lastBlock && rowsInWin > 0) { + qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, build result since lastblock", pSupp->groupId, b, + pSupp->curWinIndex, rowsInWin, rowsInBlock); + anomalyBuildResult(pOperator); + rowsInWin = 0; + } + } + + code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _OVER); + +_OVER: + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i); + qDebug("%s, clear block, pBlock:%p pBlock->pDataBlock:%p", __func__, pBlock, pBlock->pDataBlock); + blockDataDestroy(pBlock); + } + + taosArrayClear(pSupp->blocks); + taosArrayClear(pSupp->windows); + pSupp->numOfRows = 0; + pSupp->curWin.ekey = 0; + pSupp->curWin.skey = 0; + pSupp->curWinIndex = 0; +} + +#else + +int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} +void destroyForecastInfo(void* param) {} + +#endif \ No newline at end of file diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c74aef3992..4fe45ff72e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1794,9 +1794,13 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; SFunctionNode* pFuncNode = (SFunctionNode*)pNode; - SDataType* pType = &pFuncNode->node.resType; - pExp->base.resSchema = - createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName); + SDataType* pType = &pFuncNode->node.resType; + const char* pName = pFuncNode->node.aliasName; + if (pFuncNode->funcType == FUNCTION_TYPE_FORECAST_LOW || pFuncNode->funcType == FUNCTION_TYPE_FORECAST_HIGH || + pFuncNode->funcType == FUNCTION_TYPE_FORECAST_ROWTS) { + pName = pFuncNode->functionName; + } + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pName); tExprNode* pExprNode = pExp->pExpr; diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c new file mode 100644 index 0000000000..599678106c --- /dev/null +++ b/source/libs/executor/src/forecastoperator.c @@ -0,0 +1,663 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "operator.h" +#include "querytask.h" +#include "storageapi.h" +#include "tanal.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tfill.h" +#include "ttime.h" + +#ifdef USE_ANAL + +typedef struct { + char algoName[TSDB_ANAL_ALGO_NAME_LEN]; + char algoUrl[TSDB_ANAL_ALGO_URL_LEN]; + char algoOpt[TSDB_ANAL_ALGO_OPTION_LEN]; + int64_t maxTs; + int64_t minTs; + int64_t numOfRows; + uint64_t groupId; + int32_t numOfBlocks; + int32_t optRows; + int16_t resTsSlot; + int16_t resValSlot; + int16_t resLowSlot; + int16_t resHighSlot; + int16_t inputTsSlot; + int16_t inputValSlot; + int8_t inputValType; + int8_t inputPrecision; + SAnalBuf analBuf; +} SForecastSupp; + +typedef struct SForecastOperatorInfo { + SSDataBlock* pRes; + SExprSupp scalarSup; // scalar calculation + SForecastSupp forecastSupp; +} SForecastOperatorInfo; + +static void destroyForecastInfo(void* param); + +static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) { + if (pBlock->info.rows < pBlock->info.capacity) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = blockDataEnsureCapacity(pBlock, newRowsNum); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SAnalBuf* pBuf = &pSupp->analBuf; + + qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows); + pSupp->numOfBlocks++; + + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot); + if (pTsCol == NULL || pValCol == NULL) break; + + int64_t ts = ((TSKEY*)pTsCol->pData)[j]; + char* val = colDataGetData(pValCol, j); + int16_t valType = pValCol->info.type; + + pSupp->minTs = MIN(pSupp->minTs, ts); + pSupp->maxTs = MAX(pSupp->maxTs, ts); + pSupp->numOfRows++; + + code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts); + if (TSDB_CODE_SUCCESS != code) return code; + + code = taosAnalBufWriteColData(pBuf, 1, valType, val); + if (TSDB_CODE_SUCCESS != code) return code; + } + + return 0; +} + +static int32_t forecastCloseBuf(SForecastSupp* pSupp) { + SAnalBuf* pBuf = &pSupp->analBuf; + int32_t code = 0; + + for (int32_t i = 0; i < 2; ++i) { + code = taosAnalBufWriteColEnd(pBuf, i); + if (code != 0) return code; + } + + code = taosAnalBufWriteDataEnd(pBuf); + if (code != 0) return code; + + int32_t len = strlen(pSupp->algoOpt); + int64_t every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows + 1); + int64_t start = pSupp->maxTs + every; + bool hasStart = taosAnalGetOptStr(pSupp->algoOpt, "start", NULL, 0); + if (!hasStart) { + qDebug("forecast start not found from %s, use %" PRId64, pSupp->algoOpt, start); + code = taosAnalBufWriteOptInt(pBuf, "start", start); + if (code != 0) return code; + } + + bool hasEvery = taosAnalGetOptStr(pSupp->algoOpt, "every", NULL, 0); + if (!hasEvery) { + qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every); + code = taosAnalBufWriteOptInt(pBuf, "every", every); + if (code != 0) return code; + } + + code = taosAnalBufWriteOptStr(pBuf, "option", pSupp->algoOpt); + if (code != 0) return code; + + code = taosAnalBufClose(pBuf); + return code; +} + +static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) { + SAnalBuf* pBuf = &pSupp->analBuf; + int32_t resCurRow = pBlock->info.rows; + int8_t tmpI8; + int16_t tmpI16; + int32_t tmpI32; + int64_t tmpI64; + float tmpFloat; + double tmpDouble; + int32_t code = 0; + + SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot); + if (NULL == pResValCol) return TSDB_CODE_OUT_OF_RANGE; + + SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL); + SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL); + SColumnInfoData* pResHighCol = + (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL); + + SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANAL_HTTP_TYPE_POST, pBuf); + if (pJson == NULL) return terrno; + + int32_t rows = 0; + tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code); + if (code < 0) goto _OVER; + if (rows <= 0) goto _OVER; + + SJson* res = tjsonGetObjectItem(pJson, "res"); + if (res == NULL) goto _OVER; + int32_t ressize = tjsonGetArraySize(res); + bool returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1); + if (returnConf) { + if (ressize != 4) goto _OVER; + } else if (ressize != 2) { + goto _OVER; + } + + if (pResTsCol != NULL) { + resCurRow = pBlock->info.rows; + SJson* tsJsonArray = tjsonGetArrayItem(res, 0); + if (tsJsonArray == NULL) goto _OVER; + int32_t tsSize = tjsonGetArraySize(tsJsonArray); + if (tsSize != rows) goto _OVER; + for (int32_t i = 0; i < tsSize; ++i) { + SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i); + tjsonGetObjectValueBigInt(tsJson, &tmpI64); + colDataSetInt64(pResTsCol, resCurRow, &tmpI64); + resCurRow++; + } + } + + if (pResLowCol != NULL) { + resCurRow = pBlock->info.rows; + SJson* lowJsonArray = tjsonGetArrayItem(res, 2); + if (lowJsonArray == NULL) goto _OVER; + int32_t lowSize = tjsonGetArraySize(lowJsonArray); + if (lowSize != rows) goto _OVER; + for (int32_t i = 0; i < lowSize; ++i) { + SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i); + tjsonGetObjectValueDouble(lowJson, &tmpDouble); + tmpFloat = (float)tmpDouble; + colDataSetFloat(pResLowCol, resCurRow, &tmpFloat); + resCurRow++; + } + } + + if (pResHighCol != NULL) { + resCurRow = pBlock->info.rows; + SJson* highJsonArray = tjsonGetArrayItem(res, 3); + if (highJsonArray == NULL) goto _OVER; + int32_t highSize = tjsonGetArraySize(highJsonArray); + if (highSize != rows) goto _OVER; + for (int32_t i = 0; i < highSize; ++i) { + SJson* highJson = tjsonGetArrayItem(highJsonArray, i); + tjsonGetObjectValueDouble(highJson, &tmpDouble); + tmpFloat = (float)tmpDouble; + colDataSetFloat(pResHighCol, resCurRow, &tmpFloat); + resCurRow++; + } + } + + resCurRow = pBlock->info.rows; + SJson* valJsonArray = tjsonGetArrayItem(res, 1); + if (valJsonArray == NULL) goto _OVER; + int32_t valSize = tjsonGetArraySize(valJsonArray); + if (valSize != rows) goto _OVER; + for (int32_t i = 0; i < valSize; ++i) { + SJson* valJson = tjsonGetArrayItem(valJsonArray, i); + tjsonGetObjectValueDouble(valJson, &tmpDouble); + + switch (pSupp->inputValType) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_TINYINT: { + tmpI8 = (int8_t)tmpDouble; + colDataSetInt8(pResValCol, resCurRow, &tmpI8); + break; + } + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_SMALLINT: { + tmpI16 = (int16_t)tmpDouble; + colDataSetInt16(pResValCol, resCurRow, &tmpI16); + break; + } + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: { + tmpI32 = (int32_t)tmpDouble; + colDataSetInt32(pResValCol, resCurRow, &tmpI32); + break; + } + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_BIGINT: { + tmpI64 = (int64_t)tmpDouble; + colDataSetInt64(pResValCol, resCurRow, &tmpI64); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + tmpFloat = (float)tmpDouble; + colDataSetFloat(pResValCol, resCurRow, &tmpFloat); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + colDataSetDouble(pResValCol, resCurRow, &tmpDouble); + break; + } + default: + code = TSDB_CODE_FUNC_FUNTION_PARA_TYPE; + goto _OVER; + } + resCurRow++; + } + + // for (int32_t i = rows; i < pSupp->optRows; ++i) { + // colDataSetNNULL(pResValCol, rows, (pSupp->optRows - rows)); + // if (pResTsCol != NULL) { + // colDataSetNNULL(pResTsCol, rows, (pSupp->optRows - rows)); + // } + // if (pResLowCol != NULL) { + // colDataSetNNULL(pResLowCol, rows, (pSupp->optRows - rows)); + // } + // if (pResHighCol != NULL) { + // colDataSetNNULL(pResHighCol, rows, (pSupp->optRows - rows)); + // } + // } + + // if (rows == pSupp->optRows) { + // pResValCol->hasNull = false; + // } + + pBlock->info.rows += rows; + + if (pJson != NULL) tjsonDelete(pJson); + return 0; + +_OVER: + if (pJson != NULL) tjsonDelete(pJson); + if (code == 0) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + } + qError("failed to perform forecast finalize since %s", tstrerror(code)); + return TSDB_CODE_INVALID_JSON_FORMAT; +} + +static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SAnalBuf* pBuf = &pSupp->analBuf; + + code = forecastCloseBuf(pSupp); + QUERY_CHECK_CODE(code, lino, _end); + + code = forecastEnsureBlockCapacity(pResBlock, 1); + QUERY_CHECK_CODE(code, lino, _end); + + code = forecastAnalysis(pSupp, pResBlock); + QUERY_CHECK_CODE(code, lino, _end); + + uInfo("block:%d, forecast finalize", pSupp->numOfBlocks); + +_end: + pSupp->numOfBlocks = 0; + taosAnalBufDestroy(&pSupp->analBuf); + return code; +} + +static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SForecastOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pResBlock = pInfo->pRes; + SForecastSupp* pSupp = &pInfo->forecastSupp; + SAnalBuf* pBuf = &pSupp->analBuf; + int64_t st = taosGetTimestampUs(); + int32_t numOfBlocks = pSupp->numOfBlocks; + + blockDataCleanup(pResBlock); + + while (1) { + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + if (pBlock == NULL) { + break; + } + + if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) { + pSupp->groupId = pBlock->info.id.groupId; + numOfBlocks++; + qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows); + code = forecastCacheBlock(pSupp, pBlock); + QUERY_CHECK_CODE(code, lino, _end); + } else { + qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); + forecastAggregateBlocks(pSupp, pResBlock); + pSupp->groupId = pBlock->info.id.groupId; + numOfBlocks = 1; + qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows); + code = forecastCacheBlock(pSupp, pBlock); + QUERY_CHECK_CODE(code, lino, _end); + } + + if (pResBlock->info.rows > 0) { + (*ppRes) = pResBlock; + qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pResBlock->info.id.groupId, numOfBlocks); + return code; + } + } + + if (numOfBlocks > 0) { + qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks); + forecastAggregateBlocks(pSupp, pResBlock); + } + + int64_t cost = taosGetTimestampUs() - st; + qDebug("all groups finished, cost:%" PRId64 "us", cost); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + (*ppRes) = (pResBlock->info.rows == 0) ? NULL : pResBlock; + return code; +} + +static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) { + pSupp->resLowSlot = -1; + pSupp->resHighSlot = -1; + pSupp->resTsSlot = -1; + pSupp->resValSlot = -1; + + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST) { + pSupp->resValSlot = dstSlot; + } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_ROWTS) { + pSupp->resTsSlot = dstSlot; + } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_LOW) { + pSupp->resLowSlot = dstSlot; + } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_HIGH) { + pSupp->resHighSlot = dstSlot; + } else { + } + } + + return 0; +} + +static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) { + SNode* pNode = NULL; + + pSupp->inputTsSlot = -1; + pSupp->inputValSlot = -1; + pSupp->inputValType = -1; + pSupp->inputPrecision = -1; + + FOREACH(pNode, pFuncs) { + if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) { + SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr; + int32_t numOfParam = LIST_LENGTH(pFunc->pParameterList); + + if (pFunc->funcType == FUNCTION_TYPE_FORECAST) { + if (numOfParam == 3) { + SNode* p1 = nodesListGetNode(pFunc->pParameterList, 0); + SNode* p2 = nodesListGetNode(pFunc->pParameterList, 1); + SNode* p3 = nodesListGetNode(pFunc->pParameterList, 2); + if (p1 == NULL || p2 == NULL || p3 == NULL) return TSDB_CODE_PLAN_INTERNAL_ERROR; + if (p1->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR; + if (p2->type != QUERY_NODE_VALUE) return TSDB_CODE_PLAN_INTERNAL_ERROR; + if (p3->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR; + SColumnNode* pValNode = (SColumnNode*)p1; + SValueNode* pOptNode = (SValueNode*)p2; + SColumnNode* pTsNode = (SColumnNode*)p3; + pSupp->inputTsSlot = pTsNode->slotId; + pSupp->inputPrecision = pTsNode->node.resType.precision; + pSupp->inputValSlot = pValNode->slotId; + pSupp->inputValType = pValNode->node.resType.type; + tstrncpy(pSupp->algoOpt, pOptNode->literal, sizeof(pSupp->algoOpt)); + } else if (numOfParam == 2) { + SNode* p1 = nodesListGetNode(pFunc->pParameterList, 0); + SNode* p2 = nodesListGetNode(pFunc->pParameterList, 1); + if (p1 == NULL || p2 == NULL) return TSDB_CODE_PLAN_INTERNAL_ERROR; + if (p1->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR; + if (p2->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR; + SColumnNode* pValNode = (SColumnNode*)p1; + SColumnNode* pTsNode = (SColumnNode*)p2; + pSupp->inputTsSlot = pTsNode->slotId; + pSupp->inputPrecision = pTsNode->node.resType.precision; + pSupp->inputValSlot = pValNode->slotId; + pSupp->inputValType = pValNode->node.resType.type; + tstrncpy(pSupp->algoOpt, "algo=arima", TSDB_ANAL_ALGO_OPTION_LEN); + } else { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + } + } + } + + return 0; +} + +static int32_t forecastParseAlgo(SForecastSupp* pSupp) { + pSupp->maxTs = 0; + pSupp->minTs = INT64_MAX; + pSupp->numOfRows = 0; + + if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) { + qError("failed to get forecast algorithm name from %s", pSupp->algoOpt); + return TSDB_CODE_ANAL_ALGO_NOT_FOUND; + } + + if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) { + qError("failed to get forecast algorithm url from %s", pSupp->algoName); + return TSDB_CODE_ANAL_ALGO_NOT_LOAD; + } + + return 0; +} + +static int32_t forecastCreateBuf(SForecastSupp* pSupp) { + SAnalBuf* pBuf = &pSupp->analBuf; + int64_t ts = 0; // taosGetTimestampMs(); + + pBuf->bufType = ANAL_BUF_TYPE_JSON_COL; + snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts); + int32_t code = tsosAnalBufOpen(pBuf, 2); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteOptStr(pBuf, "algo", pSupp->algoName); + if (code != 0) goto _OVER; + + bool returnConf = (pSupp->resHighSlot == -1 || pSupp->resLowSlot == -1); + code = taosAnalBufWriteOptStr(pBuf, "return_conf", returnConf ? "true" : "false"); + if (code != 0) goto _OVER; + + bool hasAlpha = taosAnalGetOptStr(pSupp->algoOpt, "alpha", NULL, 0); + if (!hasAlpha) { + qDebug("forecast alpha not found from %s, use default:%f", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_ALPHA); + code = taosAnalBufWriteOptFloat(pBuf, "alpha", ANAL_FORECAST_DEFAULT_ALPHA); + if (code != 0) goto _OVER; + } + + char tmpOpt[32] = {0}; + bool hasParam = taosAnalGetOptStr(pSupp->algoOpt, "param", tmpOpt, sizeof(tmpOpt)); + if (!hasParam) { + qDebug("forecast param not found from %s, use default:%s", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_PARAM); + code = taosAnalBufWriteOptStr(pBuf, "param", ANAL_FORECAST_DEFAULT_PARAM); + if (code != 0) goto _OVER; + } + + bool hasPeriod = taosAnalGetOptInt(pSupp->algoOpt, "period", NULL); + if (!hasPeriod) { + qDebug("forecast period not found from %s, use default:%d", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_PERIOD); + code = taosAnalBufWriteOptInt(pBuf, "period", ANAL_FORECAST_DEFAULT_PERIOD); + if (code != 0) goto _OVER; + } + + bool hasRows = taosAnalGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows); + if (!hasRows) { + pSupp->optRows = ANAL_FORECAST_DEFAULT_ROWS; + qDebug("forecast rows not found from %s, use default:%d", pSupp->algoOpt, pSupp->optRows); + code = taosAnalBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows); + if (code != 0) goto _OVER; + } + + const char* prec = TSDB_TIME_PRECISION_MILLI_STR; + if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR; + if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR; + code = taosAnalBufWriteOptStr(pBuf, "prec", prec); + if (code != 0) goto _OVER; + + if (returnConf) { + bool hasConf = taosAnalGetOptStr(pSupp->algoOpt, "conf", NULL, 0); + if (!hasConf) { + qDebug("forecast conf not found from %s, use default:%d", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_CONF); + code = taosAnalBufWriteOptInt(pBuf, "conf", ANAL_FORECAST_DEFAULT_CONF); + if (code != 0) goto _OVER; + } + } + + code = taosAnalBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts"); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteColMeta(pBuf, 1, pSupp->inputValType, "val"); + if (code != 0) goto _OVER; + + code = taosAnalBufWriteDataBegin(pBuf); + if (code != 0) goto _OVER; + + for (int32_t i = 0; i < 2; ++i) { + code = taosAnalBufWriteColBegin(pBuf, i); + if (code != 0) goto _OVER; + } + +_OVER: + if (code != 0) { + taosAnalBufClose(pBuf); + taosAnalBufDestroy(pBuf); + } + return code; +} + +int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + QRY_PARAM_CHECK(pOptrInfo); + + int32_t code = 0; + int32_t lino = 0; + SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + code = terrno; + goto _error; + } + + SForecastSupp* pSupp = &pInfo->forecastSupp; + SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode; + SExprSupp* pExprSup = &pOperator->exprSupp; + int32_t numOfExprs = 0; + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + + code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + + if (pForecastPhyNode->pExprs != NULL) { + int32_t num = 0; + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + } + + code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + QUERY_CHECK_CODE(code, lino, _error); + + code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs); + QUERY_CHECK_CODE(code, lino, _error); + + code = forecastParseOutput(pSupp, pExprSup); + QUERY_CHECK_CODE(code, lino, _error); + + code = forecastParseAlgo(pSupp); + QUERY_CHECK_CODE(code, lino, _error); + + code = forecastCreateBuf(pSupp); + QUERY_CHECK_CODE(code, lino, _error); + + initResultSizeInfo(&pOperator->resultInfo, 4096); + + pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); + + setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn, + NULL, optrDefaultGetNextExtFn, NULL); + + code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + QUERY_CHECK_CODE(code, lino, _error); + + code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); + + *pOptrInfo = pOperator; + + qDebug("forecast env is initialized, option:%s", pSupp->algoOpt); + return TSDB_CODE_SUCCESS; + +_error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (pInfo != NULL) destroyForecastInfo(pInfo); + destroyOperatorAndDownstreams(pOperator, &downstream, 1); + pTaskInfo->code = code; + return code; +} + +static void destroyForecastInfo(void* param) { + SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param; + + blockDataDestroy(pInfo->pRes); + pInfo->pRes = NULL; + cleanupExprSupp(&pInfo->scalarSup); + taosAnalBufDestroy(&pInfo->forecastSupp.analBuf); + taosMemoryFreeClear(param); +} + +#else + +int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} + +#endif diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 8daf4695db..7914f9f320 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -619,6 +619,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); + } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) { + code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) { code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) { @@ -629,6 +631,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand code = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) { code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) { + code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code;