diff --git a/docs/examples/.gitignre b/docs/examples/.gitignre new file mode 100644 index 0000000000..0853156c65 --- /dev/null +++ b/docs/examples/.gitignre @@ -0,0 +1,2 @@ +.vscode +*.lock \ No newline at end of file diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 831c561ceb..b359fa5d6a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -132,7 +132,6 @@ typedef struct SqlFunctionCtx { char *pOutput; // final result output buffer, point to sdata->data int32_t numOfParams; SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param - int64_t *ptsList; // corresponding timestamp array list, todo remove it SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ int32_t offset; struct SResultRowEntryInfo *resultInfo; diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index a73b9b47f4..dcc0a2ba4b 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -134,6 +134,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_HYPERLOGLOG_MERGE, FUNCTION_TYPE_ELAPSED_PARTIAL, FUNCTION_TYPE_ELAPSED_MERGE, + FUNCTION_TYPE_TOP_PARTIAL, FUNCTION_TYPE_TOP_MERGE, FUNCTION_TYPE_BOTTOM_PARTIAL, @@ -184,6 +185,7 @@ bool fmIsUserDefinedFunc(int32_t funcId); bool fmIsDistExecFunc(int32_t funcId); bool fmIsForbidFillFunc(int32_t funcId); bool fmIsForbidStreamFunc(int32_t funcId); +bool fmIsIntervalInterpoFunc(int32_t funcId); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index b8975854c9..ca25b7aab1 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -44,7 +44,6 @@ typedef struct SGroupResInfo { int32_t index; SArray* pRows; // SArray - int32_t position; } SGroupResInfo; typedef struct SResultRow { @@ -56,7 +55,7 @@ typedef struct SResultRow { uint32_t numOfRows; // number of rows of current time window STimeWindow win; struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo -// char *key; // start key of current result row +// char *key; // start key of current result row } SResultRow; typedef struct SResultRowPosition { @@ -71,9 +70,7 @@ typedef struct SResKeyPos { } SResKeyPos; typedef struct SResultRowInfo { - SResultRowPosition *pPosition; // todo remove this int32_t size; // number of result set - int32_t capacity; // max capacity SResultRowPosition cur; SList* openWindow; } SResultRowInfo; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index df54161720..c8546a1afe 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -20,6 +20,8 @@ extern "C" { #endif +extern int32_t exchangeObjRefPool; + typedef struct { char* pData; bool isNull; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index caba10834b..cc690a3e5e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -181,7 +181,6 @@ typedef struct SExecTaskInfo { STaskCostInfo cost; int64_t owner; // if it is in execution int32_t code; -// uint64_t totalRows; // total number of rows struct { char *tablename; char *dbname; @@ -222,10 +221,10 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_NOT_OPENED = 0x0, - OP_OPENED = 0x1, + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, - OP_EXEC_DONE = 0x9, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorFpSet { @@ -262,12 +261,12 @@ typedef enum { } EX_SOURCE_STATUS; typedef struct SSourceDataInfo { - struct SExchangeInfo* pEx; int32_t index; SRetrieveTableRsp* pRsp; uint64_t totalRows; int32_t code; EX_SOURCE_STATUS status; + const char* taskId; } SSourceDataInfo; typedef struct SLoadRemoteDataInfo { @@ -285,6 +284,7 @@ typedef struct SExchangeInfo { bool seqLoadData; // sequential load data or not, false by default int32_t current; SLoadRemoteDataInfo loadInfo; + uint64_t self; } SExchangeInfo; #define COL_MATCH_FROM_COL_ID 0x1 @@ -470,10 +470,8 @@ typedef struct SIntervalAggOperatorInfo { bool timeWindowInterpo; // interpolation needed or not char** pRow; // previous row/tuple of already processed datablock SArray* pInterpCols; // interpolation columns - STableQueryInfo* pCurrent; // current tableQueryInfo struct int32_t order; // current SSDataBlock scan order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. STimeWindowAggSupp twAggSup; bool invertible; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. @@ -502,8 +500,6 @@ typedef struct SAggOperatorInfo { STableQueryInfo *current; uint64_t groupId; SGroupResInfo groupResInfo; - STableQueryInfo *pTableQueryInfo; - SExprInfo *pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct. @@ -639,8 +635,13 @@ typedef struct SStreamSessionAggOperatorInfo { typedef struct STimeSliceOperatorInfo { SOptrBasicInfo binfo; + STimeWindow win; SInterval interval; - SGroupResInfo groupResInfo; // multiple results build supporter + int64_t current; + SArray* pPrevRow; // SArray + SArray* pCols; // SArray + int32_t fillType; // fill type + struct SFillColInfo* pFillColInfo; // fill column info } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -733,6 +734,8 @@ typedef struct SJoinOperatorInfo { #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) +void doDestroyExchangeOperatorInfo(void* param); + SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, __optr_decode_fn_t decode, __optr_explain_fn_t explain); @@ -840,7 +843,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index a1f45fd665..3b80b262ca 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -28,8 +28,6 @@ struct SSDataBlock; typedef struct SFillColInfo { SExprInfo *pExpr; -// SResSchema schema; -// int16_t functionId; // sql function id int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN int16_t tagIndex; // index of current tag in SFillTagColInfo array list SVariant fillVal; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7550c744c8..6ba1cf859e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -41,13 +41,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { pResultRowInfo->size = 0; - pResultRowInfo->capacity = size; pResultRowInfo->cur.pageId = -1; - - pResultRowInfo->pPosition = taosMemoryCalloc(pResultRowInfo->capacity, sizeof(SResultRowPosition)); - if (pResultRowInfo->pPosition == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } return TSDB_CODE_SUCCESS; } @@ -56,25 +50,14 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { return; } - if (pResultRowInfo->capacity == 0) { -// assert(pResultRowInfo->pResult == NULL); - return; - } - for(int32_t i = 0; i < pResultRowInfo->size; ++i) { // if (pResultRowInfo->pResult[i]) { // taosMemoryFreeClear(pResultRowInfo->pResult[i]->key); // } } - - taosMemoryFreeClear(pResultRowInfo->pPosition); } void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { - if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { - return; - } - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { // SResultRow *pWindowRes = pResultRowInfo->pResult[i]; // clearResultRow(pRuntimeEnv, pWindowRes); @@ -288,232 +271,3 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) { taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); } - -static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { - if (pGroupResInfo->pRows == NULL) { - pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); - } - - size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList); - for(; pGroupResInfo->position < len; ++pGroupResInfo->position) { - SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position); - if (pResultRowCell->groupId != groupId) { - break; - } - - - int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset); - if (num <= 0) { - continue; - } - - taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pos); -// pResultRowCell->pRow->numOfRows = (uint32_t) num; - } - - return TSDB_CODE_SUCCESS; -} - -static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, - int32_t* rowCellInfoOffset) { - bool ascQuery = true; -#if 0 - int32_t code = TSDB_CODE_SUCCESS; - - int32_t *posList = NULL; - SMultiwayMergeTreeInfo *pTree = NULL; - STableQueryInfo **pTableQueryInfoList = NULL; - - size_t size = taosArrayGetSize(pTableList); - if (pGroupResInfo->pRows == NULL) { - pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); - } - - posList = taosMemoryCalloc(size, sizeof(int32_t)); - pTableQueryInfoList = taosMemoryMalloc(POINTER_BYTES * size); - - if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) { -// qError("QInfo:%"PRIu64" failed alloc memory", GET_TASKID(pRuntimeEnv)); - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _end; - } - - int32_t numOfTables = 0; - for (int32_t i = 0; i < size; ++i) { - STableQueryInfo *item = taosArrayGetP(pTableList, i); -// if (item->resInfo.size > 0) { -// pTableQueryInfoList[numOfTables++] = item; -// } - } - - // there is no data in current group - // no need to merge results since only one table in each group -// if (numOfTables == 0) { -// goto _end; -// } - - int32_t order = TSDB_ORDER_ASC; - SCompSupporter cs = {pTableQueryInfoList, posList, order}; - - int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); - if (ret != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _end; - } - - int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX; - int64_t startt = taosGetTimestampMs(); - - while (1) { - int32_t tableIndex = tMergeTreeGetChosenIndex(pTree); - - SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; - ASSERT(0); - SResultRow *pWindowRes = NULL;//getResultRow(pBuf, pWindowResInfo, cs.rowIndex[tableIndex]); - - int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset); - if (num <= 0) { - cs.rowIndex[tableIndex] += 1; - - if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) { - cs.rowIndex[tableIndex] = -1; - if (--numOfTables == 0) { // all input sources are exhausted - break; - } - } - } else { - assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery)); - - if (pWindowRes->win.skey != lastTimestamp) { - taosArrayPush(pGroupResInfo->pRows, &pWindowRes); - pWindowRes->numOfRows = (uint32_t) num; - } - - lastTimestamp = pWindowRes->win.skey; - - // move to the next row of current entry - if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) { - cs.rowIndex[tableIndex] = -1; - - // all input sources are exhausted - if ((--numOfTables) == 0) { - break; - } - } - } - - tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); - } - - int64_t endt = taosGetTimestampMs(); - -// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_TASKID(pRuntimeEnv), -// pGroupResInfo->currentGroup, endt - startt); - - _end: - taosMemoryFreeClear(pTableQueryInfoList); - taosMemoryFreeClear(posList); - taosMemoryFreeClear(pTree); - - return code; -} - -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) { - int64_t st = taosGetTimestampUs(); - - while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset); - - // this group generates at least one result, return results - if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { - break; - } - -// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_TASKID(pRuntimeEnv), pGroupResInfo->currentGroup); - cleanupGroupResInfo(pGroupResInfo); - incNextGroup(pGroupResInfo); - } - -// int64_t elapsedTime = taosGetTimestampUs() - st; -// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv), -// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); -#endif - - return TSDB_CODE_SUCCESS; -} - -//void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) { -// tbufWriteUint32(bw, pDist->numOfTables); -// tbufWriteUint16(bw, pDist->numOfFiles); -// tbufWriteUint64(bw, pDist->totalSize); -// tbufWriteUint64(bw, pDist->totalRows); -// tbufWriteInt32(bw, pDist->maxRows); -// tbufWriteInt32(bw, pDist->minRows); -// tbufWriteUint32(bw, pDist->numOfInmemRows); -// tbufWriteUint32(bw, pDist->numOfSmallBlocks); -// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos)); -// -// // compress the binary string -// char* p = TARRAY_GET_START(pDist->dataBlockInfos); -// -// // compress extra bytes -// size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize; -// char* tmp = taosMemoryMalloc(x + 2); -// -// bool comp = false; -// int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0); -// if (len == -1 || len >= x) { // compress failed, do not compress this binary data -// comp = false; -// len = (int32_t)x; -// } else { -// comp = true; -// } -// -// tbufWriteUint8(bw, comp); -// tbufWriteUint32(bw, len); -// if (comp) { -// tbufWriteBinary(bw, tmp, len); -// } else { -// tbufWriteBinary(bw, p, len); -// } -// taosMemoryFreeClear(tmp); -//} - -//void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) { -// SBufferReader br = tbufInitReader(data, len, false); -// -// pDist->numOfTables = tbufReadUint32(&br); -// pDist->numOfFiles = tbufReadUint16(&br); -// pDist->totalSize = tbufReadUint64(&br); -// pDist->totalRows = tbufReadUint64(&br); -// pDist->maxRows = tbufReadInt32(&br); -// pDist->minRows = tbufReadInt32(&br); -// pDist->numOfInmemRows = tbufReadUint32(&br); -// pDist->numOfSmallBlocks = tbufReadUint32(&br); -// int64_t numSteps = tbufReadUint64(&br); -// -// bool comp = tbufReadUint8(&br); -// uint32_t compLen = tbufReadUint32(&br); -// -// size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo)); -// -// char* outputBuf = NULL; -// if (comp) { -// outputBuf = taosMemoryMalloc(originalLen); -// -// size_t actualLen = compLen; -// const char* compStr = tbufReadBinary(&br, &actualLen); -// -// int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf, -// (int32_t)originalLen , ONE_STAGE_COMP, NULL, 0); -// assert(orignalLen == numSteps *sizeof(SFileBlockInfo)); -// } else { -// outputBuf = (char*) tbufReadBinary(&br, &originalLen); -// } -// -// pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo)); -// if (comp) { -// taosMemoryFreeClear(outputBuf); -// } -//} - diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index c014b23953..b1125c8e80 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -13,28 +13,30 @@ * along with this program. If not, see . */ -#include -#include "dataSinkMgt.h" -#include "texception.h" #include "os.h" -#include "tarray.h" -#include "tcache.h" -#include "tglobal.h" +#include "tref.h" +#include "dataSinkMgt.h" #include "tmsg.h" #include "tudf.h" #include "executor.h" #include "executorimpl.h" #include "query.h" -#include "thash.h" -#include "tlosertree.h" -#include "ttypes.h" + +static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; +int32_t exchangeObjRefPool = -1; + +static void initRefPool() { + exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); +} int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; + taosThreadOnce(&initPoolOnce, initRefPool); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7d1dbdbcf3..15191bf435 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tref.h" #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -37,9 +38,6 @@ #include "vnode.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) -#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) -#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN) -#define SET_MAIN_SCAN_FLAG(runtime) ((runtime)->scanFlag = MAIN_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define SDATA_BLOCK_INITIALIZER \ @@ -47,12 +45,6 @@ #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) -enum { - TS_JOIN_TS_EQUAL = 0, - TS_JOIN_TS_NOT_EQUALS = 1, - TS_JOIN_TAG_NOT_EQUALS = 2, -}; - #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { uint32_t v = taosRand(); @@ -88,7 +80,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #endif #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) -//#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } @@ -110,7 +101,6 @@ static void releaseQueryBuf(size_t numOfTables); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); @@ -118,7 +108,6 @@ static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput); static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); static void destroyOperatorInfo(SOperatorInfo* pOperator); -static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -562,10 +551,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; - if (tsCol != NULL) { - pCtx[k].ptsList = tsCol; - } - // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) { @@ -1133,40 +1118,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->increase = false; pCtx->param = pFunct->pParam; - // for (int32_t j = 0; j < pCtx->numOfParams; ++j) { - // // set the order information for top/bottom query - // int32_t functionId = pCtx->functionId; - // if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - // int32_t f = getExprFunctionId(&pExpr[0]); - // assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); - // - // // pCtx->param[2].i = pQueryAttr->order.order; - // // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - // // pCtx->param[3].i = functionId; - // // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - // - // // pCtx->param[1].i = pQueryAttr->order.col.info.colId; - // } else if (functionId == FUNCTION_INTERP) { - // // pCtx->param[2].i = (int8_t)pQueryAttr->fillType; - // // if (pQueryAttr->fillVal != NULL) { - // // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { - // // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; - // // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value - // // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { - // // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], - // pCtx->inputBytes, pCtx->inputType); - // // } - // // } - // // } - // } else if (functionId == FUNCTION_TWA) { - // // pCtx->param[1].i = pQueryAttr->window.skey; - // // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; - // // pCtx->param[2].i = pQueryAttr->window.ekey; - // // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - // } else if (functionId == FUNCTION_ARITHM) { - // // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); - // } - // } } for (int32_t i = 1; i < numOfOutput; ++i) { @@ -2438,44 +2389,44 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer #endif } -// static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBlock, SHashObj* pTableIdInfo, int32_t -// order) { -// int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); -// pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step; -// -// if (pTableQueryInfo->pTable == NULL) { -// return; -// } -// -// STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo); -// STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); -// if (idinfo != NULL) { -// assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid); -// idinfo->key = tidInfo.key; -// } else { -// taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo)); -// } -// } +typedef struct SFetchRspHandleWrapper { + uint32_t exchangeId; + int32_t sourceIndex; +} SFetchRspHandleWrapper; int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { - SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param; + SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param; + + SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); + if (pExchangeInfo == NULL) { + qWarn("failed to acquire exchange operator, since it may have been released"); + return TSDB_CODE_SUCCESS; + } + + int32_t index = pWrapper->sourceIndex; + SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); + if (code == TSDB_CODE_SUCCESS) { pSourceDataInfo->pRsp = pMsg->pData; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->compLen = htonl(pRsp->compLen); + pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); - pRsp->useconds = htobe64(pRsp->useconds); + pRsp->useconds = htobe64(pRsp->useconds); - ASSERT(pSourceDataInfo->pRsp != NULL); - qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows); + ASSERT(pRsp != NULL); + qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; } pSourceDataInfo->status = EX_SOURCE_DATA_READY; - tsem_post(&pSourceDataInfo->pEx->ready); + + tsem_post(&pExchangeInfo->ready); + taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + + taosMemoryFree(pWrapper); return TSDB_CODE_SUCCESS; } @@ -2524,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -2537,11 +2488,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf return pTaskInfo->code; } - pMsgSendInfo->param = pDataInfo; + SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); + pWrapper->exchangeId = pExchangeInfo->self; + pWrapper->sourceIndex = sourceIndex; + + pMsgSendInfo->param = pWrapper; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgType = TDMT_VND_FETCH; - pMsgSendInfo->fp = loadRemoteDataCallback; + pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); @@ -2689,10 +2644,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx SSDataBlock* pRes = pExchangeInfo->pResult; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - " try next", - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows); + qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", completed:%d try next %d/%"PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows, + pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; taosMemoryFreeClear(pDataInfo->pRsp); @@ -2708,10 +2663,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, - pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows, + pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources); + completed += 1; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 @@ -2761,13 +2717,13 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { } int64_t endTs = taosGetTimestampUs(); - qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), - totalSources, endTs - startTs); + qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo), + totalSources, (endTs - startTs)/1000.0); - tsem_wait(&pExchangeInfo->ready); pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; + tsem_wait(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } @@ -2883,7 +2839,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { } } -static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { +static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2892,11 +2848,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.pEx = pInfo; - dataInfo.index = i; - - void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); - if (ret == NULL) { + dataInfo.taskId = id; + dataInfo.index = i; + SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + if (pDs == NULL) { taosArrayDestroy(pInfo->pSourceDataInfo); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2924,7 +2879,9 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* taosArrayPush(pInfo->pSources, pNode); } - return initDataSource(numOfSources, pInfo); + pInfo->self = taosAddRef(exchangeObjRefPool, pInfo); + + return initDataSource(numOfSources, pInfo, id); } SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) { @@ -2939,18 +2896,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode goto _error; } - pInfo->seqLoadData = false; - pInfo->pTransporter = pTransporter; - pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pInfo->seqLoadData = false; + pInfo->pTransporter = pTransporter; + pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->numOfExprs = pInfo->pResult->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); @@ -2958,7 +2915,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode _error: if (pInfo != NULL) { - destroyExchangeOperatorInfo(pInfo, LIST_LENGTH(pExNode->pSrcEndPoints)); + doDestroyExchangeOperatorInfo(pInfo); } taosMemoryFreeClear(pInfo); @@ -4105,6 +4062,12 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; + taosRemoveRef(exchangeObjRefPool, pExInfo->self); +} + +void doDestroyExchangeOperatorInfo(void* param) { + SExchangeInfo* pExInfo = (SExchangeInfo*) param; + taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSourceDataInfo); if (pExInfo->pResult != NULL) { @@ -4322,6 +4285,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { + taosMemoryFree(pInfo->pFillInfo); + taosMemoryFree(pInfo->p); return TSDB_CODE_OUT_OF_MEMORY; } else { return TSDB_CODE_SUCCESS; @@ -4846,11 +4811,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { - qDebug("[******]create Semi"); int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { - qDebug("[******]create Final"); int32_t children = 1; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index a48b4080a3..688c576d03 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -332,12 +332,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - // if (!stableQuery) { // finalize include the update of result rows - // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); - // } else { - // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, - // pInfo->binfo.rowCellInfoOffset); - // } + #if 0 if(pOperator->fpSet.encodeResultRow){ char *result = NULL; @@ -378,8 +373,9 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { return (rows == 0)? NULL:pRes; } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, + SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -407,7 +403,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -669,8 +665,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pInfo->columnOffset); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) { SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 738c5b2b26..0fcf52b1a4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -17,6 +17,7 @@ #include "functionMgt.h" #include "tdatablock.h" #include "ttime.h" +#include "tfill.h" typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, @@ -30,18 +31,18 @@ static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); -/* - * There are two cases to handle: - * - * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including - * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey. - * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be - * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there - * is a previous result generated or not. - */ -static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { - // do nothing -} +///* +// * There are two cases to handle: +// * +// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including +// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey. +// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be +// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there +// * is a previous result generated or not. +// */ +//static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { +// // do nothing +//} static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } @@ -327,8 +328,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp int32_t index = 1; for (int32_t k = 0; k < numOfExprs; ++k) { - // todo use flag instead of function name - if (strcmp(pCtx[k].pExpr->pExpr->_function.functionName, "twa") != 0) { + if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { pCtx[k].start.key = INT64_MIN; continue; } @@ -341,7 +341,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp SFunctParam* pParam = &pCtx[k].param[0]; SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId); - ASSERT(pColInfo->info.colId == pParam->pCol->colId && curTs != windowKey); + ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey); double v1 = 0, v2 = 0, v = 0; if (prevRowIndex == -1) { @@ -958,9 +958,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true); - STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; - - setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); #if 0 // test for encode/decode result info @@ -1414,7 +1411,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt for (int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = pCtx[i].pExpr; - if (strcmp(pExpr->pExpr->_function.functionName, "twa") == 0) { + if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) { SFunctParam* pParam = &pExpr->base.pParam[0]; SColumn c = *pParam->pCol; @@ -1475,11 +1472,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo); if (pInfo->timeWindowInterpo) { pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); - } - - // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { - goto _error; + if (pInfo->binfo.resultRowInfo.openWindow == NULL) { + goto _error; + } } initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); @@ -1694,24 +1689,44 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pBInfo->pRes; } -static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { +static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + // null data should not be kept since it can not be used to perform interpolation + if (!colDataIsNull_s(pColInfoData, i)) { + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i); + + pkey->isNull = false; + char* val = colDataGetData(pColInfoData, i); + memcpy(pkey->pData, val, pkey->bytes); + } + } +} + +static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } STimeSliceOperatorInfo* pSliceInfo = pOperator->info; - if (pOperator->status == OP_RES_TO_RETURN) { - // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - } + SSDataBlock* pResBlock = pSliceInfo->binfo.pRes; - return pSliceInfo->binfo.pRes; - } +// if (pOperator->status == OP_RES_TO_RETURN) { +// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); +// if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { +// doSetOperatorCompleted(pOperator); +// } +// +// return pResBlock; +// } - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; + SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t numOfRows = 0; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1720,48 +1735,198 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); - // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0); + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*) colDataGetData(pTsCol, i); + + if (ts == pSliceInfo->current) { + for(int32_t j = 0; j < pOperator->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pOperator->pExpr[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot); + + char* v = colDataGetData(pSrc, i); + colDataAppend(pDst, numOfRows, v, false); + } + + numOfRows += 1; + + pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; + } + } else if (ts < pSliceInfo->current) { + if (i != pBlock->info.window.ekey) { + int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1); + if (nextTs > pSliceInfo->current) { + // output the result + for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pOperator->pExpr[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot); + + switch (pSliceInfo->fillType) { + case TSDB_FILL_NULL: + colDataAppendNULL(pDst, numOfRows); + break; + + case TSDB_FILL_SET_VALUE: { + SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal; + + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, numOfRows, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, numOfRows, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, numOfRows, (char*)&v, false); + } + } + break; + + case TSDB_FILL_LINEAR: +#if 0 + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs + || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { +// goto interp_exit; + } + + double v1 = -1, v2 = -1; + GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); + GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); + + SPoint point1 = {.key = ts, .val = &v1}; + SPoint point2 = {.key = nextTs, .val = &v2}; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + + int32_t srcType = pCtx->inputType; + if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + } else { + bool exceedMax = false, exceedMin = false; + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); + if (exceedMax || exceedMin) { + __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); + if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); + } else { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); + } + } + } +#endif + break; + + case TSDB_FILL_PREV: { + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); + colDataAppend(pDst, numOfRows, pkey->pData, false); + } break; + + case TSDB_FILL_NEXT: { + } break; + + case TSDB_FILL_NONE: + default: + break; + } + + pSliceInfo->current += + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; + } + } + } else { + // ignore current row, and do nothing + } + } else { // it is the last row of current block + doKeepPrevRows(pSliceInfo, pBlock); + } + } + } } // restore the value - pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs); - - // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); - // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); - - if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { + if (pResBlock->info.rows == 0) { pOperator->status = OP_EXEC_DONE; } - return pSliceInfo->binfo.pRes->info.rows == 0 ? NULL : pSliceInfo->binfo.pRes; + return pResBlock->info.rows == 0 ? NULL : pResBlock; +} + +static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) { + pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); + pInfo->pCols = taosArrayInit(4, sizeof(SColumn)); + + if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = pCtx[i].pExpr; + + SFunctParam* pParam = &pExpr->base.pParam[0]; + + SColumn c = *pParam->pCol; + taosArrayPush(pInfo->pCols, &c); + + SGroupKeys key = {0}; + key.bytes = c.bytes; + key.type = c.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, c.bytes); + taosArrayPush(pInfo->pPrevRow, &key); + } + + return TSDB_CODE_SUCCESS; } SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { goto _error; } + int32_t code = initTimesliceInfo(pInfo, pInfo->binfo.pCtx, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode); - pOperator->name = "TimeSliceOperator"; + pInfo->binfo.pRes = pResultBlock; + + pOperator->name = "TimeSliceOperator"; // pOperator->operatorType = OP_AllTimeWindow; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo, + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: @@ -3344,9 +3509,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); - STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; - - setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index f3060243ed..c243c1c175 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -97,6 +97,10 @@ bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx *pCtx); +bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); +int32_t derivativeFunction(SqlFunctionCtx *pCtx); + bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index 6fefcceb87..1443c26820 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -42,7 +42,8 @@ extern "C" { #define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13) #define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14) #define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15) -#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16) +#define FUNC_MGT_INTERVAL_INTERPO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16) +#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(17) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/inc/texpr.h b/source/libs/function/inc/texpr.h index eb4ab1b6e3..68aedcb5a2 100644 --- a/source/libs/function/inc/texpr.h +++ b/source/libs/function/inc/texpr.h @@ -60,12 +60,6 @@ typedef struct SExprTraverseSupp { void *pExtInfo; } SExprTraverseSupp; -tExprNode* exprTreeFromTableName(const char* tbnameCond); - -bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param); - -void buildFilterSetFromBinary(void **q, const char *buf, int32_t len); - #ifdef __cplusplus } #endif diff --git a/source/libs/function/inc/tunaryoperator.h b/source/libs/function/inc/tunaryoperator.h deleted file mode 100644 index cd40297e07..0000000000 --- a/source/libs/function/inc/tunaryoperator.h +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_COMMON_UNARY_SCALAR_OPERATOR_H_ -#define _TD_COMMON_UNARY_SCALAR_OPERATOR_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -//#include "tscalarfunction.h" - -//typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput); -//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_COMMON_BIN_SCALAR_OPERATOR_H_*/ diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 62a0c663b6..b0741908a6 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -997,6 +997,38 @@ static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len return TSDB_CODE_SUCCESS; } +static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (3 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + + // param1 + SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); + if (QUERY_NODE_VALUE != nodeType(pParamNode1)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)pParamNode1; + pValue->notReserved = true; + + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pParamNode2 = nodesListGetNode(pFunc->pParameterList, 2); + SValueNode* pValue2 = (SValueNode*)pParamNode2; + pValue2->notReserved = true; + + if (pValue2->datum.i != 0 && pValue2->datum.i != 1) { + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // first(col_list) will be rewritten as first(col) if (1 != LIST_LENGTH(pFunc->pParameterList)) { @@ -1596,6 +1628,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_AVG, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateInNumOutDou, + .dataRequiredFunc = statisDataRequired, .getEnvFunc = getAvgFuncEnv, .initFunc = avgFunctionSetup, .processFunc = avgFunction, @@ -1793,7 +1826,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "elapsed", .type = FUNCTION_TYPE_ELAPSED, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .dataRequiredFunc = statisDataRequired, .translateFunc = translateElapsed, .getEnvFunc = getElapsedFuncEnv, @@ -1831,6 +1864,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = NULL, .combineFunc = elapsedCombine, }, + { + .name = "interp", + .type = FUNCTION_TYPE_INTERP, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC, + .translateFunc = translateFirstLast, + .getEnvFunc = getSelectivityFuncEnv, + .initFunc = functionSetup, + .processFunc = NULL, + .finalizeFunc = NULL + }, + { + .name = "derivative", + .type = FUNCTION_TYPE_DERIVATIVE, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateDerivative, + .getEnvFunc = getDerivativeFuncEnv, + .initFunc = derivativeFuncSetup, + .processFunc = derivativeFunction, + .finalizeFunc = functionFinalize + }, { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, @@ -1914,8 +1967,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "twa", .type = FUNCTION_TYPE_TWA, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateInNumOutDou, + .dataRequiredFunc = statisDataRequired, .getEnvFunc = getTwaFuncEnv, .initFunc = twaFunctionSetup, .processFunc = twaFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index de17c0f32c..90700580bd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -990,9 +990,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { index = pInput->pColumnDataAgg[0]->maxIndex; } - // the index is the original position, not the relative position - TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL; - if (!pBuf->assign) { pBuf->v = *(int64_t*)tval; if (pCtx->subsidiaries.num > 0) { @@ -3424,7 +3421,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo pInfo->min = MAX_TS_KEY; pInfo->max = 0; - if (pCtx->numOfParams == 2) { + if (pCtx->numOfParams > 2) { pInfo->timeUnit = pCtx->param[1].param.i; } else { pInfo->timeUnit = 1; @@ -3474,8 +3471,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pCol = pInput->pData[0]; - int32_t start = pInput->startRowIndex; - TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start); + int32_t start = pInput->startRowIndex; + TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { pInfo->max = @@ -5166,3 +5163,198 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return row; } + +typedef struct SDerivInfo { + double prevValue; // previous value + TSKEY prevTs; // previous timestamp + bool ignoreNegative;// ignore the negative value + int64_t tsWindow; // time window for derivative + bool valueSet; // the value has been set already +} SDerivInfo; + +bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SDerivInfo); + return true; +} + +bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; // not initialized since it has been initialized + } + + SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); + + pDerivInfo->ignoreNegative = pCtx->param[2].param.i; + pDerivInfo->prevTs = -1; + pDerivInfo->tsWindow = pCtx->param[1].param.i; + pDerivInfo->valueSet = false; + return true; +} + +int32_t derivativeFunction(SqlFunctionCtx *pCtx) { + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + int32_t numOfElems = 0; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + + int32_t i = pInput->startRowIndex; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + double v = 0; + + if (pCtx->order == TSDB_ORDER_ASC) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + + char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; + GET_TYPED_DATA(v, double, pInputCol->info.type, d); + + int32_t pos = pCtx->offset + numOfElems; + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && r < 0) { + } else { + colDataAppend(pOutput, pos, (const char*)&r, false); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + numOfElems++; + } + } + + pDerivInfo->prevValue = v; + pDerivInfo->prevTs = tsList[i]; + } + } else { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + + char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; + GET_TYPED_DATA(v, double, pInputCol->info.type, d); + + int32_t pos = pCtx->offset + numOfElems; + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - tsList[i]); + if (pDerivInfo->ignoreNegative && r < 0) { + } else { + colDataAppend(pOutput, pos, (const char*)&r, false); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &pDerivInfo->prevTs); + } + numOfElems++; + } + } + + pDerivInfo->prevValue = v; + pDerivInfo->prevTs = tsList[i]; + } + } + + return numOfElems; +} + +int32_t interpFunction(SqlFunctionCtx *pCtx) { +#if 0 + int32_t fillType = (int32_t) pCtx->param[2].i64; + //bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); + + if (pCtx->start.key == pCtx->startTs) { + assert(pCtx->start.key != INT64_MIN); + + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); + + goto interp_success_exit; + } else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); + + goto interp_success_exit; + } + + switch (fillType) { + case TSDB_FILL_NULL: + setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + break; + + case TSDB_FILL_SET_VALUE: + tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); + break; + + case TSDB_FILL_LINEAR: + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs + || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { + goto interp_exit; + } + + double v1 = -1, v2 = -1; + GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); + GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); + + SPoint point1 = {.key = pCtx->start.key, .val = &v1}; + SPoint point2 = {.key = pCtx->end.key, .val = &v2}; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + + int32_t srcType = pCtx->inputType; + if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + } else { + bool exceedMax = false, exceedMin = false; + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); + if (exceedMax || exceedMin) { + __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); + if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); + } else { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); + } + } + } + break; + + case TSDB_FILL_PREV: + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) { + goto interp_exit; + } + + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); + break; + + case TSDB_FILL_NEXT: + if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { + goto interp_exit; + } + + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); + break; + + case TSDB_FILL_NONE: + // do nothing + default: + goto interp_exit; + } + + + interp_success_exit: + *(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs; + INC_INIT_VAL(pCtx, 1); + + interp_exit: + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; + pCtx->endTs = pCtx->startTs; +#endif + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index e9c916034d..0c762513cb 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; } bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); } +bool fmIsIntervalInterpoFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERVAL_INTERPO_FUNC); } + bool fmIsForbidStreamFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STREAM_FUNC); } void fmFuncMgtDestroy() { diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index b310b1a8bb..47ae07f823 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -161,13 +161,13 @@ typedef struct SRateInfo { bool isIRate; // true for IRate functions, false for Rate functions } SRateInfo; -typedef struct SDerivInfo { - double prevValue; // previous value - TSKEY prevTs; // previous timestamp - bool ignoreNegative;// ignore the negative value - int64_t tsWindow; // time window for derivative - bool valueSet; // the value has been set already -} SDerivInfo; +//typedef struct SDerivInfo { +// double prevValue; // previous value +// TSKEY prevTs; // previous timestamp +// bool ignoreNegative;// ignore the negative value +// int64_t tsWindow; // time window for derivative +// bool valueSet; // the value has been set already +//} SDerivInfo; typedef struct SResPair { TSKEY key; diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c index 703b19ced7..f04b4f17f9 100644 --- a/source/libs/function/src/texpr.c +++ b/source/libs/function/src/texpr.c @@ -48,34 +48,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) { *pExpr = NULL; } -bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) { -#if 0 - //non-leaf nodes, recursively traverse the expression tree in the post-root order - if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pRight->nodeType == TEXPR_BINARYEXPR_NODE) { - if (pExpr->_node.optr == LOGIC_COND_TYPE_OR) { // or - if (exprTreeApplyFilter(pLeft, pItem, param)) { - return true; - } - - // left child does not satisfy the query condition, try right child - return exprTreeApplyFilter(pRight, pItem, param); - } else { // and - if (!exprTreeApplyFilter(pLeft, pItem, param)) { - return false; - } - - return exprTreeApplyFilter(pRight, pItem, param); - } - } - - // handle the leaf node - param->setupInfoFn(pExpr, param->pExtInfo); - return param->nodeFilterFn(pItem, pExpr->_node.info); -#endif - - return 0; -} - // TODO: these three functions should be made global static void* exception_calloc(size_t nmemb, size_t size) { void* p = taosMemoryCalloc(nmemb, size); @@ -101,214 +73,3 @@ static UNUSED_FUNC char* exception_strdup(const char* str) { return p; } -void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) { - SBufferReader br = tbufInitReader(buf, len, false); - uint32_t type = tbufReadUint32(&br); - SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false); - -// taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); - - int dummy = -1; - int32_t sz = tbufReadInt32(&br); - for (int32_t i = 0; i < sz; i++) { - if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type)) { - int64_t val = tbufReadInt64(&br); - taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy)); - } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { - uint64_t val = tbufReadUint64(&br); - taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy)); - } - else if (type == TSDB_DATA_TYPE_TIMESTAMP) { - int64_t val = tbufReadInt64(&br); - taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy)); - } else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) { - double val = tbufReadDouble(&br); - taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy)); - } else if (type == TSDB_DATA_TYPE_BINARY) { - size_t t = 0; - const char *val = tbufReadBinary(&br, &t); - taosHashPut(pObj, (char *)val, t, &dummy, sizeof(dummy)); - } else if (type == TSDB_DATA_TYPE_NCHAR) { - size_t t = 0; - const char *val = tbufReadBinary(&br, &t); - taosHashPut(pObj, (char *)val, t, &dummy, sizeof(dummy)); - } - } - *q = (void *)pObj; -} - -void convertFilterSetFromBinary(void **q, const char *buf, int32_t len, uint32_t tType) { - SBufferReader br = tbufInitReader(buf, len, false); - uint32_t sType = tbufReadUint32(&br); - SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(tType), true, false); - -// taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(tType)); - - int dummy = -1; - SVariant tmpVar = {0}; - size_t t = 0; - int32_t sz = tbufReadInt32(&br); - void *pvar = NULL; - int64_t val = 0; - int32_t bufLen = 0; - if (IS_NUMERIC_TYPE(sType)) { - bufLen = 60; // The maximum length of string that a number is converted to. - } else { - bufLen = 128; - } - - char *tmp = taosMemoryCalloc(1, bufLen * TSDB_NCHAR_SIZE); - - for (int32_t i = 0; i < sz; i++) { - switch (sType) { - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_TINYINT: { - *(uint8_t *)&val = (uint8_t)tbufReadInt64(&br); - t = sizeof(val); - pvar = &val; - break; - } - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_SMALLINT: { - *(uint16_t *)&val = (uint16_t)tbufReadInt64(&br); - t = sizeof(val); - pvar = &val; - break; - } - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_INT: { - *(uint32_t *)&val = (uint32_t)tbufReadInt64(&br); - t = sizeof(val); - pvar = &val; - break; - } - case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_BIGINT: { - *(uint64_t *)&val = (uint64_t)tbufReadInt64(&br); - t = sizeof(val); - pvar = &val; - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - *(double *)&val = tbufReadDouble(&br); - t = sizeof(val); - pvar = &val; - break; - } - case TSDB_DATA_TYPE_FLOAT: { - *(float *)&val = (float)tbufReadDouble(&br); - t = sizeof(val); - pvar = &val; - break; - } - case TSDB_DATA_TYPE_BINARY: { - pvar = (char *)tbufReadBinary(&br, &t); - break; - } - case TSDB_DATA_TYPE_NCHAR: { - pvar = (char *)tbufReadBinary(&br, &t); - break; - } - default: - taosHashCleanup(pObj); - *q = NULL; - return; - } - - taosVariantCreateFromBinary(&tmpVar, (char *)pvar, t, sType); - - if (bufLen < t) { - tmp = taosMemoryRealloc(tmp, t * TSDB_NCHAR_SIZE); - bufLen = (int32_t)t; - } - - switch (tType) { - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_TINYINT: { - if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) { - goto err_ret; - } - pvar = &val; - t = sizeof(val); - break; - } - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_SMALLINT: { - if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) { - goto err_ret; - } - pvar = &val; - t = sizeof(val); - break; - } - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_INT: { - if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) { - goto err_ret; - } - pvar = &val; - t = sizeof(val); - break; - } - case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_BIGINT: { - if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) { - goto err_ret; - } - pvar = &val; - t = sizeof(val); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) { - goto err_ret; - } - pvar = &val; - t = sizeof(val); - break; - } - case TSDB_DATA_TYPE_FLOAT: { - if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) { - goto err_ret; - } - pvar = &val; - t = sizeof(val); - break; - } - case TSDB_DATA_TYPE_BINARY: { - if (taosVariantDump(&tmpVar, tmp, tType, true)) { - goto err_ret; - } - t = varDataLen(tmp); - pvar = varDataVal(tmp); - break; - } - case TSDB_DATA_TYPE_NCHAR: { - if (taosVariantDump(&tmpVar, tmp, tType, true)) { - goto err_ret; - } - t = varDataLen(tmp); - pvar = varDataVal(tmp); - break; - } - default: - goto err_ret; - } - - taosHashPut(pObj, (char *)pvar, t, &dummy, sizeof(dummy)); - taosVariantDestroy(&tmpVar); - memset(&tmpVar, 0, sizeof(tmpVar)); - } - - *q = (void *)pObj; - pObj = NULL; - -err_ret: - taosVariantDestroy(&tmpVar); - taosHashCleanup(pObj); - taosMemoryFreeClear(tmp); -} diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 10a5475555..1331c37507 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -53,19 +53,19 @@ typedef struct SCacheEntry { SCacheNode *next; } SCacheEntry; -typedef struct STrashElem { +struct STrashElem { struct STrashElem *prev; struct STrashElem *next; SCacheNode *pData; -} STrashElem; +}; -typedef struct SCacheIter { +struct SCacheIter { SCacheObj *pCacheObj; SCacheNode **pCurrent; int32_t entryIndex; int32_t index; int32_t numOfObj; -} SCacheIter; +}; /* * to accommodate the old data which has the same key value of new one in hashList diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index b6890a8503..957e916e34 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -543,7 +543,7 @@ class TDTestCase: tdSql.checkData(0, 0, 1.5) #tdSql.query("select last_row(dataint) from jsons1 where jtag->'tag1'>1") #tdSql.checkData(0, 0, 11) - tdSql.error("select interp(dataint) from jsons1 where ts = '2020-06-02 09:17:08.000' and jtag->'tag1'>1") + #tdSql.error("select interp(dataint) from jsons1 where ts = '2020-06-02 09:17:08.000' and jtag->'tag1'>1") # # #test calculation function:diff/derivative/spread/ceil/floor/round/ #tdSql.error("select diff(dataint) from jsons1 where jtag->'tag1'>1") diff --git a/tools/taosadapter b/tools/taosadapter deleted file mode 160000 index 9ce3f5c98e..0000000000 --- a/tools/taosadapter +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9ce3f5c98ef95d9c7c596c4ed7302b0ed69a92b2