refactor(query): do some internal refactor.
This commit is contained in:
parent
978b369e8b
commit
4b36763429
|
@ -37,7 +37,7 @@ typedef struct SFuncExecEnv {
|
||||||
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||||
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock, int32_t slotId);
|
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
typedef struct SScalarFuncExecFuncs {
|
typedef struct SScalarFuncExecFuncs {
|
||||||
|
@ -141,8 +141,7 @@ struct SResultRowEntryInfo;
|
||||||
|
|
||||||
//for selectivity query, the corresponding tag value is assigned if the data is qualified
|
//for selectivity query, the corresponding tag value is assigned if the data is qualified
|
||||||
typedef struct SSubsidiaryResInfo {
|
typedef struct SSubsidiaryResInfo {
|
||||||
int16_t bufLen; // keep the tags data for top/bottom query result
|
int16_t num;
|
||||||
int16_t numOfCols;
|
|
||||||
struct SqlFunctionCtx **pCtx;
|
struct SqlFunctionCtx **pCtx;
|
||||||
} SSubsidiaryResInfo;
|
} SSubsidiaryResInfo;
|
||||||
|
|
||||||
|
@ -187,8 +186,8 @@ typedef struct SqlFunctionCtx {
|
||||||
uint8_t currentStage; // record current running step, default: 0
|
uint8_t currentStage; // record current running step, default: 0
|
||||||
bool isAggSet;
|
bool isAggSet;
|
||||||
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
|
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
|
||||||
/////////////////////////////////////////////////////////////////
|
|
||||||
bool stableQuery;
|
bool stableQuery;
|
||||||
|
/////////////////////////////////////////////////////////////////
|
||||||
int16_t functionId; // function id
|
int16_t functionId; // function id
|
||||||
char * pOutput; // final result output buffer, point to sdata->data
|
char * pOutput; // final result output buffer, point to sdata->data
|
||||||
int32_t numOfParams;
|
int32_t numOfParams;
|
||||||
|
@ -198,11 +197,15 @@ typedef struct SqlFunctionCtx {
|
||||||
int32_t offset;
|
int32_t offset;
|
||||||
SVariant tag;
|
SVariant tag;
|
||||||
struct SResultRowEntryInfo *resultInfo;
|
struct SResultRowEntryInfo *resultInfo;
|
||||||
SSubsidiaryResInfo subsidiaryRes;
|
SSubsidiaryResInfo subsidiaries;
|
||||||
SPoint1 start;
|
SPoint1 start;
|
||||||
SPoint1 end;
|
SPoint1 end;
|
||||||
SFuncExecFuncs fpSet;
|
SFuncExecFuncs fpSet;
|
||||||
SScalarFuncExecFuncs sfp;
|
SScalarFuncExecFuncs sfp;
|
||||||
|
SExprInfo *pExpr;
|
||||||
|
struct SDiskbasedBuf *pBuf;
|
||||||
|
struct SSDataBlock *pSrcBlock;
|
||||||
|
int32_t curBufPage;
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
|
|
@ -106,8 +106,8 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
|
||||||
}
|
}
|
||||||
|
|
||||||
char* key = getClusterKey(user, secretEncrypt, ip, port);
|
char* key = getClusterKey(user, secretEncrypt, ip, port);
|
||||||
SAppInstInfo** pInst = NULL;
|
|
||||||
|
|
||||||
|
SAppInstInfo** pInst = NULL;
|
||||||
taosThreadMutexLock(&appInfo.mutex);
|
taosThreadMutexLock(&appInfo.mutex);
|
||||||
|
|
||||||
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
|
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
|
||||||
|
|
|
@ -1084,6 +1084,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
|
pCtx[i].pSrcBlock = pBlock;
|
||||||
pCtx[i].currentStage = MAIN_SCAN;
|
pCtx[i].currentStage = MAIN_SCAN;
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx[i].input;
|
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||||
|
@ -1836,9 +1837,8 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
p->subsidiaryRes.pCtx = pTagCtx;
|
p->subsidiaries.pCtx = pTagCtx;
|
||||||
p->subsidiaryRes.numOfCols = num;
|
p->subsidiaries.num = num;
|
||||||
p->subsidiaryRes.bufLen = tagLen;
|
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFreeClear(pTagCtx);
|
taosMemoryFreeClear(pTagCtx);
|
||||||
}
|
}
|
||||||
|
@ -1865,6 +1865,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
||||||
|
|
||||||
pCtx->functionId = -1;
|
pCtx->functionId = -1;
|
||||||
|
pCtx->curBufPage = -1;
|
||||||
|
pCtx->pExpr = pExpr;
|
||||||
|
|
||||||
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||||
SFuncExecEnv env = {0};
|
SFuncExecEnv env = {0};
|
||||||
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
|
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
|
||||||
|
@ -1953,7 +1956,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosVariantDestroy(&pCtx[i].tag);
|
taosVariantDestroy(&pCtx[i].tag);
|
||||||
taosMemoryFreeClear(pCtx[i].subsidiaryRes.pCtx);
|
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pCtx);
|
taosMemoryFreeClear(pCtx);
|
||||||
|
@ -3139,7 +3142,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
|
||||||
|
|
||||||
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
if (pCtx[j].fpSet.process) {
|
if (pCtx[j].fpSet.process) {
|
||||||
pCtx[j].fpSet.finalize(&pCtx[j], pBlock, slotId);
|
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
} else {
|
} else {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
@ -5633,6 +5636,11 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
|
||||||
pBasicInfo->pRes = pResultBlock;
|
pBasicInfo->pRes = pResultBlock;
|
||||||
|
|
||||||
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||||
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
@ -43,12 +43,12 @@ int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
|
@ -60,7 +60,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "builtinsimpl.h"
|
#include "builtinsimpl.h"
|
||||||
#include <libs/nodes/querynodes.h>
|
#include "function.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -49,7 +49,8 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
@ -305,8 +306,8 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
|
||||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||||
do { \
|
do { \
|
||||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
|
||||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||||
__ctx->tag.i = (ts); \
|
__ctx->tag.i = (ts); \
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||||
|
@ -378,8 +379,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
int64_t val = GET_INT64_VAL(tval);
|
int64_t val = GET_INT64_VAL(tval);
|
||||||
if ((prev < val) ^ isMinFunc) {
|
if ((prev < val) ^ isMinFunc) {
|
||||||
*(int64_t*) buf = val;
|
*(int64_t*) buf = val;
|
||||||
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
|
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||||
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
|
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||||
__ctx->tag.i = key;
|
__ctx->tag.i = key;
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
@ -395,8 +396,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
uint64_t val = GET_UINT64_VAL(tval);
|
uint64_t val = GET_UINT64_VAL(tval);
|
||||||
if ((prev < val) ^ isMinFunc) {
|
if ((prev < val) ^ isMinFunc) {
|
||||||
*(uint64_t*) buf = val;
|
*(uint64_t*) buf = val;
|
||||||
for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) {
|
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||||
SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i];
|
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||||
__ctx->tag.i = key;
|
__ctx->tag.i = key;
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
@ -618,11 +619,11 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
double avg = pStddevRes->isum / ((double) pStddevRes->count);
|
double avg = pStddevRes->isum / ((double) pStddevRes->count);
|
||||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
||||||
return functionFinalize(pCtx, pBlock, slotId);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SPercentileInfo {
|
typedef struct SPercentileInfo {
|
||||||
|
@ -744,7 +745,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
|
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
|
||||||
|
|
||||||
|
@ -757,7 +758,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t sl
|
||||||
}
|
}
|
||||||
|
|
||||||
tMemBucketDestroy(pMemBucket);
|
tMemBucketDestroy(pMemBucket);
|
||||||
return functionFinalize(pCtx, pBlock, slotId);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
@ -1178,7 +1179,7 @@ typedef struct STopBotResItem {
|
||||||
} STopBotResItem;
|
} STopBotResItem;
|
||||||
|
|
||||||
typedef struct STopBotRes {
|
typedef struct STopBotRes {
|
||||||
int32_t pageId;
|
// int32_t pageId;
|
||||||
// int32_t num;
|
// int32_t num;
|
||||||
STopBotResItem *pItems;
|
STopBotResItem *pItems;
|
||||||
} STopBotRes;
|
} STopBotRes;
|
||||||
|
@ -1197,15 +1198,16 @@ static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||||
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
|
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
|
||||||
|
|
||||||
|
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
||||||
|
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
||||||
|
|
||||||
int32_t topFunction(SqlFunctionCtx *pCtx) {
|
int32_t topFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
|
|
||||||
|
|
||||||
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
||||||
// buildTopBotStruct(pRes, pCtx);
|
// buildTopBotStruct(pRes, pCtx);
|
||||||
// }
|
// }
|
||||||
|
@ -1225,7 +1227,7 @@ int32_t topFunction(SqlFunctionCtx *pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
doAddIntoResult(pRes, pCtx->param[1].param.i, data, i, NULL, type, pInput->uid, pResInfo);
|
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1258,9 +1260,11 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par
|
||||||
return (val1->v.d > val2->v.d) ? 1 : -1;
|
return (val1->v.d > val2->v.d) ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||||
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
|
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
|
||||||
|
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
|
||||||
|
int32_t maxSize = pCtx->param[1].param.i;
|
||||||
|
|
||||||
SVariant val = {0};
|
SVariant val = {0};
|
||||||
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
|
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
|
||||||
|
|
||||||
|
@ -1272,22 +1276,9 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row
|
||||||
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
|
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
|
||||||
pItem->v = val;
|
pItem->v = val;
|
||||||
pItem->uid = uid;
|
pItem->uid = uid;
|
||||||
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
|
|
||||||
|
|
||||||
if (pRes->pageId == -1) {
|
// save the data of this tuple
|
||||||
SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId);
|
saveTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
||||||
pPage->num = sizeof(SFilePage);
|
|
||||||
|
|
||||||
// keep the current row data
|
|
||||||
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
|
||||||
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
|
||||||
|
|
||||||
|
|
||||||
colDataGetData(pCol, rowIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||||
pEntryInfo->numOfRes++;
|
pEntryInfo->numOfRes++;
|
||||||
|
@ -1296,22 +1287,100 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row
|
||||||
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
|
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
|
||||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
|
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
|
||||||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
|
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
|
||||||
|
// replace the old data and the coresponding tuple data
|
||||||
STopBotResItem* pItem = &pItems[0];
|
STopBotResItem* pItem = &pItems[0];
|
||||||
pItem->v = val;
|
pItem->v = val;
|
||||||
pItem->uid = uid;
|
pItem->uid = uid;
|
||||||
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
|
|
||||||
|
// save the data of this tuple by over writing the old data
|
||||||
|
copyTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
||||||
|
|
||||||
taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
|
taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
|
||||||
|
SFilePage* pPage = NULL;
|
||||||
|
|
||||||
|
int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);
|
||||||
|
|
||||||
|
if (pCtx->curBufPage == -1) {
|
||||||
|
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||||
|
pPage->num = sizeof(SFilePage);
|
||||||
|
} else {
|
||||||
|
pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
|
||||||
|
if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
|
||||||
|
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||||
|
pPage->num = sizeof(SFilePage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pItem->tuplePos.pageId = pCtx->curBufPage;
|
||||||
|
|
||||||
|
// keep the current row data, extract method
|
||||||
|
int32_t offset = 0;
|
||||||
|
bool* nullList = (bool*)((char*)pPage + pPage->num);
|
||||||
|
char* pStart = (char*)(nullList + sizeof(bool) * pSrcBlock->info.numOfCols);
|
||||||
|
for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||||
|
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
||||||
|
if (isNull) {
|
||||||
|
nullList[i] = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* p = colDataGetData(pCol, rowIndex);
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
|
memcpy(pStart + offset, p, varDataTLen(p));
|
||||||
|
} else {
|
||||||
|
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += pCol->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
pItem->tuplePos.offset = pPage->num;
|
||||||
|
pPage->num += completeRowSize;
|
||||||
|
|
||||||
|
setBufPageDirty(pPage, true);
|
||||||
|
releaseBufPage(pCtx->pBuf, pPage);
|
||||||
|
}
|
||||||
|
|
||||||
|
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
|
||||||
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);
|
||||||
|
|
||||||
|
bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
|
||||||
|
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
|
||||||
|
|
||||||
|
int32_t offset = 0;
|
||||||
|
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||||
|
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* p = colDataGetData(pCol, rowIndex);
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
|
memcpy(pStart + offset, p, varDataTLen(p));
|
||||||
|
} else {
|
||||||
|
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += pCol->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
setBufPageDirty(pPage, true);
|
||||||
|
releaseBufPage(pCtx->pBuf, pPage);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
pEntryInfo->complete = true;
|
pEntryInfo->complete = true;
|
||||||
|
|
||||||
int32_t type = pCtx->input.pData[0]->info.type;
|
int32_t type = pCtx->input.pData[0]->info.type;
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
// todo assign the tag value and the corresponding row data
|
// todo assign the tag value and the corresponding row data
|
||||||
|
@ -1320,19 +1389,45 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
|
||||||
case TSDB_DATA_TYPE_INT: {
|
case TSDB_DATA_TYPE_INT: {
|
||||||
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||||
STopBotResItem* pItem = &pRes->pItems[i];
|
STopBotResItem* pItem = &pRes->pItems[i];
|
||||||
colDataAppendInt32(pCol, currentRow++, (int32_t*)&pItem->v.i);
|
colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i);
|
||||||
|
|
||||||
int32_t pageId = pItem->tuplePos.pageId;
|
int32_t pageId = pItem->tuplePos.pageId;
|
||||||
int32_t offset = pItem->tuplePos.offset;
|
int32_t offset = pItem->tuplePos.offset;
|
||||||
if (pageId != -1) {
|
if (pItem->tuplePos.pageId != -1) {
|
||||||
// todo
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
||||||
|
|
||||||
|
bool* nullList = (bool*)((char*)pPage + offset);
|
||||||
|
char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
|
||||||
|
|
||||||
|
// todo set the offset value to optimize the performance.
|
||||||
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
|
|
||||||
|
SFunctParam *pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
|
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||||
|
int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
|
int32_t ps = 0;
|
||||||
|
for(int32_t k = 0; k < srcSlotId; ++k) {
|
||||||
|
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
||||||
|
ps += pSrcCol->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
|
if (nullList[srcSlotId]) {
|
||||||
|
colDataAppendNULL(pDstCol, currentRow);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pDstCol, currentRow, (pStart + ps), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currentRow += 1;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pEntryInfo->numOfRes;
|
return pEntryInfo->numOfRes;
|
||||||
|
|
||||||
// return functionFinalize(pCtx, pBlock, slotId);
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue