refactor some codes. fix bugs in selectivity+tags/ts query.

This commit is contained in:
hjxilinx 2019-11-12 17:21:36 +08:00
parent b12e12f840
commit fa0f056f19
5 changed files with 587 additions and 276 deletions

View File

@ -63,6 +63,14 @@
} \
} while (0);
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do {\
for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \
} \
} while(0);
void noop(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
typedef struct tValuePair {
@ -104,7 +112,9 @@ typedef struct SFirstLastInfo {
} SFirstLastInfo;
typedef struct SFirstLastInfo SLastrowInfo;
typedef struct SPercentileInfo { tMemBucket *pMemBucket; } SPercentileInfo;
typedef struct SPercentileInfo {
tMemBucket *pMemBucket;
} SPercentileInfo;
typedef struct STopBotInfo {
int32_t num;
@ -118,9 +128,13 @@ typedef struct SLeastsquareInfo {
int64_t num;
} SLeastsquareInfo;
typedef struct SAPercentileInfo { SHistogramInfo *pHisto; } SAPercentileInfo;
typedef struct SAPercentileInfo {
SHistogramInfo *pHisto;
} SAPercentileInfo;
typedef struct STSCompInfo { STSBuf *pTSBuf; } STSCompInfo;
typedef struct STSCompInfo {
STSBuf *pTSBuf;
} STSCompInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) {
@ -451,21 +465,32 @@ int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId
} \
};
#define UPDATE_DATA(ctx, left, right, num, sign) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = right; \
DO_UPDATE_TAG_COLUMNS(ctx, 0); \
(num) += 1; \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, k) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_TAG_COLUMNS(ctx, k); \
(num) += 1; \
} \
} while (0);
#define DUPATE_DATA_WITHOUT_TS(ctx, left, right, num, sign) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx); \
(num) += 1; \
} \
} while (0);
#define LOOPCHECK_N(val, list, ctx, tsdbType, sign, num) \
for (int32_t i = 0; i < ((ctx)->size); ++i) { \
if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \
continue; \
} \
UPDATE_DATA(ctx, val, (list)[i], num, sign); \
TSKEY key = (ctx)->ptsList[i]; \
UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \
}
#define TYPED_LOOPCHECK_N(type, data, list, ctx, tsdbType, sign, notNullElems) \
@ -886,16 +911,18 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
index = pCtx->preAggVals.maxIndex;
}
TSKEY key = pCtx->ptsList[index];
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
int64_t val = GET_INT64_VAL(tval);
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
int8_t *data = (int8_t *)pOutput;
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
int16_t *data = (int16_t *)pOutput;
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *data = (int32_t *)pOutput;
#if defined(_DEBUG_VIEW)
@ -906,27 +933,27 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
*data = val;
for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) {
SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i];
if (__ctx->functionId == TSDB_FUNC_TAG_DUMMY) {
aAggs[TSDB_FUNC_TAG].xFunction(__ctx);
} else if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) {
*((int64_t *)__ctx->aOutputBuf) = pCtx->ptsList[index];
if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) {
__ctx->tag = (tVariant){.i64Key = key, .nType = TSDB_DATA_TYPE_BIGINT};
}
aAggs[TSDB_FUNC_TAG].xFunction(__ctx);
}
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
int64_t *data = (int64_t *)pOutput;
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key);
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
double *data = (double *)pOutput;
double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
float *data = (float *)pOutput;
double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin);
UPDATE_DATA(pCtx, *data, val, notNullElems, isMin, key);
}
return;
@ -951,7 +978,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
if ((*retVal < pData[i]) ^ isMin) {
*retVal = pData[i];
DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[i]);
TSKEY k = pCtx->ptsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, k);
}
*notNullElems += 1;
@ -1089,12 +1118,12 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
switch (type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = GET_INT8_VAL(input);
UPDATE_DATA(pCtx, *(int8_t *)output, v, notNullElems, isMin);
DUPATE_DATA_WITHOUT_TS(pCtx, *(int8_t *)output, v, notNullElems, isMin);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = GET_INT16_VAL(input);
UPDATE_DATA(pCtx, *(int16_t *)output, v, notNullElems, isMin);
DUPATE_DATA_WITHOUT_TS(pCtx, *(int16_t *)output, v, notNullElems, isMin);
break;
}
case TSDB_DATA_TYPE_INT: {
@ -1104,7 +1133,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) {
SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i];
aAggs[TSDB_FUNC_TAG].xFunction(__ctx);
aAggs[TSDB_FUNC_TAG].xFunction(__ctx);
}
notNullElems++;
@ -1113,17 +1142,17 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
}
case TSDB_DATA_TYPE_FLOAT: {
float v = GET_FLOAT_VAL(input);
UPDATE_DATA(pCtx, *(float *)output, v, notNullElems, isMin);
DUPATE_DATA_WITHOUT_TS(pCtx, *(float *)output, v, notNullElems, isMin);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double v = GET_DOUBLE_VAL(input);
UPDATE_DATA(pCtx, *(double *)output, v, notNullElems, isMin);
DUPATE_DATA_WITHOUT_TS(pCtx, *(double *)output, v, notNullElems, isMin);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = GET_INT64_VAL(input);
UPDATE_DATA(pCtx, *(int64_t *)output, v, notNullElems, isMin);
DUPATE_DATA_WITHOUT_TS(pCtx, *(int64_t *)output, v, notNullElems, isMin);
break;
};
default:
@ -1179,38 +1208,39 @@ static void max_func_second_merge(SQLFunctionCtx *pCtx) {
static void minMax_function_f(SQLFunctionCtx *pCtx, int32_t index, int32_t isMin) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
TSKEY key = pCtx->ptsList[index];
int32_t num = 0;
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
int8_t *output = (int8_t *)pCtx->aOutputBuf;
int8_t i = GET_INT8_VAL(pData);
UPDATE_DATA(pCtx, *output, i, num, isMin);
UPDATE_DATA(pCtx, *output, i, num, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
int16_t *output = pCtx->aOutputBuf;
int16_t i = GET_INT16_VAL(pData);
UPDATE_DATA(pCtx, *output, i, num, isMin);
UPDATE_DATA(pCtx, *output, i, num, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *output = pCtx->aOutputBuf;
int32_t i = GET_INT32_VAL(pData);
UPDATE_DATA(pCtx, *output, i, num, isMin);
UPDATE_DATA(pCtx, *output, i, num, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
int64_t *output = pCtx->aOutputBuf;
int64_t i = GET_INT64_VAL(pData);
UPDATE_DATA(pCtx, *output, i, num, isMin);
UPDATE_DATA(pCtx, *output, i, num, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
float *output = pCtx->aOutputBuf;
float i = GET_FLOAT_VAL(pData);
UPDATE_DATA(pCtx, *output, i, num, isMin);
UPDATE_DATA(pCtx, *output, i, num, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
double *output = pCtx->aOutputBuf;
double i = GET_DOUBLE_VAL(pData);
UPDATE_DATA(pCtx, *output, i, num, isMin);
UPDATE_DATA(pCtx, *output, i, num, isMin, key);
}
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
@ -1804,6 +1834,11 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen);
} else { // the tags are dumped from the ctx tag fields
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) {
SQLFunctionCtx* __ctx = pTagInfo->pTagCtxList[i];
if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) {
__ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey};
}
tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType);
size += pTagInfo->pTagCtxList[i]->outputBytes;
}
@ -1825,8 +1860,9 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
tValuePair **pList = pInfo->res;
if (pInfo->num < maxLen) {
if (pInfo->num == 0 || ((type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_BIGINT) &&
val.i64Key >= pList[pInfo->num - 1]->v.i64Key) ||
if (pInfo->num == 0 ||
((type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_BIGINT) &&
val.i64Key >= pList[pInfo->num - 1]->v.i64Key) ||
((type >= TSDB_DATA_TYPE_FLOAT && type <= TSDB_DATA_TYPE_DOUBLE) &&
val.dKey >= pList[pInfo->num - 1]->v.dKey)) {
valuePairAssign(pList[pInfo->num], type, &val.i64Key, ts, pTags, pTagInfo, stage);
@ -4293,173 +4329,427 @@ int32_t funcCompatDefList[28] = {
*/
1, 1, 1, -1, 1, 1, 5};
SQLAggFuncElem aAggs[28] = {
{
// 0, count function does not invoke the finalize function
"count", TSDB_FUNC_COUNT, TSDB_FUNC_COUNT, TSDB_BASE_FUNC_SO, function_setup, count_function, count_function_f,
no_next_step, noop, count_func_merge, count_func_merge, count_load_data_info,
},
{
// 1
"sum", TSDB_FUNC_SUM, TSDB_FUNC_SUM, TSDB_BASE_FUNC_SO, function_setup, sum_function, sum_function_f,
no_next_step, function_finalizer, sum_func_merge, sum_func_second_merge, precal_req_load_info,
},
{
// 2
"avg", TSDB_FUNC_AVG, TSDB_FUNC_AVG, TSDB_BASE_FUNC_SO, function_setup, avg_function, avg_function_f,
no_next_step, avg_finalizer, avg_func_merge, avg_func_second_merge, precal_req_load_info,
},
{
// 3
"min", TSDB_FUNC_MIN, TSDB_FUNC_MIN, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, min_func_setup,
min_function, min_function_f, no_next_step, function_finalizer, min_func_merge, min_func_second_merge,
precal_req_load_info,
},
{
// 4
"max", TSDB_FUNC_MAX, TSDB_FUNC_MAX, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, max_func_setup,
max_function, max_function_f, no_next_step, function_finalizer, max_func_merge, max_func_second_merge,
precal_req_load_info,
},
{
// 5
"stddev", TSDB_FUNC_STDDEV, TSDB_FUNC_INVALID_ID, TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF,
function_setup, stddev_function, stddev_function_f, stddev_next_step, stddev_finalizer, noop, noop,
data_req_load_info,
},
{
// 6
"percentile", TSDB_FUNC_PERCT, TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, percentile_function_setup, percentile_function,
percentile_function_f, no_next_step, percentile_finalizer, noop, noop, data_req_load_info,
},
{
// 7
"apercentile", TSDB_FUNC_APERCT, TSDB_FUNC_APERCT,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC,
apercentile_function_setup, apercentile_function, apercentile_function_f, no_next_step, apercentile_finalizer,
apercentile_func_merge, apercentile_func_second_merge, data_req_load_info,
},
{
// 8
"first", TSDB_FUNC_FIRST, TSDB_FUNC_FIRST_DST, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup,
first_function, first_function_f, no_next_step, function_finalizer, noop, noop, first_data_req_info,
},
{
// 9
"last", TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup,
last_function, last_function_f, no_next_step, function_finalizer, noop, noop, last_data_req_info,
},
{
// 10
"last_row", TSDB_FUNC_LAST_ROW, TSDB_FUNC_LAST_ROW,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
first_last_function_setup, last_row_function, noop, no_next_step, last_row_finalizer, noop,
last_dist_func_second_merge, data_req_load_info,
},
{
// 11
"top", TSDB_FUNC_TOP, TSDB_FUNC_TOP, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF |
TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup, top_function, top_function_f, no_next_step, top_bottom_func_finalizer,
top_func_merge, top_func_second_merge, data_req_load_info,
},
{
// 12
"bottom", TSDB_FUNC_BOTTOM, TSDB_FUNC_BOTTOM, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF |
TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup, bottom_function, bottom_function_f, no_next_step, top_bottom_func_finalizer,
bottom_func_merge, bottom_func_second_merge, data_req_load_info,
},
{
// 13
"spread", TSDB_FUNC_SPREAD, TSDB_FUNC_SPREAD, TSDB_BASE_FUNC_SO, spread_function_setup, spread_function,
spread_function_f, no_next_step, spread_function_finalizer, spread_func_merge, spread_func_sec_merge,
count_load_data_info,
},
{
// 14
"twa", TSDB_FUNC_TWA, TSDB_FUNC_TWA, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, twa_function_setup,
twa_function, twa_function_f, no_next_step, twa_function_finalizer, twa_func_merge, twa_function_copy,
data_req_load_info,
},
{
// 15
"leastsquares", TSDB_FUNC_LEASTSQR, TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, leastsquares_function_setup,
leastsquares_function, leastsquares_function_f, no_next_step, leastsquares_finalizer, noop, noop,
data_req_load_info,
},
{
// 16
"ts", TSDB_FUNC_TS, TSDB_FUNC_TS, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup,
date_col_output_function, date_col_output_function, no_next_step, noop, copy_function, copy_function,
no_data_info,
},
{
// 17
"ts", TSDB_FUNC_TS_DUMMY, TSDB_FUNC_TS_DUMMY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, noop,
noop, no_next_step, noop, copy_function, copy_function, data_req_load_info,
},
{
// 18
"tag", TSDB_FUNC_TAG_DUMMY, TSDB_FUNC_TAG_DUMMY, TSDB_BASE_FUNC_SO, function_setup, tag_function, noop,
no_next_step, noop, copy_function, copy_function, no_data_info,
},
{
// 19
"ts", TSDB_FUNC_TS_COMP, TSDB_FUNC_TS_COMP, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, ts_comp_function_setup,
ts_comp_function, ts_comp_function_f, no_next_step, ts_comp_finalize, copy_function, copy_function,
data_req_load_info,
},
{
// 20
"tag", TSDB_FUNC_TAG, TSDB_FUNC_TAG, TSDB_BASE_FUNC_SO, function_setup, tag_function, tag_function_f,
no_next_step, noop, copy_function, copy_function, no_data_info,
},
{
// 21, column project sql function
"colprj", TSDB_FUNC_PRJ, TSDB_FUNC_PRJ, TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS, function_setup,
col_project_function, col_project_function_f, no_next_step, noop, copy_function, copy_function,
data_req_load_info,
},
{
// 22, multi-output, tag function has only one result
"tagprj", TSDB_FUNC_TAGPRJ, TSDB_FUNC_TAGPRJ, TSDB_BASE_FUNC_MO, function_setup, tag_project_function,
tag_project_function_f, no_next_step, noop, copy_function, copy_function, no_data_info,
},
{
// 23
"arithmetic", TSDB_FUNC_ARITHM, TSDB_FUNC_ARITHM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, function_setup, arithmetic_function,
arithmetic_function_f, no_next_step, noop, copy_function, copy_function, data_req_load_info,
},
{
// 24
"diff", TSDB_FUNC_DIFF, TSDB_FUNC_INVALID_ID, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, diff_function_setup,
diff_function, diff_function_f, no_next_step, noop, noop, noop, data_req_load_info,
},
// distributed version used in two-stage aggregation processes
{
// 25
"first_dist", TSDB_FUNC_FIRST_DST, TSDB_FUNC_FIRST_DST,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup,
first_dist_function, first_dist_function_f, no_next_step, function_finalizer, first_dist_func_merge,
first_dist_func_second_merge, first_dist_data_req_info,
},
{
// 26
"last_dist", TSDB_FUNC_LAST_DST, TSDB_FUNC_LAST_DST,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup,
last_dist_function, last_dist_function_f, no_next_step, function_finalizer, last_dist_func_merge,
last_dist_func_second_merge, last_dist_data_req_info,
},
{
// 27
"interp", TSDB_FUNC_INTERP, TSDB_FUNC_INTERP,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, function_setup,
interp_function,
do_sum_f, // todo filter handle
no_next_step, noop, noop, copy_function, no_data_info,
}};
SQLAggFuncElem aAggs[28] = {{
// 0, count function does not invoke the finalize function
"count",
TSDB_FUNC_COUNT,
TSDB_FUNC_COUNT,
TSDB_BASE_FUNC_SO,
function_setup,
count_function,
count_function_f,
no_next_step,
noop,
count_func_merge,
count_func_merge,
count_load_data_info,
},
{
// 1
"sum",
TSDB_FUNC_SUM,
TSDB_FUNC_SUM,
TSDB_BASE_FUNC_SO,
function_setup,
sum_function,
sum_function_f,
no_next_step,
function_finalizer,
sum_func_merge,
sum_func_second_merge,
precal_req_load_info,
},
{
// 2
"avg",
TSDB_FUNC_AVG,
TSDB_FUNC_AVG,
TSDB_BASE_FUNC_SO,
function_setup,
avg_function,
avg_function_f,
no_next_step,
avg_finalizer,
avg_func_merge,
avg_func_second_merge,
precal_req_load_info,
},
{
// 3
"min",
TSDB_FUNC_MIN,
TSDB_FUNC_MIN,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
min_func_setup,
min_function,
min_function_f,
no_next_step,
function_finalizer,
min_func_merge,
min_func_second_merge,
precal_req_load_info,
},
{
// 4
"max",
TSDB_FUNC_MAX,
TSDB_FUNC_MAX,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
max_func_setup,
max_function,
max_function_f,
no_next_step,
function_finalizer,
max_func_merge,
max_func_second_merge,
precal_req_load_info,
},
{
// 5
"stddev",
TSDB_FUNC_STDDEV,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF,
function_setup,
stddev_function,
stddev_function_f,
stddev_next_step,
stddev_finalizer,
noop,
noop,
data_req_load_info,
},
{
// 6
"percentile",
TSDB_FUNC_PERCT,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF,
percentile_function_setup,
percentile_function,
percentile_function_f,
no_next_step,
percentile_finalizer,
noop,
noop,
data_req_load_info,
},
{
// 7
"apercentile",
TSDB_FUNC_APERCT,
TSDB_FUNC_APERCT,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC,
apercentile_function_setup,
apercentile_function,
apercentile_function_f,
no_next_step,
apercentile_finalizer,
apercentile_func_merge,
apercentile_func_second_merge,
data_req_load_info,
},
{
// 8
"first",
TSDB_FUNC_FIRST,
TSDB_FUNC_FIRST_DST,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
first_function,
first_function_f,
no_next_step,
function_finalizer,
noop,
noop,
first_data_req_info,
},
{
// 9
"last",
TSDB_FUNC_LAST,
TSDB_FUNC_LAST_DST,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
last_function,
last_function_f,
no_next_step,
function_finalizer,
noop,
noop,
last_data_req_info,
},
{
// 10
"last_row",
TSDB_FUNC_LAST_ROW,
TSDB_FUNC_LAST_ROW,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_row_function,
noop,
no_next_step,
last_row_finalizer,
noop,
last_dist_func_second_merge,
data_req_load_info,
},
{
// 11
"top",
TSDB_FUNC_TOP,
TSDB_FUNC_TOP,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
top_function,
top_function_f,
no_next_step,
top_bottom_func_finalizer,
top_func_merge,
top_func_second_merge,
data_req_load_info,
},
{
// 12
"bottom",
TSDB_FUNC_BOTTOM,
TSDB_FUNC_BOTTOM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
bottom_function,
bottom_function_f,
no_next_step,
top_bottom_func_finalizer,
bottom_func_merge,
bottom_func_second_merge,
data_req_load_info,
},
{
// 13
"spread",
TSDB_FUNC_SPREAD,
TSDB_FUNC_SPREAD,
TSDB_BASE_FUNC_SO,
spread_function_setup,
spread_function,
spread_function_f,
no_next_step,
spread_function_finalizer,
spread_func_merge,
spread_func_sec_merge,
count_load_data_info,
},
{
// 14
"twa",
TSDB_FUNC_TWA,
TSDB_FUNC_TWA,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
twa_function_setup,
twa_function,
twa_function_f,
no_next_step,
twa_function_finalizer,
twa_func_merge,
twa_function_copy,
data_req_load_info,
},
{
// 15
"leastsquares",
TSDB_FUNC_LEASTSQR,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF,
leastsquares_function_setup,
leastsquares_function,
leastsquares_function_f,
no_next_step,
leastsquares_finalizer,
noop,
noop,
data_req_load_info,
},
{
// 16
"ts",
TSDB_FUNC_TS,
TSDB_FUNC_TS,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
function_setup,
date_col_output_function,
date_col_output_function,
no_next_step,
noop,
copy_function,
copy_function,
no_data_info,
},
{
// 17
"ts",
TSDB_FUNC_TS_DUMMY,
TSDB_FUNC_TS_DUMMY,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
function_setup,
noop,
noop,
no_next_step,
noop,
copy_function,
copy_function,
data_req_load_info,
},
{
// 18
"tag",
TSDB_FUNC_TAG_DUMMY,
TSDB_FUNC_TAG_DUMMY,
TSDB_BASE_FUNC_SO,
function_setup,
tag_function,
noop,
no_next_step,
noop,
copy_function,
copy_function,
no_data_info,
},
{
// 19
"ts",
TSDB_FUNC_TS_COMP,
TSDB_FUNC_TS_COMP,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS,
ts_comp_function_setup,
ts_comp_function,
ts_comp_function_f,
no_next_step,
ts_comp_finalize,
copy_function,
copy_function,
data_req_load_info,
},
{
// 20
"tag",
TSDB_FUNC_TAG,
TSDB_FUNC_TAG,
TSDB_BASE_FUNC_SO,
function_setup,
tag_function,
tag_function_f,
no_next_step,
noop,
copy_function,
copy_function,
no_data_info,
},
{
// 21, column project sql function
"colprj",
TSDB_FUNC_PRJ,
TSDB_FUNC_PRJ,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS,
function_setup,
col_project_function,
col_project_function_f,
no_next_step,
noop,
copy_function,
copy_function,
data_req_load_info,
},
{
// 22, multi-output, tag function has only one result
"tagprj",
TSDB_FUNC_TAGPRJ,
TSDB_FUNC_TAGPRJ,
TSDB_BASE_FUNC_MO,
function_setup,
tag_project_function,
tag_project_function_f,
no_next_step,
noop,
copy_function,
copy_function,
no_data_info,
},
{
// 23
"arithmetic",
TSDB_FUNC_ARITHM,
TSDB_FUNC_ARITHM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS,
function_setup,
arithmetic_function,
arithmetic_function_f,
no_next_step,
noop,
copy_function,
copy_function,
data_req_load_info,
},
{
// 24
"diff",
TSDB_FUNC_DIFF,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS,
diff_function_setup,
diff_function,
diff_function_f,
no_next_step,
noop,
noop,
noop,
data_req_load_info,
},
// distributed version used in two-stage aggregation processes
{
// 25
"first_dist",
TSDB_FUNC_FIRST_DST,
TSDB_FUNC_FIRST_DST,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
first_last_function_setup,
first_dist_function,
first_dist_function_f,
no_next_step,
function_finalizer,
first_dist_func_merge,
first_dist_func_second_merge,
first_dist_data_req_info,
},
{
// 26
"last_dist",
TSDB_FUNC_LAST_DST,
TSDB_FUNC_LAST_DST,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_dist_function,
last_dist_function_f,
no_next_step,
function_finalizer,
last_dist_func_merge,
last_dist_func_second_merge,
last_dist_data_req_info,
},
{
// 27
"interp",
TSDB_FUNC_INTERP,
TSDB_FUNC_INTERP,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS,
function_setup,
interp_function,
do_sum_f, // todo filter handle
no_next_step,
noop,
noop,
copy_function,
no_data_info,
}};

View File

@ -81,7 +81,7 @@ static bool validateIpAddress(char* ip);
static bool hasUnsupportFunctionsForMetricQuery(SSqlCmd* pCmd);
static bool functionCompatibleCheck(SSqlCmd* pCmd);
static void setColumnOffsetValueInResultset(SSqlCmd* pCmd);
static void setColumnOffsetValueInResultset(SSqlCmd* pCmd);
static int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList);
static int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql);
@ -94,7 +94,7 @@ static int32_t parseFillClause(SSqlCmd* pCmd, SQuerySQL* pQuerySQL);
static int32_t parseOrderbyClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql, SSchema* pSchema, int32_t numOfCols);
static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd);
static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString);
@ -105,8 +105,8 @@ static int32_t validateLocalConfig(tDCLSQL* pOptions);
static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd);
static void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex);
static bool hasTimestampForPointInterpQuery(SSqlCmd* pCmd);
static void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex);
static int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql);
static int32_t parseCreateDBOptions(SCreateDBInfo* pCreateDbSql, SSqlCmd* pCmd);
@ -115,13 +115,12 @@ static int32_t getTableIndexByName(SSQLToken* pToken, SSqlCmd* pCmd, SColumnInde
static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex);
static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql);
static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql);
static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) {
assert(QUERY_IS_STABLE_QUERY(pCmd->type));
// here colIdx == -1 means the special column tbname that is the name of each table
*queryOnMetricTags = true;
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
@ -1151,7 +1150,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) {
}
// check the invalid sql expresssion: select count(tbname)/count(tag1)/count(tag2) from super_table interval(1d);
for(int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
setErrMsg(pCmd, msg1);
@ -2791,10 +2790,14 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) {
// diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions
for (int32_t i = startIdx + 1; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TAG ||
functionId == TSDB_FUNC_TS) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
int16_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) {
continue;
}
if (functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
continue;
}
@ -2809,9 +2812,7 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_PRJ ||
functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TS ||
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_ARITHM) {
continue;
}
@ -4986,19 +4987,37 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd) {
int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) {
bool isProjectionFunction = false;
const char* msg = "column projection is not compatible with interval";
const char* msg1 = "column projection is not compatible with interval";
const char* msg2 = "interval not allowed for tag queries";
// multi-output set/ todo refactor
for (int32_t k = 0; k < pCmd->fieldsInfo.numOfOutputCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, k);
// projection query on primary timestamp, the selectivity function needs to be present.
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
bool hasSelectivity = false;
for(int32_t j = 0; j < pCmd->fieldsInfo.numOfOutputCols; ++j) {
SSqlExpr* pEx = tscSqlExprGet(pCmd, j);
if ((aAggs[pEx->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) {
hasSelectivity = true;
break;
}
}
if (hasSelectivity) {
continue;
}
}
if (pExpr->functionId == TSDB_FUNC_PRJ || pExpr->functionId == TSDB_FUNC_DIFF ||
pExpr->functionId == TSDB_FUNC_ARITHM) {
pExpr->functionId == TSDB_FUNC_ARITHM) {
isProjectionFunction = true;
}
}
if (isProjectionFunction) {
setErrMsg(pCmd, msg);
setErrMsg(pCmd, msg1);
}
return isProjectionFunction == true ? TSDB_CODE_INVALID_SQL : TSDB_CODE_SUCCESS;
@ -5164,8 +5183,7 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) {
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
bool queryOnTags = false;
int32_t ret = tscQueryOnlyMetricTags(pCmd, &queryOnTags);
if (ret != TSDB_CODE_SUCCESS) {
if (tscQueryOnlyMetricTags(pCmd, &queryOnTags) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
}
@ -5382,8 +5400,8 @@ static void doUpdateSqlFunctionForTagPrj(SSqlCmd* pCmd) {
}
}
int16_t resType = 0;
int16_t resBytes = 0;
int16_t resType = 0;
int16_t resBytes = 0;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
@ -5464,7 +5482,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
const char* msg2 = "functions not allowed";
bool tagColExists = false;
int16_t numOfTimestamp = 0; // primary timestamp column
int16_t numOfTimestamp = 0; // primary timestamp column
int16_t numOfSelectivity = 0;
int16_t numOfAggregation = 0;
@ -5493,7 +5511,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if(numOfAggregation > 0) {
if (numOfAggregation > 0) {
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}
@ -5598,14 +5616,14 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
const char* msg2 = "interval not allowed in group by normal column";
const char* msg3 = "group by not allowed on projection query";
const char* msg4 = "tags retrieve not compatible with group by";
const char* msg5 = "retrieve tags not compatible with group by ";
const char* msg5 = "retrieve tags not compatible with group by or interval query";
SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
// only retrieve tags, group by is not supportted
if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) {
if (pCmd->groupbyExpr.numOfGroupCols > 0) {
if (pCmd->groupbyExpr.numOfGroupCols > 0 || pCmd->nAggTimeInterval > 0) {
setErrMsg(pCmd, msg5);
return TSDB_CODE_INVALID_SQL;
} else {
@ -5634,7 +5652,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
* group by normal columns.
* Check if the column projection is identical to the group by column or not
*/
if (functId == TSDB_FUNC_PRJ) {
if (functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
bool qualified = false;
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
SColIndexEx* pColIndex = &pCmd->groupbyExpr.columnInfo[j];
@ -5650,7 +5668,8 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
}
if (IS_MULTIOUTPUT(aAggs[functId].nStatus) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
functId != TSDB_FUNC_TAGPRJ) {
functId != TSDB_FUNC_TAGPRJ &&
(functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}

View File

@ -142,7 +142,6 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) {
return false;
}
void tscGetDBInfoFromMeterId(char* meterId, char* db) {
char* st = strstr(meterId, TS_PATH_DELIMITER);
if (st != NULL) {
@ -265,7 +264,7 @@ bool tscIsPointInterpQuery(SSqlCmd* pCmd) {
}
bool tscIsTWAQuery(SSqlCmd* pCmd) {
for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
if (pExpr == NULL) {
continue;
@ -450,7 +449,8 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock);
}
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset) {
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset) {
uint32_t needed = pDataBlock->numOfParams + 1;
if (needed > pDataBlock->numOfAllocedParams) {
needed *= 2;
@ -490,13 +490,13 @@ SDataBlockList* tscCreateBlockArrayList() {
return pDataBlockArrayList;
}
void tscAppendDataBlock(SDataBlockList *pList, STableDataBlocks *pBlocks) {
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) {
if (pList->nSize >= pList->nAlloc) {
pList->nAlloc = pList->nAlloc << 1;
pList->pData = realloc(pList->pData, sizeof(void *) * (size_t)pList->nAlloc);
pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc);
// reset allocated memory
memset(pList->pData + pList->nSize, 0, sizeof(void *) * (pList->nAlloc - pList->nSize));
memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize));
}
pList->pData[pList->nSize++] = pBlocks;
@ -553,7 +553,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
}
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) {
STableDataBlocks *dataBuf = tscCreateDataBlock(size);
STableDataBlocks* dataBuf = tscCreateDataBlock(size);
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
@ -573,7 +573,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
}
if (dataBuf == NULL) {
dataBuf = tscCreateDataBlockEx((size_t) size, rowSize, startOffset, tableId);
dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf);
tscAppendDataBlock(pDataBlockList, dataBuf);
}
@ -604,7 +604,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
if (tmp != NULL) {
dataBuf->pData = tmp;
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
} else { // failed to allocate memory, free already allocated memory and return error code
} else { // failed to allocate memory, free already allocated memory and return error code
tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);
taosCleanUpIntHash(pVnodeDataBlockHashList);
@ -673,7 +673,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
pCmd->allocSize = size;
} else {
if (pCmd->allocSize < size) {
char* b = realloc(pCmd->payload, size);
char* b = realloc(pCmd->payload, size);
if (b == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY;
pCmd->payload = b;
pCmd->allocSize = size;
@ -869,11 +869,11 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
static void _exprCheckSpace(SSqlExprInfo* pExprInfo, int32_t size) {
if (size > pExprInfo->numOfAlloc) {
int32_t oldSize = pExprInfo->numOfAlloc;
uint32_t oldSize = pExprInfo->numOfAlloc;
int32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1);
uint32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1U);
while (newSize < size) {
newSize = (newSize << 1);
newSize = (newSize << 1U);
}
if (newSize > TSDB_MAX_COLUMNS) {
@ -1161,7 +1161,7 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) {
assert(pColumnBaseInfo->numOfCols <= TSDB_MAX_COLUMNS);
for (int32_t i = 0; i < pColumnBaseInfo->numOfCols; ++i) {
SColumnBase *pColBase = &(pColumnBaseInfo->pColList[i]);
SColumnBase* pColBase = &(pColumnBaseInfo->pColList[i]);
if (pColBase->numOfFilters > 0) {
for (int32_t j = 0; j < pColBase->numOfFilters; ++j) {
@ -1179,8 +1179,9 @@ void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) {
tfree(pColumnBaseInfo->pColList);
}
void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) { _cf_ensureSpace(pColumnBaseInfo, size); }
void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) {
_cf_ensureSpace(pColumnBaseInfo, size);
}
/*
* 1. normal name, not a keyword or number
@ -1228,16 +1229,16 @@ int32_t tscValidateName(SSQLToken* pToken) {
int len = tSQLGetToken(pToken->z, &pToken->type);
// single token, validate it
if (len == pToken->n){
if (len == pToken->n) {
return validateQuoteToken(pToken);
} else {
sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
if (sep == NULL) {
return TSDB_CODE_INVALID_SQL;
}
sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
if (sep == NULL) {
return TSDB_CODE_INVALID_SQL;
}
return tscValidateName(pToken);
}
}
} else {
if (isNumber(pToken)) {
return TSDB_CODE_INVALID_SQL;
@ -1616,8 +1617,8 @@ int32_t SStringAlloc(SString* pStr, int32_t size) {
#ifdef WINDOWS
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
tscTrace("failed to allocate memory, reason:%s", lpMsgBuf);
LocalFree(lpMsgBuf);
#else
@ -1652,12 +1653,11 @@ int32_t SStringEnsureRemain(SString* pStr, int32_t size) {
char* tmp = realloc(pStr->z, newsize);
if (tmp == NULL) {
#ifdef WINDOWS
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
tscTrace("failed to allocate memory, reason:%s", lpMsgBuf);
LocalFree(lpMsgBuf);
#else
@ -1728,7 +1728,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
if (pPrevSql != NULL) {
pNew->cmd.type = pPrevSql->cmd.type;
} else {
pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery
pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery
}
uint64_t uid = pMeterMetaInfo->pMeterMeta->uid;
@ -1760,7 +1760,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid);
char* name = pMeterMetaInfo->name;
char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) {
@ -1768,11 +1768,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
SMetricMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
pMeterMetaInfo->tagColumnIndex);
} else {
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0);
pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta,
pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex);
pPrevInfo->pMeterMeta = NULL;
pPrevInfo->pMetricMeta = NULL;
@ -1783,13 +1783,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
assert(pFinalInfo->pMetricMeta != NULL);
}
tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, pNew->cmd.type);
tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex,
pNew->cmd.type);
return pNew;
}
void tscDoQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
void* fp = pSql->fp;
void* fp = pSql->fp;
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);

View File

@ -3122,9 +3122,11 @@ static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
}
if (functionId != functId && functionId != functIdDst) {
return false;
}
@ -3137,10 +3139,9 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
static void rewriteExecOrder(SQuery *pQuery, bool metricQuery) {
static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
// in case of point-interpolation query, use asc order scan
char msg[] =
"QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, "
char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, "
"new qrange:%lld-%lld";
// descending order query
@ -3614,7 +3615,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
}
setScanLimitationByResultBuffer(pQuery);
rewriteExecOrder(pQuery, false);
changeExecuteScanOrder(pQuery, false);
pQInfo->over = 0;
pQInfo->pointsRead = 0;
@ -3790,7 +3791,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQInfo->pointsRead = 0;
pQuery->pointsRead = 0;
rewriteExecOrder(pQuery, true);
changeExecuteScanOrder(pQuery, true);
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);

View File

@ -289,7 +289,7 @@ SSqlFunctionExpr* vnodeCreateSqlFunctionExpr(SQueryMeterMsg* pQueryMsg, int32_t*
return NULL;
}
if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExprs[i].resBytes;
}
assert(isValidDataType(pExprs[i].resType, pExprs[i].resBytes));