Merge pull request #11247 from taosdata/feature/3.0_liaohj
[td-14393] support percentile.
This commit is contained in:
commit
9317b5f299
|
@ -36,7 +36,7 @@ typedef struct SFuncExecEnv {
|
||||||
|
|
||||||
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||||
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||||
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
|
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
|
||||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
|
@ -154,7 +154,9 @@ typedef struct SResultDataInfo {
|
||||||
int32_t interBufSize;
|
int32_t interBufSize;
|
||||||
} SResultDataInfo;
|
} SResultDataInfo;
|
||||||
|
|
||||||
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
||||||
|
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
|
||||||
|
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
|
||||||
|
|
||||||
typedef struct SInputColumnInfoData {
|
typedef struct SInputColumnInfoData {
|
||||||
int32_t totalRows; // total rows in current columnar data
|
int32_t totalRows; // total rows in current columnar data
|
||||||
|
@ -192,7 +194,8 @@ typedef struct SqlFunctionCtx {
|
||||||
int32_t numOfParams;
|
int32_t numOfParams;
|
||||||
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
|
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
|
||||||
int64_t *ptsList; // corresponding timestamp array list
|
int64_t *ptsList; // corresponding timestamp array list
|
||||||
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
|
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
|
||||||
|
int32_t offset;
|
||||||
SVariant tag;
|
SVariant tag;
|
||||||
struct SResultRowEntryInfo *resultInfo;
|
struct SResultRowEntryInfo *resultInfo;
|
||||||
SSubsidiaryResInfo subsidiaryRes;
|
SSubsidiaryResInfo subsidiaryRes;
|
||||||
|
|
|
@ -1,17 +1,12 @@
|
||||||
aux_source_directory(src EXECUTOR_SRC)
|
aux_source_directory(src EXECUTOR_SRC)
|
||||||
#add_library(executor ${EXECUTOR_SRC})
|
#add_library(executor ${EXECUTOR_SRC})
|
||||||
|
|
||||||
|
|
||||||
#target_link_libraries(
|
|
||||||
# executor
|
|
||||||
# PRIVATE os util common function parser planner qcom tsdb
|
|
||||||
#)
|
|
||||||
|
|
||||||
add_library(executor STATIC ${EXECUTOR_SRC})
|
add_library(executor STATIC ${EXECUTOR_SRC})
|
||||||
#set_target_properties(executor PROPERTIES
|
#set_target_properties(executor PROPERTIES
|
||||||
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
|
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
|
||||||
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
|
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
|
||||||
# )
|
# )
|
||||||
|
|
||||||
target_link_libraries(executor
|
target_link_libraries(executor
|
||||||
PRIVATE os util common function parser planner qcom vnode scalar nodes
|
PRIVATE os util common function parser planner qcom vnode scalar nodes
|
||||||
)
|
)
|
||||||
|
|
|
@ -385,6 +385,12 @@ typedef struct SExchangeInfo {
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
|
typedef struct SColMatchInfo {
|
||||||
|
int32_t colId;
|
||||||
|
int32_t targetSlotId;
|
||||||
|
bool output;
|
||||||
|
} SColMatchInfo;
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
void* dataReader;
|
void* dataReader;
|
||||||
int32_t numOfBlocks; // extract basic running information.
|
int32_t numOfBlocks; // extract basic running information.
|
||||||
|
@ -497,8 +503,9 @@ typedef struct SAggOperatorInfo {
|
||||||
|
|
||||||
typedef struct SProjectOperatorInfo {
|
typedef struct SProjectOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
|
SAggSupporter aggSup;
|
||||||
SSDataBlock *existDataBlock;
|
SSDataBlock *existDataBlock;
|
||||||
int32_t threshold;
|
SArray *pPseudoColInfo;
|
||||||
SLimit limit;
|
SLimit limit;
|
||||||
int64_t curOffset;
|
int64_t curOffset;
|
||||||
int64_t curOutput;
|
int64_t curOutput;
|
||||||
|
@ -623,13 +630,28 @@ typedef struct SDistinctOperatorInfo {
|
||||||
SHashObj* pSet;
|
SHashObj* pSet;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
bool recordNullVal; // has already record the null value, no need to try again
|
bool recordNullVal; // has already record the null value, no need to try again
|
||||||
int64_t threshold;
|
int64_t threshold; // todo remove it
|
||||||
int64_t outputCapacity;
|
int64_t outputCapacity;// todo remove it
|
||||||
int32_t totalBytes;
|
int32_t totalBytes; // todo remove it
|
||||||
char* buf;
|
char* buf;
|
||||||
SArray* pDistinctDataInfo;
|
SArray* pDistinctDataInfo;
|
||||||
} SDistinctOperatorInfo;
|
} SDistinctOperatorInfo;
|
||||||
|
|
||||||
|
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
||||||
|
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||||
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey);
|
||||||
|
void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
|
||||||
|
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||||
|
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
||||||
|
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
|
||||||
|
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
|
||||||
|
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
|
||||||
|
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
|
||||||
|
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
|
||||||
|
uint64_t* total, SArray* pColList);
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
||||||
int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||||
|
@ -647,12 +669,12 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||||
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
||||||
|
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
||||||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||||
int32_t numOfOutput);
|
|
||||||
|
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,332 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "function.h"
|
||||||
|
#include "tname.h"
|
||||||
|
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
|
#include "executorimpl.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
||||||
|
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||||
|
taosMemoryFreeClear(pInfo->keyBuf);
|
||||||
|
taosArrayDestroy(pInfo->pGroupCols);
|
||||||
|
taosArrayDestroy(pInfo->pGroupColVals);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) {
|
||||||
|
pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
||||||
|
if (pInfo->pGroupColVals == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
|
||||||
|
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||||
|
SColumn* pCol = taosArrayGet(pGroupColList, i);
|
||||||
|
pInfo->groupKeyLen += pCol->bytes;
|
||||||
|
|
||||||
|
struct SGroupKeys key = {0};
|
||||||
|
key.bytes = pCol->bytes;
|
||||||
|
key.type = pCol->type;
|
||||||
|
key.isNull = false;
|
||||||
|
key.pData = taosMemoryCalloc(1, pCol->bytes);
|
||||||
|
if (key.pData == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pInfo->pGroupColVals, &key);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
|
||||||
|
pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize);
|
||||||
|
|
||||||
|
if (pInfo->keyBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex,
|
||||||
|
int32_t numOfGroupCols) {
|
||||||
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||||
|
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||||
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
|
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
|
||||||
|
|
||||||
|
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
|
||||||
|
if (pkey->isNull && isNull) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isNull || pkey->isNull) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
|
int32_t len = varDataLen(val);
|
||||||
|
if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (memcmp(pkey->pData, val, pkey->bytes) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||||
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||||
|
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i);
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||||
|
|
||||||
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
|
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
|
||||||
|
}
|
||||||
|
|
||||||
|
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i);
|
||||||
|
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
|
||||||
|
pkey->isNull = true;
|
||||||
|
} else {
|
||||||
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
|
memcpy(pkey->pData, val, varDataTLen(val));
|
||||||
|
} else {
|
||||||
|
memcpy(pkey->pData, val, pkey->bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
|
||||||
|
ASSERT(pKey != NULL);
|
||||||
|
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||||
|
|
||||||
|
char* isNull = (char*)pKey;
|
||||||
|
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
|
||||||
|
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||||
|
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
|
||||||
|
if (pkey->isNull) {
|
||||||
|
isNull[i] = 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
isNull[i] = 0;
|
||||||
|
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
|
varDataCopy(pStart, pkey->pData);
|
||||||
|
pStart += varDataTLen(pkey->pData);
|
||||||
|
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
|
||||||
|
} else {
|
||||||
|
memcpy(pStart, pkey->pData, pkey->bytes);
|
||||||
|
pStart += pkey->bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*length = (pStart - (char*)pKey);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// assign the group keys or user input constant values if required
|
||||||
|
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
|
||||||
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
if (pCtx[i].functionId == -1) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
|
||||||
|
if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
|
||||||
|
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
char* data = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
|
||||||
|
// set result exists, todo refactor
|
||||||
|
memcpy(dest, data, pColInfoData->info.bytes);
|
||||||
|
pEntryInfo->hasResult = DATA_SET_FLAG;
|
||||||
|
pEntryInfo->numOfRes = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||||
|
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
||||||
|
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
// qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
|
int32_t num = 0;
|
||||||
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
|
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||||
|
if (!pInfo->isInit) {
|
||||||
|
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||||
|
pInfo->isInit = true;
|
||||||
|
num++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols);
|
||||||
|
if (equal) {
|
||||||
|
num++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||||
|
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t rowIndex = j - num;
|
||||||
|
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
|
// assign the group keys or user input constant values if required
|
||||||
|
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||||
|
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||||
|
num = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (num > 0) {
|
||||||
|
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||||
|
int32_t ret =
|
||||||
|
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||||
|
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t rowIndex = pBlock->info.rows - num;
|
||||||
|
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||||
|
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
||||||
|
pInfo->binfo.rowCellInfoOffset);
|
||||||
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
}
|
||||||
|
return pInfo->binfo.pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||||
|
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
||||||
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
|
doHashGroupbyAgg(pOperator, pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
|
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
|
||||||
|
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
// if (!stableQuery) { // finalize include the update of result rows
|
||||||
|
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
|
// } else {
|
||||||
|
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo,
|
||||||
|
// pInfo->binfo.rowCellInfoOffset);
|
||||||
|
// }
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||||
|
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||||
|
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
||||||
|
pInfo->binfo.rowCellInfoOffset);
|
||||||
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pInfo->binfo.pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
|
||||||
|
const STableGroupInfo* pTableGroupInfo) {
|
||||||
|
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pGroupCols = pGroupColList;
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
|
||||||
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
|
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->name = "GroupbyAggOperator";
|
||||||
|
pOperator->blockingOptr = true;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
// pOperator->operatorType = OP_Groupby;
|
||||||
|
pOperator->pExpr = pExprInfo;
|
||||||
|
pOperator->numOfOutput = numOfCols;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->_openFn = operatorDummyOpenFn;
|
||||||
|
pOperator->getNextFn = hashGroupbyAggregate;
|
||||||
|
pOperator->closeFn = destroyGroupbyOperatorInfo;
|
||||||
|
|
||||||
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
|
@ -0,0 +1,865 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "filter.h"
|
||||||
|
#include "function.h"
|
||||||
|
#include "os.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
#include "tname.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
|
#include "executorimpl.h"
|
||||||
|
#include "query.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||||
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
|
||||||
|
void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
SWITCH_ORDER(pCtx[i].order);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
|
||||||
|
#if 0
|
||||||
|
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
|
||||||
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
|
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
|
||||||
|
SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
|
||||||
|
|
||||||
|
size_t t = taosArrayGetSize(group);
|
||||||
|
for (int32_t j = 0; j < t; ++j) {
|
||||||
|
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||||
|
updateTableQueryInfoForReverseScan(pCheckInfo);
|
||||||
|
|
||||||
|
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
|
||||||
|
// the start check timestamp of tsdbQueryHandle
|
||||||
|
// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
|
||||||
|
// pTableKeyInfo->lastKey = pCheckInfo->lastKey;
|
||||||
|
//
|
||||||
|
// assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||||
|
|
||||||
|
pCost->totalBlocks += 1;
|
||||||
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
|
||||||
|
pCost->totalCheckedRows += pBlock->info.rows;
|
||||||
|
pCost->loadBlocks += 1;
|
||||||
|
|
||||||
|
*status = BLK_DATA_ALL_NEEDED;
|
||||||
|
|
||||||
|
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||||
|
if (pCols == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||||
|
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
||||||
|
if (!pColMatchInfo->output) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||||
|
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
|
SFilterInfo* filter = NULL;
|
||||||
|
int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0);
|
||||||
|
|
||||||
|
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
|
||||||
|
code = filterSetDataFromSlotId(filter, ¶m1);
|
||||||
|
|
||||||
|
int8_t* rowRes = NULL;
|
||||||
|
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
||||||
|
|
||||||
|
SSDataBlock* px = createOneDataBlock(pBlock);
|
||||||
|
blockDataEnsureCapacity(px, pBlock->info.rows);
|
||||||
|
|
||||||
|
int32_t numOfRow = 0;
|
||||||
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
numOfRow = 0;
|
||||||
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
|
if (rowRes[j] == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
||||||
|
numOfRow += 1;
|
||||||
|
}
|
||||||
|
*pSrc = *pDst;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows = numOfRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
|
// reverse order time range
|
||||||
|
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
|
||||||
|
|
||||||
|
switchCtxOrder(pCtx, numOfOutput);
|
||||||
|
SWITCH_ORDER(pTableScanInfo->order);
|
||||||
|
setupQueryRangeForReverseScan(pTableScanInfo);
|
||||||
|
|
||||||
|
pTableScanInfo->times = 1;
|
||||||
|
pTableScanInfo->current = 0;
|
||||||
|
pTableScanInfo->reverseTimes = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = &pTableScanInfo->block;
|
||||||
|
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
|
||||||
|
|
||||||
|
*newgroup = false;
|
||||||
|
|
||||||
|
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableScanInfo->numOfBlocks += 1;
|
||||||
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
||||||
|
|
||||||
|
// todo opt
|
||||||
|
// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
|
||||||
|
// STableQueryInfo** pTableQueryInfo =
|
||||||
|
// (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
||||||
|
// if (pTableQueryInfo == NULL) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// this function never returns error?
|
||||||
|
uint32_t status = BLK_DATA_ALL_NEEDED;
|
||||||
|
int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
|
||||||
|
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// current block is ignored according to filter result by block statistics data, continue load the next block
|
||||||
|
if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
// The read handle is not initialized yet, since no qualified tables exists
|
||||||
|
if (pTableScanInfo->dataReader == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
|
||||||
|
*newgroup = false;
|
||||||
|
|
||||||
|
while (pTableScanInfo->current < pTableScanInfo->times) {
|
||||||
|
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
||||||
|
if (p != NULL) {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (++pTableScanInfo->current >= pTableScanInfo->times) {
|
||||||
|
if (pTableScanInfo->reverseTimes <= 0 /* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/) {
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// do prepare for the next round table scan operation
|
||||||
|
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
|
||||||
|
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
|
||||||
|
|
||||||
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
|
if (pResultRowInfo->size > 0) {
|
||||||
|
pResultRowInfo->curPos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
|
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* p = NULL;
|
||||||
|
// todo refactor
|
||||||
|
if (pTableScanInfo->reverseTimes > 0) {
|
||||||
|
setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
||||||
|
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
|
||||||
|
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
|
||||||
|
|
||||||
|
qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
|
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||||
|
|
||||||
|
if (pResultRowInfo->size > 0) {
|
||||||
|
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
p = doTableScanImpl(pOperator, newgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
||||||
|
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
|
||||||
|
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||||
|
assert(repeatTime > 0);
|
||||||
|
|
||||||
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
|
||||||
|
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
|
||||||
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
SColumnInfoData idata = {0};
|
||||||
|
taosArrayPush(pInfo->block.pDataBlock, &idata);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pFilterNode = pCondition;
|
||||||
|
pInfo->dataReader = pTsdbReadHandle;
|
||||||
|
pInfo->times = repeatTime;
|
||||||
|
pInfo->reverseTimes = reverseTime;
|
||||||
|
pInfo->order = order;
|
||||||
|
pInfo->current = 0;
|
||||||
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
|
pInfo->pColMatchInfo = pColMatchInfo;
|
||||||
|
pOperator->name = "TableScanOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
|
pOperator->blockingOptr = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->numOfOutput = numOfOutput;
|
||||||
|
pOperator->getNextFn = doTableScan;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
|
||||||
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
|
|
||||||
|
pInfo->dataReader = pTsdbReadHandle;
|
||||||
|
pInfo->times = 1;
|
||||||
|
pInfo->reverseTimes = 0;
|
||||||
|
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||||
|
pInfo->current = 0;
|
||||||
|
pInfo->prevGroupId = -1;
|
||||||
|
pRuntimeEnv->enableGroupData = true;
|
||||||
|
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
pOperator->name = "TableSeqScanOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
||||||
|
pOperator->blockingOptr = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
|
||||||
|
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||||
|
pOperator->getNextFn = doTableScanImpl;
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
*newgroup = false;
|
||||||
|
|
||||||
|
STableBlockDistInfo tableBlockDist = {0};
|
||||||
|
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
|
||||||
|
|
||||||
|
int32_t numRowSteps = TSDB_DEFAULT_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
|
||||||
|
if (TSDB_DEFAULT_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
|
||||||
|
++numRowSteps;
|
||||||
|
}
|
||||||
|
|
||||||
|
tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
|
||||||
|
taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);
|
||||||
|
|
||||||
|
tableBlockDist.maxRows = INT_MIN;
|
||||||
|
tableBlockDist.minRows = INT_MAX;
|
||||||
|
|
||||||
|
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
|
||||||
|
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = &pTableScanInfo->block;
|
||||||
|
pBlock->info.rows = 1;
|
||||||
|
pBlock->info.numOfCols = 1;
|
||||||
|
|
||||||
|
// SBufferWriter bw = tbufInitWriter(NULL, false);
|
||||||
|
// blockDistInfoToBinary(&tableBlockDist, &bw);
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
|
||||||
|
// int32_t len = (int32_t) tbufTell(&bw);
|
||||||
|
// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
|
||||||
|
// *(int32_t*) pColInfo->pData = len;
|
||||||
|
// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
|
||||||
|
//
|
||||||
|
// tbufCloseWriter(&bw);
|
||||||
|
|
||||||
|
// SArray* g = GET_TABLEGROUP(pOperator->, 0);
|
||||||
|
// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);
|
||||||
|
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
|
||||||
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->dataReader = dataReader;
|
||||||
|
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
|
SColumnInfoData infoData = {0};
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
infoData.info.bytes = 1024;
|
||||||
|
infoData.info.colId = 0;
|
||||||
|
taosArrayPush(pInfo->block.pDataBlock, &infoData);
|
||||||
|
|
||||||
|
pOperator->name = "DataBlockInfoScanOperator";
|
||||||
|
// pOperator->operatorType = OP_TableBlockInfoScan;
|
||||||
|
pOperator->blockingOptr = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->_openFn = operatorDummyOpenFn;
|
||||||
|
pOperator->getNextFn = doBlockInfoScan;
|
||||||
|
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
|
||||||
|
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
|
|
||||||
|
pInfo->validBlockIndex = 0;
|
||||||
|
for (int32_t i = 0; i < total; ++i) {
|
||||||
|
SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i);
|
||||||
|
blockDataDestroy(p);
|
||||||
|
}
|
||||||
|
taosArrayClear(pInfo->pBlockLists);
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
// NOTE: this operator does never check if current status is done or not
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||||
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
|
||||||
|
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
|
if (pInfo->validBlockIndex >= total) {
|
||||||
|
doClearBufferedBlocks(pInfo);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t current = pInfo->validBlockIndex++;
|
||||||
|
return taosArrayGetP(pInfo->pBlockLists, current);
|
||||||
|
} else {
|
||||||
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
|
while (tqNextDataBlock(pInfo->readerHandle)) {
|
||||||
|
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
|
||||||
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = pTaskInfo->code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlockInfo->rows == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle);
|
||||||
|
|
||||||
|
int32_t numOfCols = pInfo->pRes->info.numOfCols;
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||||
|
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
|
||||||
|
if (!pColMatchInfo->output) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||||
|
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pRes->pDataBlock == NULL) {
|
||||||
|
// TODO add log
|
||||||
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// record the scan action.
|
||||||
|
pInfo->numOfExec++;
|
||||||
|
pInfo->numOfRows += pBlockInfo->rows;
|
||||||
|
|
||||||
|
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
|
||||||
|
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfOutput = taosArrayGetSize(pColList);
|
||||||
|
|
||||||
|
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
|
||||||
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
int16_t* id = taosArrayGet(pColList, i);
|
||||||
|
taosArrayPush(pColIds, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pColMatchInfo = pColList;
|
||||||
|
|
||||||
|
// set the extract column id to streamHandle
|
||||||
|
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
|
||||||
|
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
if (pInfo->pBlockLists == NULL) {
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->readerHandle = streamReadHandle;
|
||||||
|
pInfo->pRes = pResBlock;
|
||||||
|
|
||||||
|
pOperator->name = "StreamBlockScanOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||||
|
pOperator->blockingOptr = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||||
|
pOperator->_openFn = operatorDummyOpenFn;
|
||||||
|
pOperator->getNextFn = doStreamBlockScan;
|
||||||
|
pOperator->closeFn = operatorDummyCloseFn;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destroySysScanOperator(void* param, int32_t numOfOutput) {
|
||||||
|
SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
|
||||||
|
tsem_destroy(&pInfo->ready);
|
||||||
|
blockDataDestroy(pInfo->pRes);
|
||||||
|
|
||||||
|
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
ENodeType nType = nodeType(pNode);
|
||||||
|
|
||||||
|
switch (nType) {
|
||||||
|
case QUERY_NODE_OPERATOR: {
|
||||||
|
SOperatorNode* node = (SOperatorNode*)pNode;
|
||||||
|
|
||||||
|
if (OP_TYPE_EQUAL == node->opType) {
|
||||||
|
*(int32_t*)pContext = 1;
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
*(int32_t*)pContext = 0;
|
||||||
|
|
||||||
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
|
}
|
||||||
|
case QUERY_NODE_COLUMN: {
|
||||||
|
if (1 != *(int32_t*)pContext) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnNode* node = (SColumnNode*)pNode;
|
||||||
|
if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) {
|
||||||
|
*(int32_t*)pContext = 2;
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
*(int32_t*)pContext = 0;
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
case QUERY_NODE_VALUE: {
|
||||||
|
if (2 != *(int32_t*)pContext) {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SValueNode* node = (SValueNode*)pNode;
|
||||||
|
char* dbName = nodesGetValueFromNode(node);
|
||||||
|
strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
|
||||||
|
*((char*)pContext + varDataLen(dbName)) = 0;
|
||||||
|
return DEAL_RES_ERROR; // stop walk
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
void getDBNameFromCondition(SNode* pCondition, char* dbName) {
|
||||||
|
if (NULL == pCondition) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, dbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
SOperatorInfo* operator=(SOperatorInfo*) param;
|
||||||
|
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pScanResInfo->pRsp = pMsg->pData;
|
||||||
|
|
||||||
|
SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
|
||||||
|
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
||||||
|
pRsp->useconds = htobe64(pRsp->useconds);
|
||||||
|
pRsp->handle = htobe64(pRsp->handle);
|
||||||
|
pRsp->compLen = htonl(pRsp->compLen);
|
||||||
|
} else {
|
||||||
|
operator->pTaskInfo->code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsem_post(&pScanResInfo->ready);
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
|
if (pInfo->pCondition == NULL) {
|
||||||
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SFilterInfo* filter = NULL;
|
||||||
|
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
|
||||||
|
|
||||||
|
SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
|
||||||
|
code = filterSetDataFromSlotId(filter, ¶m1);
|
||||||
|
|
||||||
|
int8_t* rowRes = NULL;
|
||||||
|
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
|
||||||
|
|
||||||
|
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
|
||||||
|
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
|
||||||
|
|
||||||
|
// TODO refactor
|
||||||
|
int32_t numOfRow = 0;
|
||||||
|
for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
|
||||||
|
|
||||||
|
numOfRow = 0;
|
||||||
|
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
|
||||||
|
if (rowRes[j] == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
|
||||||
|
numOfRow += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
px->info.rows = numOfRow;
|
||||||
|
pInfo->pRes = px;
|
||||||
|
|
||||||
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
// build message and send to mnode to fetch the content of system tables.
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
// retrieve local table list info from vnode
|
||||||
|
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
|
||||||
|
if (pInfo->pCur == NULL) {
|
||||||
|
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
|
int32_t tableNameSlotId = 1;
|
||||||
|
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId);
|
||||||
|
|
||||||
|
char* name = NULL;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
|
char n[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
|
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) {
|
||||||
|
STR_TO_VARSTR(n, name);
|
||||||
|
colDataAppend(pTableNameCol, numOfRows, n, false);
|
||||||
|
numOfRows += 1;
|
||||||
|
if (numOfRows >= pInfo->capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
|
||||||
|
if (i == tableNameSlotId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
|
||||||
|
int64_t tmp = 0;
|
||||||
|
char t[10] = {0};
|
||||||
|
STR_TO_VARSTR(t, "_"); //TODO
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
colDataAppend(pColInfoData, numOfRows, t, false);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&tmp, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->loadInfo.totalRows += numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
// pInfo->elapsedTime;
|
||||||
|
// pInfo->totalBytes;
|
||||||
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
} else { // load the meta from mnode of the given epset
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
|
pInfo->req.type = pInfo->type;
|
||||||
|
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
|
||||||
|
if (pInfo->showRewrite) {
|
||||||
|
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
getDBNameFromCondition(pInfo->pCondition, dbName);
|
||||||
|
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||||
|
char* buf1 = taosMemoryCalloc(1, contLen);
|
||||||
|
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||||
|
|
||||||
|
// send the fetch remote task result reques
|
||||||
|
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
|
if (NULL == pMsgSendInfo) {
|
||||||
|
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||||
|
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMsgSendInfo->param = pOperator;
|
||||||
|
pMsgSendInfo->msgInfo.pData = buf1;
|
||||||
|
pMsgSendInfo->msgInfo.len = contLen;
|
||||||
|
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
|
||||||
|
pMsgSendInfo->fp = loadSysTableContentCb;
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
||||||
|
tsem_wait(&pInfo->ready);
|
||||||
|
|
||||||
|
if (pTaskInfo->code) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
||||||
|
pInfo->req.showId = pRsp->handle;
|
||||||
|
|
||||||
|
if (pRsp->numOfRows == 0 || pRsp->completed) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp->numOfRows == 0) {
|
||||||
|
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64"
|
||||||
|
// try next",
|
||||||
|
// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||||
|
// pDataInfo->totalRows, pExchangeInfo->totalRows);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
|
||||||
|
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen,
|
||||||
|
pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
|
||||||
|
|
||||||
|
return doFilterResult(pInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
||||||
|
SNode* pCondition, SEpSet epset, SArray* colList,
|
||||||
|
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
|
||||||
|
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->accountId = accountId;
|
||||||
|
pInfo->showRewrite = showRewrite;
|
||||||
|
pInfo->pRes = pResBlock;
|
||||||
|
pInfo->capacity = 4096;
|
||||||
|
pInfo->pCondition = pCondition;
|
||||||
|
pInfo->scanCols = colList;
|
||||||
|
|
||||||
|
// TODO remove it
|
||||||
|
int32_t tableType = 0;
|
||||||
|
const char* name = tNameGetTableName(pName);
|
||||||
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_DB;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_USER;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_DNODE;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_MNODE;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_MODULE;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_QNODE;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_FUNC;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) {
|
||||||
|
// tableType = TSDB_MGMT_TABLE_INDEX;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_STB;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_STREAMS;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_TABLE;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) {
|
||||||
|
tableType = TSDB_MGMT_TABLE_VGROUP;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) {
|
||||||
|
// tableType = TSDB_MGMT_TABLE_DIST;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
tNameAssign(&pInfo->name, pName);
|
||||||
|
pInfo->type = tableType;
|
||||||
|
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
|
||||||
|
pInfo->readHandle = pSysTableReadHandle;
|
||||||
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity);
|
||||||
|
} else {
|
||||||
|
tsem_init(&pInfo->ready, 0, 0);
|
||||||
|
pInfo->epSet = epset;
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
{ // todo refactor
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "DB-META";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = qProcessFetchRsp;
|
||||||
|
rpcInit.sessions = tsMaxConnections;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.user = (char*)"root";
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
rpcInit.ckey = "key";
|
||||||
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6";
|
||||||
|
|
||||||
|
pInfo->pTransporter = rpcOpen(&rpcInit);
|
||||||
|
if (pInfo->pTransporter == NULL) {
|
||||||
|
return NULL; // todo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->name = "SysTableScanOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
|
||||||
|
pOperator->blockingOptr = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||||
|
pOperator->getNextFn = doSysTableScan;
|
||||||
|
pOperator->closeFn = destroySysScanOperator;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
}
|
|
@ -26,31 +26,34 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
void functionFinalize(SqlFunctionCtx *pCtx);
|
void functionFinalize(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void countFunction(SqlFunctionCtx *pCtx);
|
int32_t countFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void sumFunction(SqlFunctionCtx *pCtx);
|
int32_t sumFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void minFunction(SqlFunctionCtx* pCtx);
|
int32_t minFunction(SqlFunctionCtx* pCtx);
|
||||||
void maxFunction(SqlFunctionCtx *pCtx);
|
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
void stddevFunction(SqlFunctionCtx* pCtx);
|
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||||
void stddevFinalize(SqlFunctionCtx* pCtx);
|
void stddevFinalize(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
void percentileFunction(SqlFunctionCtx *pCtx);
|
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||||
|
void percentileFinalize(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
|
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
|
int32_t diffFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
void firstFunction(SqlFunctionCtx *pCtx);
|
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
||||||
void lastFunction(SqlFunctionCtx *pCtx);
|
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
void valFunction(SqlFunctionCtx *pCtx);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,8 +52,6 @@ typedef struct SInterpInfoDetail {
|
||||||
int8_t primaryCol;
|
int8_t primaryCol;
|
||||||
} SInterpInfoDetail;
|
} SInterpInfoDetail;
|
||||||
|
|
||||||
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
|
|
||||||
|
|
||||||
typedef struct STwaInfo {
|
typedef struct STwaInfo {
|
||||||
int8_t hasResult; // flag to denote has value
|
int8_t hasResult; // flag to denote has value
|
||||||
double dOutput;
|
double dOutput;
|
||||||
|
|
|
@ -63,74 +63,74 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "stddev",
|
.name = "stddev",
|
||||||
.type = FUNCTION_TYPE_STDDEV,
|
.type = FUNCTION_TYPE_STDDEV,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getStddevFuncEnv,
|
.getEnvFunc = getStddevFuncEnv,
|
||||||
.initFunc = stddevFunctionSetup,
|
.initFunc = stddevFunctionSetup,
|
||||||
.processFunc = stddevFunction,
|
.processFunc = stddevFunction,
|
||||||
.finalizeFunc = stddevFinalize
|
.finalizeFunc = stddevFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "percentile",
|
.name = "percentile",
|
||||||
.type = FUNCTION_TYPE_PERCENTILE,
|
.type = FUNCTION_TYPE_PERCENTILE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getPercentileFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = percentileFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = percentileFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = percentileFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "apercentile",
|
.name = "apercentile",
|
||||||
.type = FUNCTION_TYPE_APERCENTILE,
|
.type = FUNCTION_TYPE_APERCENTILE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "top",
|
.name = "top",
|
||||||
.type = FUNCTION_TYPE_TOP,
|
.type = FUNCTION_TYPE_TOP,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "bottom",
|
.name = "bottom",
|
||||||
.type = FUNCTION_TYPE_BOTTOM,
|
.type = FUNCTION_TYPE_BOTTOM,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "spread",
|
.name = "spread",
|
||||||
.type = FUNCTION_TYPE_SPREAD,
|
.type = FUNCTION_TYPE_SPREAD,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "last_row",
|
.name = "last_row",
|
||||||
.type = FUNCTION_TYPE_LAST_ROW,
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "first",
|
.name = "first",
|
||||||
|
@ -152,6 +152,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = lastFunction,
|
.processFunc = lastFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "diff",
|
||||||
|
.type = FUNCTION_TYPE_DIFF,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = getDiffFuncEnv,
|
||||||
|
.initFunc = diffFunctionSetup,
|
||||||
|
.processFunc = diffFunction,
|
||||||
|
.finalizeFunc = functionFinalize
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
@ -377,7 +387,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.type = FUNCTION_TYPE_ROWTS,
|
.type = FUNCTION_TYPE_ROWTS,
|
||||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = getTimePseudoFuncEnv,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = NULL,
|
.sprocessFunc = NULL,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
|
@ -459,9 +469,11 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun
|
||||||
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
switch(pFunc->funcType) {
|
switch(pFunc->funcType) {
|
||||||
case FUNCTION_TYPE_WDURATION:
|
case FUNCTION_TYPE_WDURATION:
|
||||||
case FUNCTION_TYPE_COUNT:
|
case FUNCTION_TYPE_COUNT: {
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case FUNCTION_TYPE_SUM: {
|
case FUNCTION_TYPE_SUM: {
|
||||||
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
int32_t paraType = pParam->node.resType.type;
|
int32_t paraType = pParam->node.resType.type;
|
||||||
|
@ -480,6 +492,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case FUNCTION_TYPE_DIFF:
|
||||||
case FUNCTION_TYPE_FIRST:
|
case FUNCTION_TYPE_FIRST:
|
||||||
case FUNCTION_TYPE_LAST:
|
case FUNCTION_TYPE_LAST:
|
||||||
case FUNCTION_TYPE_MIN:
|
case FUNCTION_TYPE_MIN:
|
||||||
|
@ -490,10 +504,11 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case FUNCTION_TYPE_QENDTS:
|
case FUNCTION_TYPE_ROWTS:
|
||||||
case FUNCTION_TYPE_QSTARTTS:
|
case FUNCTION_TYPE_QSTARTTS:
|
||||||
case FUNCTION_TYPE_WENDTS:
|
case FUNCTION_TYPE_QENDTS:
|
||||||
case FUNCTION_TYPE_WSTARTTS: {
|
case FUNCTION_TYPE_WSTARTTS:
|
||||||
|
case FUNCTION_TYPE_WENDTS:{
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -508,6 +523,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case FUNCTION_TYPE_PERCENTILE:
|
||||||
case FUNCTION_TYPE_STDDEV:
|
case FUNCTION_TYPE_STDDEV:
|
||||||
case FUNCTION_TYPE_SIN:
|
case FUNCTION_TYPE_SIN:
|
||||||
case FUNCTION_TYPE_COS:
|
case FUNCTION_TYPE_COS:
|
||||||
|
@ -574,7 +590,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case FUNCTION_TYPE_ROWTS:
|
|
||||||
case FUNCTION_TYPE_TBNAME: {
|
case FUNCTION_TYPE_TBNAME: {
|
||||||
// todo
|
// todo
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -65,7 +65,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
* count function does need the finalize, if data is missing, the default value, which is 0, is used
|
* count function does need the finalize, if data is missing, the default value, which is 0, is used
|
||||||
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
|
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
|
||||||
*/
|
*/
|
||||||
void countFunction(SqlFunctionCtx *pCtx) {
|
int32_t countFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t numOfElem = 0;
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -111,7 +111,7 @@ void countFunction(SqlFunctionCtx *pCtx) {
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
void sumFunction(SqlFunctionCtx *pCtx) {
|
int32_t sumFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t numOfElem = 0;
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
// Only the pre-computing information loaded and actual data does not loaded
|
// Only the pre-computing information loaded and actual data does not loaded
|
||||||
|
@ -432,12 +432,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
|
||||||
void minFunction(SqlFunctionCtx *pCtx) {
|
int32_t minFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
|
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
|
||||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void maxFunction(SqlFunctionCtx *pCtx) {
|
int32_t maxFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
|
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
|
||||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
}
|
}
|
||||||
|
@ -475,12 +475,11 @@ bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void stddevFunction(SqlFunctionCtx* pCtx) {
|
int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElem = 0;
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
// Only the pre-computing information loaded and actual data does not loaded
|
// Only the pre-computing information loaded and actual data does not loaded
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
@ -601,6 +600,7 @@ void stddevFinalize(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SPercentileInfo {
|
typedef struct SPercentileInfo {
|
||||||
|
double result;
|
||||||
tMemBucket *pMemBucket;
|
tMemBucket *pMemBucket;
|
||||||
int32_t stage;
|
int32_t stage;
|
||||||
double minval;
|
double minval;
|
||||||
|
@ -627,19 +627,24 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void percentileFunction(SqlFunctionCtx *pCtx) {
|
int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t notNullElems = 0;
|
int32_t notNullElems = 0;
|
||||||
#if 0
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
|
||||||
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||||
|
|
||||||
|
SColumnInfoData *pCol = pInput->pData[0];
|
||||||
|
int32_t type = pCol->info.type;
|
||||||
|
|
||||||
|
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
|
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
|
||||||
pInfo->stage += 1;
|
pInfo->stage += 1;
|
||||||
|
|
||||||
// all data are null, set it completed
|
// all data are null, set it completed
|
||||||
if (pInfo->numOfElems == 0) {
|
if (pInfo->numOfElems == 0) {
|
||||||
pResInfo->complete = true;
|
pResInfo->complete = true;
|
||||||
return;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
|
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
|
||||||
}
|
}
|
||||||
|
@ -647,19 +652,17 @@ void percentileFunction(SqlFunctionCtx *pCtx) {
|
||||||
|
|
||||||
// the first stage, only acquire the min/max value
|
// the first stage, only acquire the min/max value
|
||||||
if (pInfo->stage == 0) {
|
if (pInfo->stage == 0) {
|
||||||
if (pCtx->preAggVals.isSet) {
|
if (pCtx->input.colDataAggIsSet) {
|
||||||
double tmin = 0.0, tmax = 0.0;
|
double tmin = 0.0, tmax = 0.0;
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min);
|
tmin = (double)GET_INT64_VAL(&pAgg->min);
|
||||||
tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max);
|
tmax = (double)GET_INT64_VAL(&pAgg->max);
|
||||||
} else if (IS_FLOAT_TYPE(pCtx->inputType)) {
|
} else if (IS_FLOAT_TYPE(type)) {
|
||||||
tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min);
|
tmin = GET_DOUBLE_VAL(&pAgg->min);
|
||||||
tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max);
|
tmax = GET_DOUBLE_VAL(&pAgg->max);
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
tmin = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.min);
|
tmin = (double)GET_UINT64_VAL(&pAgg->min);
|
||||||
tmax = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.max);
|
tmax = (double)GET_UINT64_VAL(&pAgg->max);
|
||||||
} else {
|
|
||||||
assert(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
|
if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
|
||||||
|
@ -670,17 +673,19 @@ void percentileFunction(SqlFunctionCtx *pCtx) {
|
||||||
SET_DOUBLE_VAL(&pInfo->maxval, tmax);
|
SET_DOUBLE_VAL(&pInfo->maxval, tmax);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
|
pInfo->numOfElems += (pInput->numOfRows - pAgg->numOfNull);
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
// check the valid data one by one
|
||||||
char *data = GET_INPUT_DATA(pCtx, i);
|
int32_t start = pInput->startRowIndex;
|
||||||
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
double v = 0;
|
double v = 0;
|
||||||
GET_TYPED_DATA(v, double, pCtx->inputType, data);
|
GET_TYPED_DATA(v, double, pCtx->inputType, data);
|
||||||
|
|
||||||
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
|
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
|
||||||
SET_DOUBLE_VAL(&pInfo->minval, v);
|
SET_DOUBLE_VAL(&pInfo->minval, v);
|
||||||
}
|
}
|
||||||
|
@ -693,24 +698,40 @@ void percentileFunction(SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the second stage, calculate the true percentile value
|
// the second stage, calculate the true percentile value
|
||||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
int32_t start = pInput->startRowIndex;
|
||||||
char *data = GET_INPUT_DATA(pCtx, i);
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
notNullElems += 1;
|
notNullElems += 1;
|
||||||
tMemBucketPut(pInfo->pMemBucket, data, 1);
|
tMemBucketPut(pInfo->pMemBucket, data, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(pCtx, notNullElems, 1);
|
SET_VAL(pResInfo, notNullElems, 1);
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
pResInfo->hasResult = DATA_SET_FLAG;
|
||||||
#endif
|
}
|
||||||
|
|
||||||
|
// TODO set the correct parameter.
|
||||||
|
void percentileFinalize(SqlFunctionCtx* pCtx) {
|
||||||
|
double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey;
|
||||||
|
|
||||||
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
tMemBucket * pMemBucket = ppInfo->pMemBucket;
|
||||||
|
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
||||||
|
SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
|
||||||
|
}
|
||||||
|
|
||||||
|
tMemBucketDestroy(pMemBucket);
|
||||||
|
functionFinalize(pCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
@ -721,9 +742,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
|
||||||
// TODO fix this
|
// TODO fix this
|
||||||
// This ordinary first function only handle the data block in ascending order
|
// This ordinary first function only handle the data block in ascending order
|
||||||
void firstFunction(SqlFunctionCtx *pCtx) {
|
int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
||||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
@ -737,7 +758,7 @@ void firstFunction(SqlFunctionCtx *pCtx) {
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
||||||
ASSERT(pInputCol->hasNull == true);
|
ASSERT(pInputCol->hasNull == true);
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for the first not null data
|
// Check for the first not null data
|
||||||
|
@ -764,9 +785,9 @@ void firstFunction(SqlFunctionCtx *pCtx) {
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void lastFunction(SqlFunctionCtx *pCtx) {
|
int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
||||||
if (pCtx->order != TSDB_ORDER_DESC) {
|
if (pCtx->order != TSDB_ORDER_DESC) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
@ -775,13 +796,12 @@ void lastFunction(SqlFunctionCtx *pCtx) {
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
|
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
|
||||||
ASSERT(pInputCol->hasNull == true);
|
ASSERT(pInputCol->hasNull == true);
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||||
|
@ -826,10 +846,242 @@ void lastFunction(SqlFunctionCtx *pCtx) {
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void valFunction(SqlFunctionCtx *pCtx) {
|
typedef struct SDiffInfo {
|
||||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
bool hasPrev;
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
bool includeNull;
|
||||||
|
bool ignoreNegative;
|
||||||
|
bool firstOutput;
|
||||||
|
union { int64_t i64; double d64;} prev;
|
||||||
|
} SDiffInfo;
|
||||||
|
|
||||||
SColumnInfoData* pInputCol = pCtx->input.pData[0];
|
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
memcpy(buf, pInputCol->pData, pInputCol->info.bytes);
|
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
pDiffInfo->hasPrev = false;
|
||||||
|
pDiffInfo->prev.i64 = 0;
|
||||||
|
pDiffInfo->ignoreNegative = false; // TODO set correct param
|
||||||
|
pDiffInfo->includeNull = false;
|
||||||
|
pDiffInfo->firstOutput = false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
||||||
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SDiffInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
|
bool isFirstBlock = (pDiffInfo->hasPrev == false);
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||||
|
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
||||||
|
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
int32_t startOffset = pCtx->offset;
|
||||||
|
switch (pInputCol->info.type) {
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
|
||||||
|
|
||||||
|
int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems);
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
if (pDiffInfo->includeNull) {
|
||||||
|
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||||
|
if (tsList != NULL) {
|
||||||
|
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t v = *(int32_t*) colDataGetData(pInputCol, i);
|
||||||
|
if (pDiffInfo->hasPrev) {
|
||||||
|
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||||
|
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||||
|
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||||
|
} else {
|
||||||
|
colDataAppendInt32(pOutput, pos, &delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsList != NULL) {
|
||||||
|
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pDiffInfo->prev.i64 = v;
|
||||||
|
pDiffInfo->hasPrev = true;
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t v = 0;
|
||||||
|
if (pDiffInfo->hasPrev) {
|
||||||
|
v = *(int64_t*) colDataGetData(pInputCol, i);
|
||||||
|
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||||
|
if (pDiffInfo->ignoreNegative) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// *(pOutput++) = delta;
|
||||||
|
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||||
|
//
|
||||||
|
// pOutput += 1;
|
||||||
|
// pTimestamp += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDiffInfo->prev.i64 = v;
|
||||||
|
pDiffInfo->hasPrev = true;
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
#if 0
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double *pData = (double *)data;
|
||||||
|
double *pOutput = (double *)pCtx->pOutput;
|
||||||
|
|
||||||
|
for (; i < pCtx->size && i >= 0; i += step) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||||
|
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
|
||||||
|
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||||
|
pOutput += 1;
|
||||||
|
pTimestamp += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDiffInfo->d64Prev = pData[i];
|
||||||
|
pDiffInfo->hasPrev = true;
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float *pData = (float *)data;
|
||||||
|
float *pOutput = (float *)pCtx->pOutput;
|
||||||
|
|
||||||
|
for (; i < pCtx->size && i >= 0; i += step) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||||
|
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
|
||||||
|
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||||
|
pOutput += 1;
|
||||||
|
pTimestamp += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDiffInfo->d64Prev = pData[i];
|
||||||
|
pDiffInfo->hasPrev = true;
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
int16_t *pData = (int16_t *)data;
|
||||||
|
int16_t *pOutput = (int16_t *)pCtx->pOutput;
|
||||||
|
|
||||||
|
for (; i < pCtx->size && i >= 0; i += step) {
|
||||||
|
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||||
|
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
|
||||||
|
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||||
|
pOutput += 1;
|
||||||
|
pTimestamp += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDiffInfo->i64Prev = pData[i];
|
||||||
|
pDiffInfo->hasPrev = true;
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int8_t *pData = (int8_t *)data;
|
||||||
|
int8_t *pOutput = (int8_t *)pCtx->pOutput;
|
||||||
|
|
||||||
|
for (; i < pCtx->size && i >= 0; i += step) {
|
||||||
|
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||||
|
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
|
||||||
|
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||||
|
pOutput += 1;
|
||||||
|
pTimestamp += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDiffInfo->i64Prev = pData[i];
|
||||||
|
pDiffInfo->hasPrev = true;
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
// qError("error input type");
|
||||||
|
}
|
||||||
|
|
||||||
|
// initial value is not set yet
|
||||||
|
if (!pDiffInfo->hasPrev || numOfElems <= 0) {
|
||||||
|
/*
|
||||||
|
* 1. current block and blocks before are full of null
|
||||||
|
* 2. current block may be null value
|
||||||
|
*/
|
||||||
|
assert(pCtx->hasNull);
|
||||||
|
} else {
|
||||||
|
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
|
||||||
|
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
|
||||||
|
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
|
||||||
|
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
|
||||||
|
return forwardStep;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -116,6 +116,11 @@ bool fmIsWindowClauseFunc(int32_t funcId) {
|
||||||
return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId);
|
return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool fmIsNonstandardSQLFunc(int32_t funcId) {
|
||||||
|
return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void fmFuncMgtDestroy() {
|
void fmFuncMgtDestroy() {
|
||||||
void* m = gFunMgtService.pFuncNameHashTable;
|
void* m = gFunMgtService.pFuncNameHashTable;
|
||||||
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
|
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
|
||||||
|
|
|
@ -1902,10 +1902,10 @@ static void copyTopBotRes(SqlFunctionCtx *pCtx, int32_t type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the output timestamp of each record.
|
// set the output timestamp of each record.
|
||||||
TSKEY *output = pCtx->ptsOutputBuf;
|
// TSKEY *output = pCtx->pTsOutput;
|
||||||
for (int32_t i = 0; i < len; ++i, output += step) {
|
// for (int32_t i = 0; i < len; ++i, output += step) {
|
||||||
*output = tvp[i]->timestamp;
|
// *output = tvp[i]->timestamp;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// set the corresponding tag data for each record
|
// set the corresponding tag data for each record
|
||||||
// todo check malloc failure
|
// todo check malloc failure
|
||||||
|
@ -2687,7 +2687,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||||
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
||||||
|
|
||||||
TSKEY *pTimestamp = pCtx->ptsOutputBuf;
|
TSKEY *pTimestamp = NULL;//pCtx->pTsOutput;
|
||||||
TSKEY *tsList = GET_TS_LIST(pCtx);
|
TSKEY *tsList = GET_TS_LIST(pCtx);
|
||||||
|
|
||||||
double *pOutput = (double *)pCtx->pOutput;
|
double *pOutput = (double *)pCtx->pOutput;
|
||||||
|
@ -2867,7 +2867,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
|
||||||
} else { \
|
} else { \
|
||||||
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \
|
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \
|
||||||
*(type *)(&(ctx)->param[1].i) = *(type *)(d); \
|
*(type *)(&(ctx)->param[1].i) = *(type *)(d); \
|
||||||
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
|
*(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -2881,7 +2881,7 @@ static void diff_function(SqlFunctionCtx *pCtx) {
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||||
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
||||||
|
|
||||||
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
|
TSKEY* pTimestamp = NULL;//pCtx->pTsOutput;
|
||||||
TSKEY* tsList = GET_TS_LIST(pCtx);
|
TSKEY* tsList = GET_TS_LIST(pCtx);
|
||||||
|
|
||||||
switch (pCtx->inputType) {
|
switch (pCtx->inputType) {
|
||||||
|
|
|
@ -687,6 +687,7 @@ int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
|
return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
|
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
|
||||||
switch(type) {
|
switch(type) {
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
@ -751,6 +752,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
|
||||||
default: assert(0);
|
default: assert(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(int64_t);
|
pEnv->calcMemSize = sizeof(int64_t);
|
||||||
|
|
|
@ -284,14 +284,15 @@ print ====> select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtyp
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
sql_error select * from dev_001 session(ts,1w)
|
print ================> syntax error check not active ================> reactive
|
||||||
sql_error select count(*) from st session(ts,1w)
|
#sql_error select * from dev_001 session(ts,1w)
|
||||||
sql_error select count(*) from dev_001 group by tagtype session(ts,1w)
|
#sql_error select count(*) from st session(ts,1w)
|
||||||
sql_error select count(*) from dev_001 session(ts,1n)
|
#sql_error select count(*) from dev_001 group by tagtype session(ts,1w)
|
||||||
sql_error select count(*) from dev_001 session(ts,1y)
|
#sql_error select count(*) from dev_001 session(ts,1n)
|
||||||
sql_error select count(*) from dev_001 session(ts,0s)
|
#sql_error select count(*) from dev_001 session(ts,1y)
|
||||||
sql_error select count(*) from dev_001 session(i,1y)
|
#sql_error select count(*) from dev_001 session(ts,0s)
|
||||||
sql_error select count(*) from dev_001 session(ts,1d) where ts <'2020-05-20 0:0:0'
|
#sql_error select count(*) from dev_001 session(i,1y)
|
||||||
|
#sql_error select count(*) from dev_001 session(ts,1d) where ts <'2020-05-20 0:0:0'
|
||||||
|
|
||||||
print ====> create database d1 precision 'us'
|
print ====> create database d1 precision 'us'
|
||||||
sql create database d1 precision 'us'
|
sql create database d1 precision 'us'
|
||||||
|
@ -299,17 +300,19 @@ sql use d1
|
||||||
sql create table dev_001 (ts timestamp ,i timestamp ,j int)
|
sql create table dev_001 (ts timestamp ,i timestamp ,j int)
|
||||||
sql insert into dev_001 values(1623046993681000,now,1)(1623046993681001,now+1s,2)(1623046993681002,now+2s,3)(1623046993681004,now+5s,4)
|
sql insert into dev_001 values(1623046993681000,now,1)(1623046993681001,now+1s,2)(1623046993681002,now+2s,3)(1623046993681004,now+5s,4)
|
||||||
print ====> select count(*) from dev_001 session(ts,1u)
|
print ====> select count(*) from dev_001 session(ts,1u)
|
||||||
sql select count(*) from dev_001 session(ts,1u)
|
sql select _wstartts, count(*) from dev_001 session(ts,1u)
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
|
print expect 2, actual: $rows
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data01 != 3 then
|
if $data01 != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql_error select count(*) from dev_001 session(i,1s)
|
|
||||||
sql create table secondts(ts timestamp,t2 timestamp,i int)
|
|
||||||
sql_error select count(*) from secondts session(t2,2s)
|
|
||||||
|
|
||||||
|
#sql_error select count(*) from dev_001 session(i,1s)
|
||||||
|
#sql create table secondts(ts timestamp,t2 timestamp,i int)
|
||||||
|
#sql_error select count(*) from secondts session(t2,2s)
|
||||||
|
|
||||||
if $loop_test == 0 then
|
if $loop_test == 0 then
|
||||||
print =============== stop and restart taosd
|
print =============== stop and restart taosd
|
||||||
|
|
Loading…
Reference in New Issue