[TD-1978]
This commit is contained in:
parent
ff8930f903
commit
7314a0dd08
|
@ -28,6 +28,7 @@
|
|||
#include "tscompression.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "tutil.h"
|
||||
#include "ttype.h"
|
||||
|
||||
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
|
||||
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
|
||||
|
@ -2479,28 +2480,8 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// TODO extract functions
|
||||
double v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = GET_INT8_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = GET_INT16_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (double)(GET_INT64_VAL(data));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
v = GET_FLOAT_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
v = GET_DOUBLE_VAL(data);
|
||||
break;
|
||||
default:
|
||||
v = GET_INT32_VAL(data);
|
||||
break;
|
||||
}
|
||||
GET_TYPED_DATA(v, pCtx->inputType, data);
|
||||
|
||||
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
|
||||
SET_DOUBLE_VAL(&pInfo->minval, v);
|
||||
|
@ -2541,30 +2522,10 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pInfo->stage == 0) {
|
||||
// TODO extract functions
|
||||
|
||||
double v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = GET_INT8_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = GET_INT16_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (double)(GET_INT64_VAL(pData));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
v = GET_FLOAT_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
v = GET_DOUBLE_VAL(pData);
|
||||
break;
|
||||
default:
|
||||
v = GET_INT32_VAL(pData);
|
||||
break;
|
||||
}
|
||||
GET_TYPED_DATA(v, pCtx->inputType, pData);
|
||||
|
||||
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
|
||||
SET_DOUBLE_VAL(&pInfo->minval, v);
|
||||
|
@ -2653,29 +2614,9 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
notNullElems += 1;
|
||||
|
||||
double v = 0;
|
||||
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = GET_INT8_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = GET_INT16_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (double)(GET_INT64_VAL(data));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
v = GET_FLOAT_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
v = GET_DOUBLE_VAL(data);
|
||||
break;
|
||||
default:
|
||||
v = GET_INT32_VAL(data);
|
||||
break;
|
||||
}
|
||||
|
||||
GET_TYPED_DATA(v, pCtx->inputType, data);
|
||||
tHistogramAdd(&pInfo->pHisto, v);
|
||||
}
|
||||
|
||||
|
@ -2700,26 +2641,7 @@ static void apercentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
|
||||
|
||||
double v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = GET_INT8_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = GET_INT16_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (double)(GET_INT64_VAL(pData));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
v = GET_FLOAT_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
v = GET_DOUBLE_VAL(pData);
|
||||
break;
|
||||
default:
|
||||
v = GET_INT32_VAL(pData);
|
||||
break;
|
||||
}
|
||||
GET_TYPED_DATA(v, pCtx->inputType, pData);
|
||||
|
||||
tHistogramAdd(&pInfo->pHisto, v);
|
||||
|
||||
|
@ -4142,22 +4064,7 @@ static void rate_function(SQLFunctionCtx *pCtx) {
|
|||
notNullElems++;
|
||||
|
||||
int64_t v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = (int64_t)GET_INT8_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = (int64_t)GET_INT16_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
v = (int64_t)GET_INT32_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (int64_t)GET_INT64_VAL(pData);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
GET_TYPED_DATA(v, pCtx->inputType, pData);
|
||||
|
||||
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
|
||||
pRateInfo->firstValue = v;
|
||||
|
@ -4207,22 +4114,7 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
TSKEY *primaryKey = pCtx->ptsList;
|
||||
|
||||
int64_t v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = (int64_t)GET_INT8_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = (int64_t)GET_INT16_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
v = (int64_t)GET_INT32_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (int64_t)GET_INT64_VAL(pData);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
GET_TYPED_DATA(v, pCtx->inputType, pData);
|
||||
|
||||
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
|
||||
pRateInfo->firstValue = v;
|
||||
|
@ -4349,22 +4241,7 @@ static void irate_function(SQLFunctionCtx *pCtx) {
|
|||
notNullElems++;
|
||||
|
||||
int64_t v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = (int64_t)GET_INT8_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = (int64_t)GET_INT16_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
v = (int64_t)GET_INT32_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (int64_t)GET_INT64_VAL(pData);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
GET_TYPED_DATA(v, pCtx->inputType, pData);
|
||||
|
||||
// TODO: calc once if only call this function once ????
|
||||
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->lastValue)) {
|
||||
|
@ -4409,23 +4286,8 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
TSKEY *primaryKey = pCtx->ptsList;
|
||||
|
||||
int64_t v = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
v = (int64_t)GET_INT8_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
v = (int64_t)GET_INT16_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
v = (int64_t)GET_INT32_VAL(pData);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
v = (int64_t)GET_INT64_VAL(pData);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
GET_TYPED_DATA(v, pCtx->inputType, pData);
|
||||
|
||||
pRateInfo->firstKey = pRateInfo->lastKey;
|
||||
pRateInfo->firstValue = pRateInfo->lastValue;
|
||||
|
||||
|
|
|
@ -913,7 +913,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
|
|||
}
|
||||
|
||||
while (1) {
|
||||
int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
|
||||
int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
|
||||
|
||||
if (pQueryInfo->limit.offset < newRows) {
|
||||
newRows -= pQueryInfo->limit.offset;
|
||||
|
@ -942,7 +942,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
|
|||
}
|
||||
|
||||
// all output in current group are completed
|
||||
int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
|
||||
int32_t totalRemainRows = (int32_t)getNumOfResWithFill(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
|
||||
if (totalRemainRows <= 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -1291,7 +1291,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
|
|||
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
|
||||
|
||||
// the first column must be the timestamp column
|
||||
int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
|
||||
int32_t rows = (int32_t) getNumOfResWithFill(pFillInfo, etime, pLocalReducer->resColModel->capacity);
|
||||
if (rows > 0) { // do fill gap
|
||||
doFillResult(pSql, pLocalReducer, false);
|
||||
}
|
||||
|
@ -1320,7 +1320,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
|
|||
((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
|
||||
int64_t etime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey : pQueryInfo->window.skey;
|
||||
|
||||
int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
|
||||
int32_t rows = (int32_t)getNumOfResWithFill(pFillInfo, etime, pLocalReducer->resColModel->capacity);
|
||||
if (rows > 0) {
|
||||
doFillResult(pSql, pLocalReducer, true);
|
||||
}
|
||||
|
|
|
@ -355,32 +355,6 @@ bool isValidDataType(int32_t type) {
|
|||
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_NCHAR;
|
||||
}
|
||||
|
||||
//bool isNull(const char *val, int32_t type) {
|
||||
// switch (type) {
|
||||
// case TSDB_DATA_TYPE_BOOL:
|
||||
// return *(uint8_t *)val == TSDB_DATA_BOOL_NULL;
|
||||
// case TSDB_DATA_TYPE_TINYINT:
|
||||
// return *(uint8_t *)val == TSDB_DATA_TINYINT_NULL;
|
||||
// case TSDB_DATA_TYPE_SMALLINT:
|
||||
// return *(uint16_t *)val == TSDB_DATA_SMALLINT_NULL;
|
||||
// case TSDB_DATA_TYPE_INT:
|
||||
// return *(uint32_t *)val == TSDB_DATA_INT_NULL;
|
||||
// case TSDB_DATA_TYPE_BIGINT:
|
||||
// case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
// return *(uint64_t *)val == TSDB_DATA_BIGINT_NULL;
|
||||
// case TSDB_DATA_TYPE_FLOAT:
|
||||
// return *(uint32_t *)val == TSDB_DATA_FLOAT_NULL;
|
||||
// case TSDB_DATA_TYPE_DOUBLE:
|
||||
// return *(uint64_t *)val == TSDB_DATA_DOUBLE_NULL;
|
||||
// case TSDB_DATA_TYPE_NCHAR:
|
||||
// return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
|
||||
// case TSDB_DATA_TYPE_BINARY:
|
||||
// return *(uint8_t *) varDataVal(val) == TSDB_DATA_BINARY_NULL;
|
||||
// default:
|
||||
// return false;
|
||||
// };
|
||||
//}
|
||||
|
||||
void setVardataNull(char* val, int32_t type) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
varDataSetLen(val, sizeof(int8_t));
|
||||
|
@ -433,14 +407,10 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
|||
*(uint64_t *)(val + i * tDataTypeDesc[type].nSize) = TSDB_DATA_DOUBLE_NULL;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: // todo : without length?
|
||||
for (int32_t i = 0; i < numOfElems; ++i) {
|
||||
*(uint32_t *)(val + i * bytes) = TSDB_DATA_NCHAR_NULL;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
for (int32_t i = 0; i < numOfElems; ++i) {
|
||||
*(uint8_t *)(val + i * bytes) = TSDB_DATA_BINARY_NULL;
|
||||
setVardataNull(val + i * bytes, type);
|
||||
}
|
||||
break;
|
||||
default: {
|
||||
|
|
|
@ -61,13 +61,23 @@ typedef struct tstr {
|
|||
|
||||
// Bytes for each type.
|
||||
extern const int32_t TYPE_BYTES[11];
|
||||
|
||||
// TODO: replace and remove code below
|
||||
#define CHAR_BYTES sizeof(char)
|
||||
#define SHORT_BYTES sizeof(int16_t)
|
||||
#define INT_BYTES sizeof(int32_t)
|
||||
#define LONG_BYTES sizeof(int64_t)
|
||||
#define FLOAT_BYTES sizeof(float)
|
||||
#define DOUBLE_BYTES sizeof(double)
|
||||
#define CHAR_BYTES sizeof(char)
|
||||
#define SHORT_BYTES sizeof(int16_t)
|
||||
#define INT_BYTES sizeof(int32_t)
|
||||
#define LONG_BYTES sizeof(int64_t)
|
||||
#define FLOAT_BYTES sizeof(float)
|
||||
#define DOUBLE_BYTES sizeof(double)
|
||||
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
|
||||
|
||||
#define TSDB_KEYSIZE sizeof(TSKEY)
|
||||
|
||||
#if LINUX
|
||||
#define TSDB_NCHAR_SIZE sizeof(wchar_t)
|
||||
#else
|
||||
#define TSDB_NCHAR_SIZE sizeof(int32_t)
|
||||
#endif
|
||||
|
||||
// NULL definition
|
||||
#define TSDB_DATA_BOOL_NULL 0x02
|
||||
|
@ -101,10 +111,12 @@ extern const int32_t TYPE_BYTES[11];
|
|||
#define TSDB_TIME_PRECISION_MILLI 0
|
||||
#define TSDB_TIME_PRECISION_MICRO 1
|
||||
#define TSDB_TIME_PRECISION_NANO 2
|
||||
#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))
|
||||
|
||||
#define TSDB_TIME_PRECISION_MILLI_STR "ms"
|
||||
#define TSDB_TIME_PRECISION_MICRO_STR "us"
|
||||
#define TSDB_TIME_PRECISION_NANO_STR "ns"
|
||||
|
||||
#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))
|
||||
|
||||
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
|
||||
#define T_APPEND_MEMBER(dst, ptr, type, member) \
|
||||
|
@ -118,15 +130,6 @@ do { \
|
|||
(src) = (void *)((char *)src + sizeof(type));\
|
||||
} while(0)
|
||||
|
||||
#define TSDB_KEYSIZE sizeof(TSKEY)
|
||||
|
||||
#if LINUX
|
||||
#define TSDB_NCHAR_SIZE sizeof(wchar_t)
|
||||
#else
|
||||
#define TSDB_NCHAR_SIZE 4
|
||||
#endif
|
||||
//#define TSDB_CHAR_TERMINATED_SPACE 1
|
||||
|
||||
#define GET_INT8_VAL(x) (*(int8_t *)(x))
|
||||
#define GET_INT16_VAL(x) (*(int16_t *)(x))
|
||||
#define GET_INT32_VAL(x) (*(int32_t *)(x))
|
||||
|
@ -172,7 +175,6 @@ typedef struct tDataTypeDescriptor {
|
|||
} tDataTypeDescriptor;
|
||||
|
||||
extern tDataTypeDescriptor tDataTypeDesc[11];
|
||||
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
|
||||
|
||||
bool isValidDataType(int32_t type);
|
||||
//bool isNull(const char *val, int32_t type);
|
||||
|
@ -266,10 +268,6 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
|
|||
#define TSDB_AUTH_LEN 16
|
||||
#define TSDB_KEY_LEN 16
|
||||
#define TSDB_VERSION_LEN 12
|
||||
#define TSDB_STREET_LEN 64
|
||||
#define TSDB_CITY_LEN 20
|
||||
#define TSDB_STATE_LEN 20
|
||||
#define TSDB_COUNTRY_LEN 20
|
||||
#define TSDB_LOCALE_LEN 64
|
||||
#define TSDB_TIMEZONE_LEN 96
|
||||
#define TSDB_LABEL_LEN 8
|
||||
|
@ -388,27 +386,27 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
|
|||
* 1. ordinary sub query for select * from super_table
|
||||
* 2. all sqlobj generated by createSubqueryObj with this flag
|
||||
*/
|
||||
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u
|
||||
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
|
||||
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u
|
||||
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
|
||||
|
||||
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
|
||||
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
|
||||
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
|
||||
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query
|
||||
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage
|
||||
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
|
||||
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
|
||||
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
|
||||
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query
|
||||
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage
|
||||
|
||||
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
|
||||
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
|
||||
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
|
||||
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
|
||||
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
|
||||
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
|
||||
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
|
||||
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
|
||||
|
||||
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
|
||||
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
|
||||
#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type))
|
||||
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
|
||||
|
||||
#define TSDB_ORDER_ASC 1
|
||||
#define TSDB_ORDER_DESC 2
|
||||
#define TSDB_ORDER_ASC 1
|
||||
#define TSDB_ORDER_DESC 2
|
||||
|
||||
#define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1
|
||||
#define TSDB_DEFAULT_MNODES_HASH_SIZE 5
|
||||
|
@ -420,17 +418,17 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
|
|||
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
|
||||
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
|
||||
|
||||
#define TSDB_PORT_DNODESHELL 0
|
||||
#define TSDB_PORT_DNODEDNODE 5
|
||||
#define TSDB_PORT_SYNC 10
|
||||
#define TSDB_PORT_HTTP 11
|
||||
#define TSDB_PORT_ARBITRATOR 12
|
||||
#define TSDB_PORT_DNODESHELL 0
|
||||
#define TSDB_PORT_DNODEDNODE 5
|
||||
#define TSDB_PORT_SYNC 10
|
||||
#define TSDB_PORT_HTTP 11
|
||||
#define TSDB_PORT_ARBITRATOR 12
|
||||
|
||||
#define TAOS_QTYPE_RPC 0
|
||||
#define TAOS_QTYPE_FWD 1
|
||||
#define TAOS_QTYPE_WAL 2
|
||||
#define TAOS_QTYPE_CQ 3
|
||||
#define TAOS_QTYPE_QUERY 4
|
||||
#define TAOS_QTYPE_RPC 0
|
||||
#define TAOS_QTYPE_FWD 1
|
||||
#define TAOS_QTYPE_WAL 2
|
||||
#define TAOS_QTYPE_CQ 3
|
||||
#define TAOS_QTYPE_QUERY 4
|
||||
|
||||
typedef enum {
|
||||
TSDB_SUPER_TABLE = 0, // super table
|
||||
|
|
|
@ -28,6 +28,7 @@ typedef struct {
|
|||
STColumn col; // column info
|
||||
int16_t functionId; // sql function id
|
||||
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
|
||||
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
|
||||
union {int64_t i; double d;} fillVal;
|
||||
} SFillColInfo;
|
||||
|
||||
|
@ -37,26 +38,29 @@ typedef struct {
|
|||
} SFillTagColInfo;
|
||||
|
||||
typedef struct SFillInfo {
|
||||
TSKEY start; // start timestamp
|
||||
TSKEY endKey; // endKey for fill
|
||||
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
|
||||
int32_t fillType; // fill type
|
||||
int32_t numOfRows; // number of rows in the input data block
|
||||
int32_t rowIdx; // rowIdx
|
||||
int32_t numOfTotal; // number of filled rows in one round
|
||||
int32_t numOfCurrent; // number of filled rows in current results
|
||||
TSKEY start; // start timestamp
|
||||
TSKEY end; // endKey for fill
|
||||
TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure.
|
||||
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
|
||||
int32_t type; // fill type
|
||||
int32_t numOfRows; // number of rows in the input data block
|
||||
int32_t index; // active row index
|
||||
int32_t numOfTotal; // number of filled rows in one round
|
||||
int32_t numOfCurrent; // number of filled rows in current results
|
||||
|
||||
int32_t numOfTags; // number of tags
|
||||
int32_t numOfCols; // number of columns, including the tags columns
|
||||
int32_t rowSize; // size of each row
|
||||
SFillTagColInfo* pTags; // tags value for filling gap
|
||||
int32_t numOfTags; // number of tags
|
||||
int32_t numOfCols; // number of columns, including the tags columns
|
||||
int32_t rowSize; // size of each row
|
||||
SInterval interval;
|
||||
char * prevValues; // previous row of data, to generate the interpolation results
|
||||
char * nextValues; // next row of data
|
||||
char** pData; // original result data block involved in filling data
|
||||
int32_t capacityInRows; // data buffer size in rows
|
||||
int8_t precision; // time resoluation
|
||||
SFillColInfo* pFillCol; // column info for fill operations
|
||||
char * prevValues; // previous row of data, to generate the interpolation results
|
||||
char * nextValues; // next row of data
|
||||
char** pData; // original result data block involved in filling data
|
||||
int32_t alloc; // data buffer size in rows
|
||||
int8_t precision; // time resoluation
|
||||
|
||||
SFillColInfo* pFillCol; // column info for fill operations
|
||||
SFillTagColInfo* pTags; // tags value for filling gap
|
||||
void* handle; // for dubug purpose
|
||||
} SFillInfo;
|
||||
|
||||
typedef struct SPoint {
|
||||
|
@ -74,17 +78,17 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo);
|
|||
|
||||
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||
|
||||
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput);
|
||||
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput);
|
||||
|
||||
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput);
|
||||
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput);
|
||||
|
||||
int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
||||
int64_t getNumOfResWithFill(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
||||
|
||||
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
|
||||
|
||||
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
|
||||
|
||||
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
|
||||
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -4116,7 +4116,7 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
|
|||
* first result row in the actual result set will fill nothing.
|
||||
*/
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
int32_t numOfTotal = (int32_t)getFilledNumOfRes(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity);
|
||||
int32_t numOfTotal = (int32_t)getNumOfResWithFill(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity);
|
||||
return numOfTotal > 0;
|
||||
}
|
||||
|
||||
|
@ -4174,7 +4174,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
|
|||
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
|
||||
|
||||
while (1) {
|
||||
int32_t ret = (int32_t)taosGenerateDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pQuery->rec.capacity);
|
||||
int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pQuery->rec.capacity);
|
||||
|
||||
// todo apply limit output function
|
||||
/* reached the start position of according to offset value, return immediately */
|
||||
|
@ -4519,6 +4519,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
|||
pFillCol[i].col.bytes = pExprInfo->bytes;
|
||||
pFillCol[i].col.type = (int8_t)pExprInfo->type;
|
||||
pFillCol[i].col.offset = offset;
|
||||
pFillCol[i].tagIndex = -2;
|
||||
pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query
|
||||
pFillCol[i].functionId = pExprInfo->base.functionId;
|
||||
pFillCol[i].fillVal.i = pQuery->fillVal[i];
|
||||
|
@ -4532,7 +4533,6 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
|||
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
|
||||
|
@ -4540,7 +4540,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
|
||||
setScanLimitationByResultBuffer(pQuery);
|
||||
|
||||
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||
int32_t code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -5430,7 +5430,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|||
break;
|
||||
} else {
|
||||
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey);
|
||||
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
|
||||
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage**) pQuery->sdata);
|
||||
numOfFilled = 0;
|
||||
|
||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
|
||||
|
|
|
@ -13,28 +13,33 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "qFill.h"
|
||||
#include "os.h"
|
||||
#include "qExtbuffer.h"
|
||||
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "ttype.h"
|
||||
|
||||
#include "qFill.h"
|
||||
#include "qExtbuffer.h"
|
||||
#include "queryLog.h"
|
||||
|
||||
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
|
||||
|
||||
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
|
||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
||||
SFillColInfo* pCol) {
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo));
|
||||
|
||||
|
||||
taosResetFillInfo(pFillInfo, skey);
|
||||
|
||||
pFillInfo->order = order;
|
||||
pFillInfo->fillType = fillType;
|
||||
pFillInfo->pFillCol = pFillCol;
|
||||
|
||||
pFillInfo->order = order;
|
||||
pFillInfo->type = fillType;
|
||||
pFillInfo->pFillCol = pCol;
|
||||
pFillInfo->numOfTags = numOfTags;
|
||||
pFillInfo->numOfCols = numOfCols;
|
||||
pFillInfo->precision = precision;
|
||||
|
@ -47,11 +52,12 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
|||
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
|
||||
if (numOfTags > 0) {
|
||||
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
|
||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||
pFillInfo->pTags[i].col.colId = -2;
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
pFillInfo->pTags[i].col.colId = -2; // TODO
|
||||
}
|
||||
}
|
||||
|
||||
// there are no duplicated tags in the SFillTagColInfo list
|
||||
int32_t rowsize = 0;
|
||||
int32_t k = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -60,7 +66,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
|||
|
||||
if (TSDB_COL_IS_TAG(pColInfo->flag)) {
|
||||
bool exists = false;
|
||||
for(int32_t j = 0; j < k; ++j) {
|
||||
for (int32_t j = 0; j < k; ++j) {
|
||||
if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) {
|
||||
exists = true;
|
||||
break;
|
||||
|
@ -70,6 +76,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
|||
if (!exists) {
|
||||
pFillInfo->pTags[k].col.colId = pColInfo->col.colId;
|
||||
pFillInfo->pTags[k].tagVal = calloc(1, pColInfo->col.bytes);
|
||||
pColInfo->tagIndex = k;
|
||||
|
||||
k += 1;
|
||||
}
|
||||
|
@ -78,14 +85,15 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
|||
}
|
||||
|
||||
pFillInfo->rowSize = rowsize;
|
||||
pFillInfo->capacityInRows = capacity;
|
||||
|
||||
pFillInfo->alloc = capacity;
|
||||
|
||||
return pFillInfo;
|
||||
}
|
||||
|
||||
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
|
||||
pFillInfo->start = startTimestamp;
|
||||
pFillInfo->rowIdx = -1;
|
||||
pFillInfo->currentKey = startTimestamp;
|
||||
pFillInfo->index = -1;
|
||||
pFillInfo->numOfRows = 0;
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
pFillInfo->numOfTotal = 0;
|
||||
|
@ -112,20 +120,20 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
|
|||
}
|
||||
|
||||
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
|
||||
if (pFillInfo->fillType == TSDB_FILL_NONE) {
|
||||
if (pFillInfo->type == TSDB_FILL_NONE) {
|
||||
return;
|
||||
}
|
||||
|
||||
pFillInfo->endKey = endKey;
|
||||
if (pFillInfo->order != TSDB_ORDER_ASC) {
|
||||
pFillInfo->endKey = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->precision);
|
||||
pFillInfo->end = endKey;
|
||||
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
||||
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->precision);
|
||||
}
|
||||
|
||||
pFillInfo->rowIdx = 0;
|
||||
pFillInfo->index = 0;
|
||||
pFillInfo->numOfRows = numOfRows;
|
||||
|
||||
// ensure the space
|
||||
if (pFillInfo->capacityInRows < numOfRows) {
|
||||
if (pFillInfo->alloc < numOfRows) {
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
char* tmp = realloc(pFillInfo->pData[i], numOfRows*pFillInfo->pFillCol[i].col.bytes);
|
||||
assert(tmp != NULL); // todo handle error
|
||||
|
@ -136,42 +144,38 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
|||
}
|
||||
}
|
||||
|
||||
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) {
|
||||
// copy the data into source data buffer
|
||||
// copy the data into source data buffer
|
||||
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput) {
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
|
||||
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput) {
|
||||
assert(pFillInfo->numOfRows == pInput->num);
|
||||
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* data = pInput->data + pCol->col.offset * pInput->num;
|
||||
const char* data = pInput->data + pCol->col.offset * pInput->num;
|
||||
memcpy(pFillInfo->pData[i], data, (size_t)(pInput->num * pCol->col.bytes));
|
||||
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
|
||||
for (int32_t j = 0; j < pFillInfo->numOfTags; ++j) {
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[j];
|
||||
if (pTag->col.colId == pCol->col.colId) {
|
||||
memcpy(pTag->tagVal, data, pCol->col.bytes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
assert (pTag->col.colId == pCol->col.colId);
|
||||
memcpy(pTag->tagVal, data, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
|
||||
int64_t getNumOfResWithFill(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
|
||||
int64_t* tsList = (int64_t*) pFillInfo->pData[0];
|
||||
|
||||
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
|
||||
|
||||
TSKEY ekey1 = ekey;
|
||||
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
||||
pFillInfo->endKey = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision);
|
||||
pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision);
|
||||
}
|
||||
|
||||
int64_t numOfRes = -1;
|
||||
|
@ -179,20 +183,20 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
|
|||
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
|
||||
numOfRes = taosTimeCountInterval(
|
||||
lastKey,
|
||||
pFillInfo->start,
|
||||
pFillInfo->currentKey,
|
||||
pFillInfo->interval.sliding,
|
||||
pFillInfo->interval.slidingUnit,
|
||||
pFillInfo->precision);
|
||||
numOfRes += 1;
|
||||
assert(numOfRes >= numOfRows);
|
||||
} else { // reach the end of data
|
||||
if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||
return 0;
|
||||
}
|
||||
numOfRes = taosTimeCountInterval(
|
||||
ekey1,
|
||||
pFillInfo->start,
|
||||
pFillInfo->currentKey,
|
||||
pFillInfo->interval.sliding,
|
||||
pFillInfo->interval.slidingUnit,
|
||||
pFillInfo->precision);
|
||||
|
@ -203,315 +207,284 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
|
|||
}
|
||||
|
||||
int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||
if (pFillInfo->rowIdx == -1 || pFillInfo->numOfRows == 0) {
|
||||
if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return pFillInfo->numOfRows - pFillInfo->rowIdx;
|
||||
return pFillInfo->numOfRows - pFillInfo->index;
|
||||
}
|
||||
|
||||
// todo: refactor
|
||||
static double linearInterpolationImpl(double v1, double v2, double k1, double k2, double k) {
|
||||
return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
|
||||
}
|
||||
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
|
||||
|
||||
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
*(int32_t*)point->val = (int32_t)linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, (double)point1->key,
|
||||
(double)point2->key, (double)point->key);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
*(float*)point->val = (float)
|
||||
linearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, (double)point1->key, (double)point2->key, (double)point->key);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
*(double*)point->val =
|
||||
linearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, (double)point1->key, (double)point2->key, (double)point->key);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
*(int64_t*)point->val = (int64_t)linearInterpolationImpl((double)(*(int64_t*)point1->val), (double)(*(int64_t*)point2->val), (double)point1->key,
|
||||
(double)point2->key, (double)point->key);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
*(int16_t*)point->val = (int16_t)linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, (double)point1->key,
|
||||
(double)point2->key, (double)point->key);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
*(int8_t*) point->val = (int8_t)
|
||||
linearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, (double)point1->key, (double)point2->key, (double)point->key);
|
||||
break;
|
||||
};
|
||||
default: {
|
||||
// TODO: Deal with interpolation with bool and strings and timestamp
|
||||
return -1;
|
||||
}
|
||||
double v1 = -1;
|
||||
double v2 = -1;
|
||||
|
||||
GET_TYPED_DATA(v1, type, point1->val);
|
||||
GET_TYPED_DATA(v2, type, point2->val);
|
||||
|
||||
double r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key);
|
||||
|
||||
switch(type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: *(int8_t*) point->val = (int8_t) r;break;
|
||||
case TSDB_DATA_TYPE_SMALLINT: *(int16_t*) point->val = (int16_t) r;break;
|
||||
case TSDB_DATA_TYPE_INT: *(int32_t*) point->val = (int32_t) r;break;
|
||||
case TSDB_DATA_TYPE_BIGINT: *(int64_t*) point->val = (int64_t) r;break;
|
||||
case TSDB_DATA_TYPE_DOUBLE: *(double*) point->val = (double) r;break;
|
||||
case TSDB_DATA_TYPE_FLOAT: *(float*) point->val = (float) r;break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t num) {
|
||||
static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows) {
|
||||
for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num);
|
||||
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, genRows);
|
||||
|
||||
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[i];
|
||||
if (pTag->col.colId == pCol->col.colId) {
|
||||
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
|
||||
assert (pTag->col.colId == pCol->col.colId);
|
||||
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
}
|
||||
|
||||
static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts,
|
||||
bool outOfBound) {
|
||||
char* prevValues = pFillInfo->prevValues;
|
||||
char* nextValues = pFillInfo->nextValues;
|
||||
static void setNullValueForRow(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfCol, int32_t rowIndex) {
|
||||
// the first are always the timestamp column, so start from the second column.
|
||||
for (int32_t i = 1; i < numOfCol; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* output = elePtrAt(data[i]->data, pCol->col.bytes, rowIndex);
|
||||
setNull(output, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** srcData, int64_t ts, bool outOfBound) {
|
||||
char* prev = pFillInfo->prevValues;
|
||||
char* next = pFillInfo->nextValues;
|
||||
|
||||
SPoint point1, point2, point;
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
|
||||
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num);
|
||||
*(TSKEY*) val = pFillInfo->start;
|
||||
|
||||
int32_t numOfValCols = pFillInfo->numOfCols - pFillInfo->numOfTags;
|
||||
// set the primary timestamp column value
|
||||
int32_t index = pFillInfo->numOfCurrent;
|
||||
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, index);
|
||||
*(TSKEY*) val = pFillInfo->currentKey;
|
||||
|
||||
// set the other values
|
||||
if (pFillInfo->fillType == TSDB_FILL_PREV) {
|
||||
char* p = FILL_IS_ASC_FILL(pFillInfo) ? prevValues : nextValues;
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next;
|
||||
|
||||
if (p != NULL) {
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
if (isNull(p + pCol->col.offset, pCol->col.type)) {
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY || pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
} else {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
} else {
|
||||
assignVal(val1, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* output = elePtrAt(data[i]->data, pCol->col.bytes, index);
|
||||
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
} else { // no prev value yet, set the value for NULL
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY||pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
} else {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, *num);
|
||||
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
|
||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
// TODO : linear interpolation supports NULL value
|
||||
if (prevValues != NULL && !outOfBound) {
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
if (prev != NULL && !outOfBound) {
|
||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int16_t type = pCol->col.type;
|
||||
int16_t bytes = pCol->col.bytes;
|
||||
|
||||
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
continue;
|
||||
} else if (type == TSDB_DATA_TYPE_BOOL) {
|
||||
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, index);
|
||||
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
|
||||
setNull(val1, pCol->col.type, bytes);
|
||||
continue;
|
||||
}
|
||||
|
||||
point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset};
|
||||
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
|
||||
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset};
|
||||
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
|
||||
point = (SPoint){.key = pFillInfo->start, .val = val1};
|
||||
taosGetLinearInterpolationVal(type, &point1, &point2, &point);
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, *num);
|
||||
|
||||
} else {
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY || pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(val1, pCol->col.type);
|
||||
} else {
|
||||
setNull(val1, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, *num);
|
||||
|
||||
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
||||
}
|
||||
} else { /* fill the default value */
|
||||
for (int32_t i = 1; i < numOfValCols; ++i) {
|
||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, index);
|
||||
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
|
||||
setTagsValue(pFillInfo, data, *num);
|
||||
}
|
||||
|
||||
pFillInfo->start = taosTimeAdd(pFillInfo->start, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
||||
setTagsValue(pFillInfo, data, index);
|
||||
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
||||
pFillInfo->numOfCurrent++;
|
||||
|
||||
(*num) += 1;
|
||||
}
|
||||
|
||||
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** nextValues) {
|
||||
if (*nextValues != NULL) {
|
||||
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
|
||||
if (*next != NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
*nextValues = calloc(1, pFillInfo->rowSize);
|
||||
*next = calloc(1, pFillInfo->rowSize);
|
||||
for (int i = 1; i < pFillInfo->numOfCols; i++) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
if (pCol->col.type == TSDB_DATA_TYPE_BINARY||pCol->col.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(*nextValues + pCol->col.offset, pCol->col.type);
|
||||
} else {
|
||||
setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) {
|
||||
int32_t num = 0;
|
||||
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, int32_t numOfTags, char* buf) {
|
||||
int32_t rowIndex = pFillInfo->index;
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t outputRows) {
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
|
||||
char** prevValues = &pFillInfo->prevValues;
|
||||
char** nextValues = &pFillInfo->nextValues;
|
||||
char** srcData = pFillInfo->pData;
|
||||
char** prev = &pFillInfo->prevValues;
|
||||
char** next = &pFillInfo->nextValues;
|
||||
|
||||
int32_t numOfTags = pFillInfo->numOfTags;
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
if (numOfRows == 0) {
|
||||
/*
|
||||
* These data are generated according to fill strategy, since the current timestamp is out of time window of
|
||||
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
||||
*/
|
||||
while (num < outputRows) {
|
||||
doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, true);
|
||||
}
|
||||
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
return outputRows;
|
||||
|
||||
if (FILL_IS_ASC_FILL(pFillInfo)) {
|
||||
assert(pFillInfo->currentKey >= pFillInfo->start);
|
||||
} else {
|
||||
while (1) {
|
||||
int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->rowIdx];
|
||||
assert(pFillInfo->currentKey <= pFillInfo->start);
|
||||
}
|
||||
|
||||
if ((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||
/* set the next value for interpolation */
|
||||
initBeforeAfterDataBuf(pFillInfo, nextValues);
|
||||
|
||||
int32_t offset = pFillInfo->rowIdx;
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols - numOfTags; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
memcpy(*nextValues + pCol->col.offset, srcData[i] + offset * pCol->col.bytes, pCol->col.bytes);
|
||||
}
|
||||
while (pFillInfo->numOfCurrent < outputRows) {
|
||||
int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index];
|
||||
|
||||
if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||
/* set the next value for interpolation */
|
||||
initBeforeAfterDataBuf(pFillInfo, next);
|
||||
copyCurrentRowIntoBuf(pFillInfo, srcData, numOfTags, *next);
|
||||
}
|
||||
|
||||
if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
|
||||
pFillInfo->numOfCurrent < outputRows) {
|
||||
|
||||
// fill the gap between two actual input rows
|
||||
while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
|
||||
pFillInfo->numOfCurrent < outputRows) {
|
||||
doFillOneRowResult(pFillInfo, data, srcData, ts, false);
|
||||
}
|
||||
|
||||
if (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
|
||||
|
||||
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
|
||||
doFillResultImpl(pFillInfo, data, &num, srcData, ts, false);
|
||||
}
|
||||
|
||||
/* output buffer is full, abort */
|
||||
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
return outputRows;
|
||||
}
|
||||
} else {
|
||||
assert(pFillInfo->start == ts);
|
||||
initBeforeAfterDataBuf(pFillInfo, prevValues);
|
||||
|
||||
// assign rows to dst buffer
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
|
||||
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx);
|
||||
|
||||
if (i == 0 ||
|
||||
(pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) ||
|
||||
(pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) {
|
||||
assignVal(val1, src, pCol->col.bytes, pCol->col.type);
|
||||
memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
|
||||
} else { // i > 0 and data is null , do interpolation
|
||||
if (pFillInfo->fillType == TSDB_FILL_PREV) {
|
||||
assignVal(val1, *prevValues + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
||||
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
|
||||
assignVal(val1, src, pCol->col.bytes, pCol->col.type);
|
||||
memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
|
||||
} else {
|
||||
assignVal(val1, (char*) &pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set the tag value for final result
|
||||
setTagsValue(pFillInfo, data, num);
|
||||
|
||||
pFillInfo->start = taosTimeAdd(pFillInfo->start, pFillInfo->interval.sliding*step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
||||
pFillInfo->rowIdx += 1;
|
||||
|
||||
pFillInfo->numOfCurrent +=1;
|
||||
num += 1;
|
||||
}
|
||||
|
||||
if ((pFillInfo->rowIdx >= pFillInfo->numOfRows && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->rowIdx < 0 && !FILL_IS_ASC_FILL(pFillInfo)) || num >= outputRows) {
|
||||
if (pFillInfo->rowIdx >= pFillInfo->numOfRows || pFillInfo->rowIdx < 0) {
|
||||
pFillInfo->rowIdx = -1;
|
||||
pFillInfo->numOfRows = 0;
|
||||
|
||||
/* the raw data block is exhausted, next value does not exists */
|
||||
taosTFree(*nextValues);
|
||||
}
|
||||
|
||||
// output buffer is full, abort
|
||||
if (pFillInfo->numOfCurrent == outputRows) {
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
return num;
|
||||
return outputRows;
|
||||
}
|
||||
} else {
|
||||
assert(pFillInfo->currentKey == ts);
|
||||
initBeforeAfterDataBuf(pFillInfo, prev);
|
||||
|
||||
// assign rows to dst buffer
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* output = elePtrAt(data[i]->data, pCol->col.bytes, pFillInfo->numOfCurrent);
|
||||
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index);
|
||||
|
||||
if (i == 0 || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) ||
|
||||
(pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) {
|
||||
assignVal(output, src, pCol->col.bytes, pCol->col.type);
|
||||
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
|
||||
} else { // i > 0 and data is null , do interpolation
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
assignVal(output, src, pCol->col.bytes, pCol->col.type);
|
||||
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
|
||||
} else {
|
||||
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set the tag value for final result
|
||||
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
|
||||
|
||||
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
|
||||
pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
||||
pFillInfo->index += 1;
|
||||
pFillInfo->numOfCurrent += 1;
|
||||
}
|
||||
|
||||
if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
|
||||
/* the raw data block is exhausted, next value does not exists */
|
||||
if (pFillInfo->index >= pFillInfo->numOfRows) {
|
||||
taosTFree(*next);
|
||||
}
|
||||
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
return pFillInfo->numOfCurrent;
|
||||
}
|
||||
}
|
||||
|
||||
return pFillInfo->numOfCurrent;
|
||||
}
|
||||
|
||||
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
|
||||
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
|
||||
static int64_t fillExternalResults(SFillInfo* pFillInfo, tFilePage** output, int64_t resultCapacity) {
|
||||
/*
|
||||
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
|
||||
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
||||
*/
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
while (pFillInfo->numOfCurrent < resultCapacity) {
|
||||
doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true);
|
||||
}
|
||||
|
||||
int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, pFillInfo->endKey, capacity);
|
||||
int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData);
|
||||
assert(numOfRes == rows);
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
|
||||
assert(pFillInfo->numOfCurrent == resultCapacity);
|
||||
return resultCapacity;
|
||||
}
|
||||
|
||||
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
|
||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||
|
||||
int64_t numOfRes = getNumOfResWithFill(pFillInfo, pFillInfo->end, capacity);
|
||||
assert(numOfRes <= capacity);
|
||||
|
||||
// no data existed for fill operation now, append result according to the fill strategy
|
||||
if (remain == 0) {
|
||||
return fillExternalResults(pFillInfo, output, numOfRes);
|
||||
}
|
||||
|
||||
fillResultImpl(pFillInfo, output, numOfRes);
|
||||
assert(numOfRes == pFillInfo->numOfCurrent);
|
||||
|
||||
qDebug("generated fill result, src block:%d, index:%d, startKey:%"PRId64", currentKey:%"PRId64", current:%d, total:%d",
|
||||
pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->currentKey, pFillInfo->numOfCurrent,
|
||||
pFillInfo->numOfTotal);
|
||||
|
||||
return numOfRes;
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
#include "taosdef.h"
|
||||
#include "tcompare.h"
|
||||
#include "tarray.h"
|
||||
#include "tutil.h"
|
||||
|
||||
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
|
||||
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
|
||||
|
|
Loading…
Reference in New Issue