enh(query): add interp function.

This commit is contained in:
Haojun Liao 2022-06-14 11:54:13 +08:00
parent 05c917d351
commit 531b85ce22
12 changed files with 225 additions and 314 deletions

View File

@ -132,7 +132,6 @@ typedef struct SqlFunctionCtx {
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list, todo remove it
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
struct SResultRowEntryInfo *resultInfo;

View File

@ -176,6 +176,7 @@ bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
bool fmIsForbidFillFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);

View File

@ -44,7 +44,6 @@
typedef struct SGroupResInfo {
int32_t index;
SArray* pRows; // SArray<SResKeyPos>
int32_t position;
} SGroupResInfo;
typedef struct SResultRow {
@ -71,9 +70,7 @@ typedef struct SResKeyPos {
} SResKeyPos;
typedef struct SResultRowInfo {
SResultRowPosition *pPosition; // todo remove this
int32_t size; // number of result set
int32_t capacity; // max capacity
SResultRowPosition cur;
SList* openWindow;
} SResultRowInfo;

View File

@ -472,10 +472,8 @@ typedef struct SIntervalAggOperatorInfo {
bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SArray* pInterpCols; // interpolation columns
STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
@ -504,8 +502,6 @@ typedef struct SAggOperatorInfo {
STableQueryInfo *current;
uint64_t groupId;
SGroupResInfo groupResInfo;
STableQueryInfo *pTableQueryInfo;
SExprInfo *pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied
SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct.
@ -640,8 +636,12 @@ typedef struct SStreamSessionAggOperatorInfo {
typedef struct STimeSliceOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow win;
SInterval interval;
int64_t current;
SGroupResInfo groupResInfo; // multiple results build supporter
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pCols; // SArray<SColumn>
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {

View File

@ -41,13 +41,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
pResultRowInfo->size = 0;
pResultRowInfo->capacity = size;
pResultRowInfo->cur.pageId = -1;
pResultRowInfo->pPosition = taosMemoryCalloc(pResultRowInfo->capacity, sizeof(SResultRowPosition));
if (pResultRowInfo->pPosition == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
@ -56,25 +50,14 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
return;
}
if (pResultRowInfo->capacity == 0) {
// assert(pResultRowInfo->pResult == NULL);
return;
}
for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
// if (pResultRowInfo->pResult[i]) {
// taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
// }
}
taosMemoryFreeClear(pResultRowInfo->pPosition);
}
void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) {
return;
}
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// SResultRow *pWindowRes = pResultRowInfo->pResult[i];
// clearResultRow(pRuntimeEnv, pWindowRes);
@ -288,232 +271,3 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
}
static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList);
for(; pGroupResInfo->position < len; ++pGroupResInfo->position) {
SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position);
if (pResultRowCell->groupId != groupId) {
break;
}
int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset);
if (num <= 0) {
continue;
}
taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pos);
// pResultRowCell->pRow->numOfRows = (uint32_t) num;
}
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = true;
#if 0
int32_t code = TSDB_CODE_SUCCESS;
int32_t *posList = NULL;
SMultiwayMergeTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL;
size_t size = taosArrayGetSize(pTableList);
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
posList = taosMemoryCalloc(size, sizeof(int32_t));
pTableQueryInfoList = taosMemoryMalloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
// qError("QInfo:%"PRIu64" failed alloc memory", GET_TASKID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pTableList, i);
// if (item->resInfo.size > 0) {
// pTableQueryInfoList[numOfTables++] = item;
// }
}
// there is no data in current group
// no need to merge results since only one table in each group
// if (numOfTables == 0) {
// goto _end;
// }
int32_t order = TSDB_ORDER_ASC;
SCompSupporter cs = {pTableQueryInfoList, posList, order};
int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX;
int64_t startt = taosGetTimestampMs();
while (1) {
int32_t tableIndex = tMergeTreeGetChosenIndex(pTree);
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
ASSERT(0);
SResultRow *pWindowRes = NULL;//getResultRow(pBuf, pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if (num <= 0) {
cs.rowIndex[tableIndex] += 1;
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
if (--numOfTables == 0) { // all input sources are exhausted
break;
}
}
} else {
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
if (pWindowRes->win.skey != lastTimestamp) {
taosArrayPush(pGroupResInfo->pRows, &pWindowRes);
pWindowRes->numOfRows = (uint32_t) num;
}
lastTimestamp = pWindowRes->win.skey;
// move to the next row of current entry
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
// all input sources are exhausted
if ((--numOfTables) == 0) {
break;
}
}
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
int64_t endt = taosGetTimestampMs();
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, endt - startt);
_end:
taosMemoryFreeClear(pTableQueryInfoList);
taosMemoryFreeClear(posList);
taosMemoryFreeClear(pTree);
return code;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset);
// this group generates at least one result, return results
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
break;
}
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_TASKID(pRuntimeEnv), pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo);
}
// int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
#endif
return TSDB_CODE_SUCCESS;
}
//void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
// tbufWriteUint32(bw, pDist->numOfTables);
// tbufWriteUint16(bw, pDist->numOfFiles);
// tbufWriteUint64(bw, pDist->totalSize);
// tbufWriteUint64(bw, pDist->totalRows);
// tbufWriteInt32(bw, pDist->maxRows);
// tbufWriteInt32(bw, pDist->minRows);
// tbufWriteUint32(bw, pDist->numOfInmemRows);
// tbufWriteUint32(bw, pDist->numOfSmallBlocks);
// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
//
// // compress the binary string
// char* p = TARRAY_GET_START(pDist->dataBlockInfos);
//
// // compress extra bytes
// size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize;
// char* tmp = taosMemoryMalloc(x + 2);
//
// bool comp = false;
// int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0);
// if (len == -1 || len >= x) { // compress failed, do not compress this binary data
// comp = false;
// len = (int32_t)x;
// } else {
// comp = true;
// }
//
// tbufWriteUint8(bw, comp);
// tbufWriteUint32(bw, len);
// if (comp) {
// tbufWriteBinary(bw, tmp, len);
// } else {
// tbufWriteBinary(bw, p, len);
// }
// taosMemoryFreeClear(tmp);
//}
//void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
// SBufferReader br = tbufInitReader(data, len, false);
//
// pDist->numOfTables = tbufReadUint32(&br);
// pDist->numOfFiles = tbufReadUint16(&br);
// pDist->totalSize = tbufReadUint64(&br);
// pDist->totalRows = tbufReadUint64(&br);
// pDist->maxRows = tbufReadInt32(&br);
// pDist->minRows = tbufReadInt32(&br);
// pDist->numOfInmemRows = tbufReadUint32(&br);
// pDist->numOfSmallBlocks = tbufReadUint32(&br);
// int64_t numSteps = tbufReadUint64(&br);
//
// bool comp = tbufReadUint8(&br);
// uint32_t compLen = tbufReadUint32(&br);
//
// size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
//
// char* outputBuf = NULL;
// if (comp) {
// outputBuf = taosMemoryMalloc(originalLen);
//
// size_t actualLen = compLen;
// const char* compStr = tbufReadBinary(&br, &actualLen);
//
// int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
// (int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
// assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
// } else {
// outputBuf = (char*) tbufReadBinary(&br, &originalLen);
// }
//
// pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
// if (comp) {
// taosMemoryFreeClear(outputBuf);
// }
//}

View File

@ -118,7 +118,6 @@ static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
@ -562,10 +561,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
pCtx[k].input.startRowIndex = offset;
pCtx[k].input.numOfRows = forwardStep;
if (tsCol != NULL) {
pCtx[k].ptsList = tsCol;
}
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) {

View File

@ -378,8 +378,9 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return (rows == 0)? NULL:pRes;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -659,8 +660,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree(pInfo->columnOffset);
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) {
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {

View File

@ -30,18 +30,18 @@ static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
/*
* There are two cases to handle:
*
* 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including
* pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey.
* 2. Query range is set and query is in progress. There may be another result with the same query ranges to be
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not.
*/
static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
// do nothing
}
///*
// * There are two cases to handle:
// *
// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including
// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey.
// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be
// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
// * is a previous result generated or not.
// */
//static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
// // do nothing
//}
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
@ -327,8 +327,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
int32_t index = 1;
for (int32_t k = 0; k < numOfExprs; ++k) {
// todo use flag instead of function name
if (strcmp(pCtx[k].pExpr->pExpr->_function.functionName, "twa") != 0) {
if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
pCtx[k].start.key = INT64_MIN;
continue;
}
@ -958,9 +957,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true);
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
#if 0 // test for encode/decode result info
@ -1415,7 +1411,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
if (strcmp(pExpr->pExpr->_function.functionName, "twa") == 0) {
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
SFunctParam* pParam = &pExpr->base.pParam[0];
SColumn c = *pParam->pCol;
@ -1476,12 +1472,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
}
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
@ -1695,22 +1689,25 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pBInfo->pRes;
}
static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pSliceInfo->binfo.pRes;
return pResBlock;
}
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
@ -1721,7 +1718,38 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
if (ts == pSliceInfo->current) {
// output the result
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else if (ts < pSliceInfo->current) {
if (i != pBlock->info.window.ekey) {
int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
// output the result
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// keep current row
}
} else { // it is the last row of current block
// keep current row
}
}
}
}
// restore the value
@ -1733,11 +1761,38 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pSliceInfo->binfo.pRes->info.rows == 0 ? NULL : pSliceInfo->binfo.pRes;
return pResBlock->info.rows == 0 ? NULL : pResBlock;
}
static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) {
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
pInfo->pCols = taosArrayInit(4, sizeof(SColumn));
if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
SFunctParam* pParam = &pExpr->base.pParam[0];
SColumn c = *pParam->pCol;
taosArrayPush(pInfo->pCols, &c);
SGroupKeys key = {0};
key.bytes = c.bytes;
key.type = c.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(pInfo->pPrevRow, &key);
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@ -1748,8 +1803,15 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
int32_t code = initTimesliceInfo(pInfo, pInfo->binfo.pCtx, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->binfo.pRes = pResultBlock;
pOperator->name = "TimeSliceOperator";
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blocking = true;
@ -1759,10 +1821,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo,
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo,
NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
@ -3326,9 +3388,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true);
STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {

View File

@ -42,6 +42,7 @@ extern "C" {
#define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13)
#define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14)
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_INTERVAL_INTERPO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -1508,6 +1508,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_AVG,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateInNumOutDou,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getAvgFuncEnv,
.initFunc = avgFunctionSetup,
.processFunc = avgFunction,
@ -1679,7 +1680,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "elapsed",
.type = FUNCTION_TYPE_ELAPSED,
.classification = FUNC_MGT_AGG_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC,
.dataRequiredFunc = statisDataRequired,
.translateFunc = translateElapsed,
.getEnvFunc = getElapsedFuncEnv,
@ -1717,6 +1718,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.invertFunc = NULL,
.combineFunc = elapsedCombine,
},
// {
// .name = "interp",
// .type = FUNCTION_TYPE_INTERP,
// .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC,
// .translateFunc = translateFirstLast,
// .getEnvFunc = getSelectivityFuncEnv,
// .initFunc = functionSetup,
// .processFunc = interpFunction,
// .finalizeFunc = NULL
// },
{
.name = "derivative",
.type = FUNCTION_TYPE_DERIVATIVE,
@ -1762,8 +1773,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "twa",
.type = FUNCTION_TYPE_TWA,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC,
.translateFunc = translateInNumOutDou,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getTwaFuncEnv,
.initFunc = twaFunctionSetup,
.processFunc = twaFunction,

View File

@ -933,9 +933,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
index = pInput->pColumnDataAgg[0]->maxIndex;
}
// the index is the original position, not the relative position
TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;
if (!pBuf->assign) {
pBuf->v = *(int64_t*)tval;
if (pCtx->subsidiaries.num > 0) {
@ -3284,7 +3281,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start);
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0);
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
@ -5086,3 +5083,96 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) {
return numOfElems;
}
int32_t interpFunction(SqlFunctionCtx *pCtx) {
#if 0
int32_t fillType = (int32_t) pCtx->param[2].i64;
//bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
if (pCtx->start.key == pCtx->startTs) {
assert(pCtx->start.key != INT64_MIN);
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
goto interp_success_exit;
} else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
goto interp_success_exit;
}
switch (fillType) {
case TSDB_FILL_NULL:
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
break;
case TSDB_FILL_SET_VALUE:
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
break;
case TSDB_FILL_LINEAR:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = pCtx->start.key, .val = &v1};
SPoint point2 = {.key = pCtx->end.key, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
break;
case TSDB_FILL_PREV:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
break;
case TSDB_FILL_NEXT:
if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
break;
case TSDB_FILL_NONE:
// do nothing
default:
goto interp_exit;
}
interp_success_exit:
*(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs;
INC_INIT_VAL(pCtx, 1);
interp_exit:
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->endTs = pCtx->startTs;
#endif
return TSDB_CODE_SUCCESS;
}

View File

@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); }
bool fmIsIntervalInterpoFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERVAL_INTERPO_FUNC); }
void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {