refactor(query): do some internal refactor.
This commit is contained in:
parent
46a3a58fac
commit
e14142d114
|
@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData {
|
|||
SColumnInfoData *pPTS; // primary timestamp column
|
||||
SColumnInfoData **pData;
|
||||
SColumnDataAgg **pColumnDataAgg;
|
||||
uint64_t uid; // table uid
|
||||
} SInputColumnInfoData;
|
||||
|
||||
// sql function runtime context
|
||||
|
|
|
@ -77,7 +77,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
|
|||
* @return
|
||||
*/
|
||||
void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar,
|
||||
__ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot);
|
||||
__ext_compar_fn_t compar, char* buf, bool maxroot);
|
||||
|
||||
/**
|
||||
* sort heap to make sure it is a max/min root heap
|
||||
|
@ -93,7 +93,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
|
|||
* @return
|
||||
*/
|
||||
void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar,
|
||||
const void *parswap, __ext_swap_fn_t swap, bool maxroot);
|
||||
bool maxroot);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -479,7 +479,7 @@ typedef struct {
|
|||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SArray* pGroupCols;
|
||||
SArray* pGroupCols; // group by columns, SArray<SColumn>
|
||||
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
|
||||
SNode* pCondition;
|
||||
bool isInit; // denote if current val is initialized or not
|
||||
|
|
|
@ -193,6 +193,12 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// pseudo column do not need to check parameters
|
||||
pFunc->node.resType = (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -872,7 +878,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "tbname",
|
||||
.type = FUNCTION_TYPE_TBNAME,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
||||
.translateFunc = NULL,
|
||||
.translateFunc = translateTbnameColumn,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
|
|
|
@ -472,17 +472,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct STopBotRes {
|
||||
int32_t num;
|
||||
} STopBotRes;
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
|
||||
int32_t bytes = pColNode->node.resType.bytes;
|
||||
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
||||
return true;
|
||||
}
|
||||
|
||||
typedef struct SStddevRes {
|
||||
double result;
|
||||
int64_t count;
|
||||
|
@ -523,7 +512,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
for (int32_t i = start; i < numOfRows + start; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
@ -1173,3 +1162,120 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
typedef struct STopBotResItem {
|
||||
SVariant v;
|
||||
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
||||
struct {
|
||||
int32_t pageId;
|
||||
int32_t offset;
|
||||
} tuplePos; // tuple data of this chosen row
|
||||
} STopBotResItem;
|
||||
|
||||
typedef struct STopBotRes {
|
||||
int32_t num;
|
||||
STopBotResItem *pItems;
|
||||
} STopBotRes;
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
|
||||
int32_t bytes = pColNode->node.resType.bytes;
|
||||
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
||||
return true;
|
||||
}
|
||||
|
||||
static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
return GET_ROWCELL_INTERBUF(pResInfo);
|
||||
}
|
||||
|
||||
static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid);
|
||||
|
||||
static void topFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
|
||||
assert(pRes->num >= 0);
|
||||
|
||||
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
||||
// buildTopBotStruct(pRes, pCtx);
|
||||
// }
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
for (int32_t i = start; i < numOfRows + start; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pCol, i);
|
||||
doAddIntoResult(pRes, pCtx->param[0].i, data, type, pInput->uid);
|
||||
}
|
||||
|
||||
// treat the result as only one result
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||
}
|
||||
|
||||
static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) {
|
||||
uint16_t type = *(uint16_t *) param;
|
||||
|
||||
STopBotResItem *val1 = (STopBotResItem *) p1;
|
||||
STopBotResItem *val2 = (STopBotResItem *) p2;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
if (val1->v.i == val2->v.i) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (val1->v.i > val2->v.i) ? 1 : -1;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
if (val1->v.u == val2->v.u) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (val1->v.u > val2->v.u) ? 1 : -1;
|
||||
}
|
||||
|
||||
if (val1->v.d == val2->v.d) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (val1->v.d > val2->v.d) ? 1 : -1;
|
||||
}
|
||||
|
||||
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) {
|
||||
SVariant val = {0};
|
||||
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
|
||||
|
||||
STopBotResItem *pItems = pRes->pItems;
|
||||
assert(pItems != NULL);
|
||||
|
||||
// not full yet
|
||||
if (pRes->num < maxSize) {
|
||||
STopBotResItem* pItem = &pItems[pRes->num];
|
||||
pItem->v = val;
|
||||
pItem->uid = uid;
|
||||
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
|
||||
|
||||
pRes->num++;
|
||||
taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false);
|
||||
} else { // replace the minimum value in the result
|
||||
// if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pList[0]->v.i) ||
|
||||
// (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pList[0]->v.u) ||
|
||||
// (IS_FLOAT_TYPE(type) && val.d > pList[0]->v.d)) {
|
||||
//
|
||||
// STopBotResItem* pItem = &pItems[0];
|
||||
// pItem->v = val;
|
||||
// pItem->uid = uid;
|
||||
// pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
|
||||
//
|
||||
// taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
|
|||
}
|
||||
|
||||
void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar,
|
||||
__ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot) {
|
||||
__ext_compar_fn_t compar, char* buf, bool maxroot) {
|
||||
int32_t parent;
|
||||
int32_t child;
|
||||
char *buf;
|
||||
|
||||
char* tmp = NULL;
|
||||
if (buf == NULL) {
|
||||
tmp = taosMemoryMalloc(size);
|
||||
} else {
|
||||
tmp = buf;
|
||||
}
|
||||
|
||||
if (base && size > 0 && compar) {
|
||||
parent = start;
|
||||
child = 2 * parent + 1;
|
||||
|
||||
if (swap == NULL) {
|
||||
buf = taosMemoryCalloc(1, size);
|
||||
if (buf == NULL) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (maxroot) {
|
||||
while (child <= end) {
|
||||
if (child + 1 <= end &&
|
||||
|
@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
|
|||
break;
|
||||
}
|
||||
|
||||
if (swap == NULL) {
|
||||
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf);
|
||||
} else {
|
||||
(*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap);
|
||||
}
|
||||
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp);
|
||||
|
||||
parent = child;
|
||||
child = 2 * parent + 1;
|
||||
|
@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
|
|||
break;
|
||||
}
|
||||
|
||||
if (swap == NULL) {
|
||||
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf);
|
||||
} else {
|
||||
(*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap);
|
||||
}
|
||||
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp);
|
||||
|
||||
parent = child;
|
||||
child = 2 * parent + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (swap == NULL) {
|
||||
taosMemoryFreeClear(buf);
|
||||
}
|
||||
if (buf == NULL) {
|
||||
taosMemoryFree(tmp);
|
||||
}
|
||||
}
|
||||
|
||||
void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar,
|
||||
const void *parswap, __ext_swap_fn_t swap, bool maxroot) {
|
||||
bool maxroot) {
|
||||
int32_t i;
|
||||
|
||||
char* buf = taosMemoryCalloc(1, size);
|
||||
if (buf == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (base && size > 0) {
|
||||
for (i = len / 2 - 1; i >= 0; i--) {
|
||||
taosheapadjust(base, size, i, len - 1, parcompar, compar, parswap, swap, maxroot);
|
||||
taosheapadjust(base, size, i, len - 1, parcompar, compar, buf, maxroot);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(buf);
|
||||
/*
|
||||
char *buf = taosMemoryCalloc(1, size);
|
||||
|
||||
|
|
Loading…
Reference in New Issue