Merge pull request #12026 from taosdata/feature/tq
fix(stream): interval status do not reset
This commit is contained in:
commit
a7d7f180be
|
@ -25,7 +25,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -44,19 +44,25 @@ typedef struct STopBotResItem {
|
|||
} STopBotResItem;
|
||||
|
||||
typedef struct STopBotRes {
|
||||
STopBotResItem *pItems;
|
||||
STopBotResItem* pItems;
|
||||
} STopBotRes;
|
||||
|
||||
typedef struct SStddevRes {
|
||||
double result;
|
||||
int64_t count;
|
||||
union {double quadraticDSum; int64_t quadraticISum;};
|
||||
union {double dsum; int64_t isum;};
|
||||
union {
|
||||
double quadraticDSum;
|
||||
int64_t quadraticISum;
|
||||
};
|
||||
union {
|
||||
double dsum;
|
||||
int64_t isum;
|
||||
};
|
||||
} SStddevRes;
|
||||
|
||||
typedef struct SPercentileInfo {
|
||||
double result;
|
||||
tMemBucket *pMemBucket;
|
||||
tMemBucket* pMemBucket;
|
||||
int32_t stage;
|
||||
double minval;
|
||||
double maxval;
|
||||
|
@ -68,7 +74,10 @@ typedef struct SDiffInfo {
|
|||
bool includeNull;
|
||||
bool ignoreNegative;
|
||||
bool firstOutput;
|
||||
union { int64_t i64; double d64;} prev;
|
||||
union {
|
||||
int64_t i64;
|
||||
double d64;
|
||||
} prev;
|
||||
} SDiffInfo;
|
||||
|
||||
#define SET_VAL(_info, numOfElem, res) \
|
||||
|
@ -85,7 +94,7 @@ typedef struct SDiffInfo {
|
|||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
@ -101,7 +110,7 @@ typedef struct SDiffInfo {
|
|||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
_t* d = (_t*)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
|
@ -111,8 +120,7 @@ typedef struct SDiffInfo {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (pResultInfo->initialized) {
|
||||
return false;
|
||||
}
|
||||
|
@ -130,8 +138,8 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
|
||||
cleanupResultRowEntry(pResInfo);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||
/*cleanupResultRowEntry(pResInfo);*/
|
||||
|
||||
char* in = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
|
||||
|
@ -144,7 +152,7 @@ int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
|||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||
cleanupResultRowEntry(pResInfo);
|
||||
|
||||
char* in = finalResult;
|
||||
|
@ -170,7 +178,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
* count function does need the finalize, if data is missing, the default value, which is 0, is used
|
||||
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
|
||||
*/
|
||||
int32_t countFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t countFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
/*
|
||||
|
@ -192,14 +200,15 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
|
|||
numOfElem += 1;
|
||||
}
|
||||
} else {
|
||||
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
|
||||
// when counting on the primary time stamp column and no statistics data is presented, use the size value
|
||||
// directly.
|
||||
numOfElem = pInput->numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
*((int64_t *)buf) += numOfElem;
|
||||
*((int64_t*)buf) += numOfElem;
|
||||
|
||||
SET_VAL(pResInfo, numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -207,7 +216,7 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
|
|||
|
||||
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
|
||||
do { \
|
||||
_t *d = (_t *)(_col->pData); \
|
||||
_t* d = (_t*)(_col->pData); \
|
||||
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
|
@ -217,12 +226,12 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
int32_t sumFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t sumFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
// Only the pre-computing information loaded and actual data does not loaded
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
@ -286,7 +295,7 @@ bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -412,19 +421,19 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
int32_t type = pInput->pData[0]->info.type;
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
if (IS_INTEGER_TYPE(type)) {
|
||||
pAvgRes->result = pAvgRes->sum.isum / ((double) pAvgRes->count);
|
||||
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
|
||||
} else {
|
||||
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
|
||||
pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||
}
|
||||
|
||||
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -432,34 +441,34 @@ bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
switch (pCtx->resDataInfo.type) {
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
*((int32_t *)buf) = INT32_MIN;
|
||||
*((int32_t*)buf) = INT32_MIN;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
*((uint32_t *)buf) = 0;
|
||||
*((uint32_t*)buf) = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
*((float *)buf) = -FLT_MAX;
|
||||
*((float*)buf) = -FLT_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
SET_DOUBLE_VAL(((double *)buf), -DBL_MAX);
|
||||
SET_DOUBLE_VAL(((double*)buf), -DBL_MAX);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
*((int64_t *)buf) = INT64_MIN;
|
||||
*((int64_t*)buf) = INT64_MIN;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
*((uint64_t *)buf) = 0;
|
||||
*((uint64_t*)buf) = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
*((int16_t *)buf) = INT16_MIN;
|
||||
*((int16_t*)buf) = INT16_MIN;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
*((uint16_t *)buf) = 0;
|
||||
*((uint16_t*)buf) = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
*((int8_t *)buf) = INT8_MIN;
|
||||
*((int8_t*)buf) = INT8_MIN;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
*((uint8_t *)buf) = 0;
|
||||
*((uint8_t*)buf) = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
*((int8_t*)buf) = 0;
|
||||
|
@ -470,7 +479,7 @@ bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false; // not initialized since it has been initialized
|
||||
}
|
||||
|
@ -478,34 +487,34 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
switch (pCtx->resDataInfo.type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
*((int8_t *)buf) = INT8_MAX;
|
||||
*((int8_t*)buf) = INT8_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
*(uint8_t *) buf = UINT8_MAX;
|
||||
*(uint8_t*)buf = UINT8_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
*((int16_t *)buf) = INT16_MAX;
|
||||
*((int16_t*)buf) = INT16_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
*((uint16_t *)buf) = UINT16_MAX;
|
||||
*((uint16_t*)buf) = UINT16_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
*((int32_t *)buf) = INT32_MAX;
|
||||
*((int32_t*)buf) = INT32_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
*((uint32_t *)buf) = UINT32_MAX;
|
||||
*((uint32_t*)buf) = UINT32_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
*((int64_t *)buf) = INT64_MAX;
|
||||
*((int64_t*)buf) = INT64_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
*((uint64_t *)buf) = UINT64_MAX;
|
||||
*((uint64_t*)buf) = UINT64_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
*((float *)buf) = FLT_MAX;
|
||||
*((float*)buf) = FLT_MAX;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
SET_DOUBLE_VAL(((double *)buf), DBL_MAX);
|
||||
SET_DOUBLE_VAL(((double*)buf), DBL_MAX);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
*((int8_t*)buf) = 1;
|
||||
|
@ -528,7 +537,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
@ -556,7 +565,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
_t* d = (_t*)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
|
@ -566,11 +575,11 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
int32_t type = pCol->info.type;
|
||||
|
@ -599,7 +608,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
}
|
||||
|
||||
// the index is the original position, not the relative position
|
||||
TSKEY key = (pCtx->ptsList != NULL)? pCtx->ptsList[index]:TSKEY_INITIAL_VAL;
|
||||
TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
int64_t prev = 0;
|
||||
|
@ -607,7 +616,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
|
||||
int64_t val = GET_INT64_VAL(tval);
|
||||
if ((prev < val) ^ isMinFunc) {
|
||||
*(int64_t*) buf = val;
|
||||
*(int64_t*)buf = val;
|
||||
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||
|
@ -624,7 +633,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
|
||||
uint64_t val = GET_UINT64_VAL(tval);
|
||||
if ((prev < val) ^ isMinFunc) {
|
||||
*(uint64_t*) buf = val;
|
||||
*(uint64_t*)buf = val;
|
||||
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||
|
@ -637,10 +646,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
}
|
||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
double val = GET_DOUBLE_VAL(tval);
|
||||
UPDATE_DATA(pCtx, *(double*) buf, val, numOfElems, isMinFunc, key);
|
||||
UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
|
||||
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||
double val = GET_DOUBLE_VAL(tval);
|
||||
UPDATE_DATA(pCtx, *(float*) buf, val, numOfElems, isMinFunc, key);
|
||||
UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key);
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
|
@ -653,10 +662,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
|
||||
LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
|
||||
LOOPCHECK_N(*(int16_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_INT) {
|
||||
int32_t *pData = (int32_t*)pCol->pData;
|
||||
int32_t *val = (int32_t*) buf;
|
||||
int32_t* pData = (int32_t*)pCol->pData;
|
||||
int32_t* val = (int32_t*)buf;
|
||||
|
||||
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
|
@ -665,7 +674,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
|
||||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i) : 0;
|
||||
TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
|
||||
DO_UPDATE_SUBSID_RES(pCtx, ts);
|
||||
}
|
||||
|
||||
|
@ -676,34 +685,34 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
qDebug("max value updated:%d", *retVal);
|
||||
#endif
|
||||
} else if (type == TSDB_DATA_TYPE_BIGINT) {
|
||||
LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
}
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
if (type == TSDB_DATA_TYPE_UTINYINT) {
|
||||
LOOPCHECK_N(*(uint8_t*) buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
|
||||
LOOPCHECK_N(*(uint16_t*) buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_UINT) {
|
||||
LOOPCHECK_N(*(uint32_t*) buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
|
||||
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
LOOPCHECK_N(*(double*) buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||
LOOPCHECK_N(*(float*) buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
|
||||
LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
}
|
||||
|
||||
int32_t minFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t minFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t maxFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -714,7 +723,7 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -847,11 +856,11 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
double avg;
|
||||
if (IS_INTEGER_TYPE(type)) {
|
||||
avg = pStddevRes->isum / ((double) pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
||||
avg = pStddevRes->isum / ((double)pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg);
|
||||
} else {
|
||||
avg = pStddevRes->dsum / ((double) pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg);
|
||||
avg = pStddevRes->dsum / ((double)pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg);
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
|
@ -862,13 +871,13 @@ bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// in the first round, get the min-max value of all involved data
|
||||
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX);
|
||||
SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX);
|
||||
pInfo->numOfElems = 0;
|
||||
|
@ -876,17 +885,17 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t notNullElems = 0;
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
|
||||
SColumnInfoData *pCol = pInput->pData[0];
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
int32_t type = pCol->info.type;
|
||||
|
||||
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
|
||||
pInfo->stage += 1;
|
||||
|
||||
|
@ -931,7 +940,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
char *data = colDataGetData(pCol, i);
|
||||
char* data = colDataGetData(pCol, i);
|
||||
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pCtx->inputType, data);
|
||||
|
@ -957,7 +966,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
char *data = colDataGetData(pCol, i);
|
||||
char* data = colDataGetData(pCol, i);
|
||||
|
||||
notNullElems += 1;
|
||||
tMemBucketPut(pInfo->pMemBucket, data, 1);
|
||||
|
@ -971,10 +980,10 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SVariant* pVal = &pCtx->param[1].param;
|
||||
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
|
||||
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
tMemBucket * pMemBucket = ppInfo->pMemBucket;
|
||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
||||
SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
|
||||
}
|
||||
|
@ -994,15 +1003,15 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
|
|||
return 0;
|
||||
}
|
||||
|
||||
return *(TSKEY*) colDataGetData(pTsColInfo, rowIndex);
|
||||
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
||||
}
|
||||
|
||||
// This ordinary first function does not care if current scan is ascending order or descending order scan
|
||||
// the OPTIMIZED version of first function will only handle the ascending order scan
|
||||
int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
@ -1016,12 +1025,12 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;
|
||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
|
||||
|
||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||
|
||||
int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
|
||||
if (blockDataOrder == TSDB_ORDER_ASC) {
|
||||
// filter according to current result firstly
|
||||
|
@ -1045,7 +1054,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
|
||||
memcpy(buf, data, bytes);
|
||||
*(TSKEY*)(buf + bytes) = cts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
||||
pResInfo->numOfRes = 1;
|
||||
break;
|
||||
|
@ -1074,7 +1083,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
|
||||
memcpy(buf, data, bytes);
|
||||
*(TSKEY*)(buf + bytes) = cts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
pResInfo->numOfRes = 1;
|
||||
break;
|
||||
}
|
||||
|
@ -1085,10 +1094,10 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
@ -1102,12 +1111,12 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;
|
||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
|
||||
|
||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||
|
||||
int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
|
||||
if (blockDataOrder == TSDB_ORDER_ASC) {
|
||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
|
@ -1141,7 +1150,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
|||
memcpy(buf, data, bytes);
|
||||
*(TSKEY*)(buf + bytes) = cts;
|
||||
pResInfo->numOfRes = 1;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1156,7 +1165,7 @@ bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1170,9 +1179,9 @@ bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
@ -1181,7 +1190,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
int32_t numOfElems = 0;
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
||||
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
||||
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
@ -1189,10 +1198,9 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
int32_t startOffset = pCtx->offset;
|
||||
switch (pInputCol->info.type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
|
||||
|
||||
int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems);
|
||||
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
if (pDiffInfo->includeNull) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
|
@ -1205,7 +1213,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t v = *(int32_t*) colDataGetData(pInputCol, i);
|
||||
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
|
||||
if (pDiffInfo->hasPrev) {
|
||||
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
|
@ -1227,7 +1235,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
continue;
|
||||
|
@ -1235,17 +1243,17 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
|
||||
int32_t v = 0;
|
||||
if (pDiffInfo->hasPrev) {
|
||||
v = *(int64_t*) colDataGetData(pInputCol, i);
|
||||
v = *(int64_t*)colDataGetData(pInputCol, i);
|
||||
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (pDiffInfo->ignoreNegative) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// *(pOutput++) = delta;
|
||||
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
//
|
||||
// pOutput += 1;
|
||||
// pTimestamp += 1;
|
||||
// *(pOutput++) = delta;
|
||||
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
//
|
||||
// pOutput += 1;
|
||||
// pTimestamp += 1;
|
||||
}
|
||||
|
||||
pDiffInfo->prev.i64 = v;
|
||||
|
@ -1359,7 +1367,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
#endif
|
||||
default:
|
||||
break;
|
||||
// qError("error input type");
|
||||
// qError("error input type");
|
||||
}
|
||||
|
||||
// initial value is not set yet
|
||||
|
@ -1371,12 +1379,12 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
assert(pCtx->hasNull);
|
||||
return 0;
|
||||
} else {
|
||||
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
|
||||
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
|
||||
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
|
||||
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
|
||||
// }
|
||||
// }
|
||||
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
|
||||
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
|
||||
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
|
||||
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
|
||||
// }
|
||||
// }
|
||||
|
||||
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
|
||||
return forwardStep;
|
||||
|
@ -1384,32 +1392,32 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
||||
SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
|
||||
return true;
|
||||
}
|
||||
|
||||
static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pRes->pItems = (STopBotResItem*)((char*) pRes + sizeof(STopBotRes));
|
||||
pRes->pItems = (STopBotResItem*)((char*)pRes + sizeof(STopBotRes));
|
||||
|
||||
return pRes;
|
||||
}
|
||||
|
||||
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
|
||||
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo);
|
||||
|
||||
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
||||
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
||||
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t topFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
||||
// buildTopBotStruct(pRes, pCtx);
|
||||
// }
|
||||
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
||||
// buildTopBotStruct(pRes, pCtx);
|
||||
// }
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
@ -1432,11 +1440,11 @@ int32_t topFunction(SqlFunctionCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) {
|
||||
uint16_t type = *(uint16_t *) param;
|
||||
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
|
||||
uint16_t type = *(uint16_t*)param;
|
||||
|
||||
STopBotResItem *val1 = (STopBotResItem *) p1;
|
||||
STopBotResItem *val2 = (STopBotResItem *) p2;
|
||||
STopBotResItem* val1 = (STopBotResItem*)p1;
|
||||
STopBotResItem* val2 = (STopBotResItem*)p2;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
if (val1->v.i == val2->v.i) {
|
||||
|
@ -1461,13 +1469,13 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par
|
|||
|
||||
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
|
||||
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
int32_t maxSize = pCtx->param[1].param.i;
|
||||
|
||||
SVariant val = {0};
|
||||
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
|
||||
|
||||
STopBotResItem *pItems = pRes->pItems;
|
||||
STopBotResItem* pItems = pRes->pItems;
|
||||
assert(pItems != NULL);
|
||||
|
||||
// not full yet
|
||||
|
@ -1481,11 +1489,11 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
|
||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||
pEntryInfo->numOfRes++;
|
||||
taosheapsort((void *) pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void *) &type, topBotResComparFn, false);
|
||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||
false);
|
||||
} else { // replace the minimum value in the result
|
||||
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
|
||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
|
||||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
|
||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
|
||||
// replace the old data and the coresponding tuple data
|
||||
STopBotResItem* pItem = &pItems[0];
|
||||
pItem->v = val;
|
||||
|
@ -1494,7 +1502,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
// save the data of this tuple by over writing the old data
|
||||
copyTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
||||
|
||||
taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
|
||||
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||
topBotResComparFn, NULL, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1553,7 +1562,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
|
||||
|
||||
int32_t offset = 0;
|
||||
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||
continue;
|
||||
|
@ -1574,7 +1583,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
}
|
||||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
pEntryInfo->complete = true;
|
||||
|
||||
|
@ -1584,7 +1593,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
// todo assign the tag value and the corresponding row data
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
switch(type) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||
STopBotResItem* pItem = &pRes->pItems[i];
|
||||
|
@ -1602,12 +1611,12 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
|
||||
SFunctParam *pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
|
||||
int32_t ps = 0;
|
||||
for(int32_t k = 0; k < srcSlotId; ++k) {
|
||||
for (int32_t k = 0; k < srcSlotId; ++k) {
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
||||
ps += pSrcCol->info.bytes;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue