Merge branch 'develop' of github.com:taosdata/TDengine into dev/chr
This commit is contained in:
commit
3a91e8ea84
|
@ -1938,8 +1938,10 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
SSchema* colSchema = tGetTbnameColumnSchema();
|
SSchema colSchema = *tGetTbnameColumnSchema();
|
||||||
tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
|
getColumnName(pItem, colSchema.name, colSchema.name, sizeof(colSchema.name) - 1);
|
||||||
|
|
||||||
|
/*SExprInfo* pExpr = */tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG, getNewResColId(pCmd));
|
||||||
} else {
|
} else {
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
|
@ -2151,10 +2153,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
case TSDB_FUNC_AVG:
|
case TSDB_FUNC_AVG:
|
||||||
case TSDB_FUNC_RATE:
|
case TSDB_FUNC_RATE:
|
||||||
case TSDB_FUNC_IRATE:
|
case TSDB_FUNC_IRATE:
|
||||||
case TSDB_FUNC_SUM_RATE:
|
|
||||||
case TSDB_FUNC_SUM_IRATE:
|
|
||||||
case TSDB_FUNC_AVG_RATE:
|
|
||||||
case TSDB_FUNC_AVG_IRATE:
|
|
||||||
case TSDB_FUNC_TWA:
|
case TSDB_FUNC_TWA:
|
||||||
case TSDB_FUNC_MIN:
|
case TSDB_FUNC_MIN:
|
||||||
case TSDB_FUNC_MAX:
|
case TSDB_FUNC_MAX:
|
||||||
|
@ -2219,8 +2217,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
|
|
||||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false);
|
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false);
|
||||||
|
|
||||||
if (functionId == TSDB_FUNC_LEASTSQR) {
|
if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters
|
||||||
/* set the leastsquares parameters */
|
|
||||||
char val[8] = {0};
|
char val[8] = {0};
|
||||||
if (tVariantDump(&pParamElem[1].pNode->value, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
|
if (tVariantDump(&pParamElem[1].pNode->value, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
@ -2234,6 +2231,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
}
|
}
|
||||||
|
|
||||||
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
||||||
|
} else if (functionId == TSDB_FUNC_IRATE) {
|
||||||
|
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||||
|
int64_t prec = info.precision;
|
||||||
|
|
||||||
|
tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
|
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
|
||||||
|
@ -2882,7 +2884,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
||||||
|
|
||||||
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
|
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
|
||||||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
|
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
|
||||||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
|
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE)) {
|
||||||
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
|
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
|
||||||
&interBytes, 0, true) != TSDB_CODE_SUCCESS) {
|
&interBytes, 0, true) != TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
@ -3894,7 +3896,8 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { // query on time range
|
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
|
||||||
|
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP && index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { // query on time range
|
||||||
if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) {
|
if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) {
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
}
|
}
|
||||||
|
@ -6854,6 +6857,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
const char* msg5 = "sql too long"; // todo ADD support
|
const char* msg5 = "sql too long"; // todo ADD support
|
||||||
const char* msg6 = "from missing in subclause";
|
const char* msg6 = "from missing in subclause";
|
||||||
const char* msg7 = "time interval is required";
|
const char* msg7 = "time interval is required";
|
||||||
|
const char* msg8 = "the first column should be primary timestamp column";
|
||||||
|
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
||||||
|
@ -6907,13 +6911,19 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
|
if (tscIsProjectionQuery(pQueryInfo)) {
|
||||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
|
||||||
|
if (pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||||
|
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pQueryInfo->interval.interval == 0) {
|
||||||
|
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the created table[stream] name
|
// set the created table[stream] name
|
||||||
|
|
|
@ -244,6 +244,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk")
|
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk")
|
||||||
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message")
|
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message")
|
||||||
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value")
|
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value")
|
||||||
|
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data")
|
||||||
|
|
||||||
// query
|
// query
|
||||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle")
|
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle")
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
*/
|
*/
|
||||||
int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
|
int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
|
||||||
const unsigned int day, const unsigned int hour,
|
const unsigned int day, const unsigned int hour,
|
||||||
const unsigned int min, const unsigned int sec, int64_t timezone)
|
const unsigned int min, const unsigned int sec, int64_t time_zone)
|
||||||
{
|
{
|
||||||
unsigned int mon = mon0, year = year0;
|
unsigned int mon = mon0, year = year0;
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
|
||||||
res = res*24;
|
res = res*24;
|
||||||
res = ((res + hour) * 60 + min) * 60 + sec;
|
res = ((res + hour) * 60 + min) * 60 + sec;
|
||||||
|
|
||||||
return (res + timezone);
|
return (res + time_zone);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==== mktime() kernel code =================//
|
// ==== mktime() kernel code =================//
|
||||||
|
|
|
@ -65,24 +65,18 @@ extern "C" {
|
||||||
|
|
||||||
#define TSDB_FUNC_RATE 29
|
#define TSDB_FUNC_RATE 29
|
||||||
#define TSDB_FUNC_IRATE 30
|
#define TSDB_FUNC_IRATE 30
|
||||||
#define TSDB_FUNC_SUM_RATE 31
|
#define TSDB_FUNC_TID_TAG 31
|
||||||
#define TSDB_FUNC_SUM_IRATE 32
|
#define TSDB_FUNC_BLKINFO 32
|
||||||
#define TSDB_FUNC_AVG_RATE 33
|
|
||||||
#define TSDB_FUNC_AVG_IRATE 34
|
|
||||||
|
|
||||||
#define TSDB_FUNC_TID_TAG 35
|
|
||||||
#define TSDB_FUNC_BLKINFO 36
|
|
||||||
|
|
||||||
#define TSDB_FUNC_HISTOGRAM 37
|
|
||||||
#define TSDB_FUNC_HLL 38
|
|
||||||
#define TSDB_FUNC_MODE 39
|
|
||||||
#define TSDB_FUNC_SAMPLE 40
|
|
||||||
#define TSDB_FUNC_CEIL 41
|
|
||||||
#define TSDB_FUNC_FLOOR 42
|
|
||||||
#define TSDB_FUNC_ROUND 43
|
|
||||||
#define TSDB_FUNC_MAVG 44
|
|
||||||
#define TSDB_FUNC_CSUM 45
|
|
||||||
|
|
||||||
|
#define TSDB_FUNC_HISTOGRAM 33
|
||||||
|
#define TSDB_FUNC_HLL 34
|
||||||
|
#define TSDB_FUNC_MODE 35
|
||||||
|
#define TSDB_FUNC_SAMPLE 36
|
||||||
|
#define TSDB_FUNC_CEIL 37
|
||||||
|
#define TSDB_FUNC_FLOOR 38
|
||||||
|
#define TSDB_FUNC_ROUND 39
|
||||||
|
#define TSDB_FUNC_MAVG 40
|
||||||
|
#define TSDB_FUNC_CSUM 41
|
||||||
|
|
||||||
#define TSDB_FUNCSTATE_SO 0x1u // single output
|
#define TSDB_FUNCSTATE_SO 0x1u // single output
|
||||||
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
|
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
|
||||||
|
|
|
@ -152,15 +152,13 @@ typedef struct STSCompInfo {
|
||||||
} STSCompInfo;
|
} STSCompInfo;
|
||||||
|
|
||||||
typedef struct SRateInfo {
|
typedef struct SRateInfo {
|
||||||
int64_t CorrectionValue;
|
double correctionValue;
|
||||||
int64_t firstValue;
|
double firstValue;
|
||||||
TSKEY firstKey;
|
TSKEY firstKey;
|
||||||
int64_t lastValue;
|
double lastValue;
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
int8_t hasResult; // flag to denote has value
|
int8_t hasResult; // flag to denote has value
|
||||||
bool isIRate; // true for IRate functions, false for Rate functions
|
bool isIRate; // true for IRate functions, false for Rate functions
|
||||||
int64_t num; // for sum/avg
|
|
||||||
double sum; // for sum/avg
|
|
||||||
} SRateInfo;
|
} SRateInfo;
|
||||||
|
|
||||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||||
|
@ -238,7 +236,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
||||||
*interBytes = *bytes;
|
*interBytes = *bytes;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
|
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) {
|
||||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||||
*bytes = sizeof(SRateInfo);
|
*bytes = sizeof(SRateInfo);
|
||||||
*interBytes = sizeof(SRateInfo);
|
*interBytes = sizeof(SRateInfo);
|
||||||
|
@ -304,7 +302,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
||||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||||
*bytes = sizeof(double);
|
*bytes = sizeof(double);
|
||||||
*interBytes = sizeof(SAvgInfo);
|
*interBytes = sizeof(SAvgInfo);
|
||||||
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
|
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) {
|
||||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||||
*bytes = sizeof(double);
|
*bytes = sizeof(double);
|
||||||
*interBytes = sizeof(SRateInfo);
|
*interBytes = sizeof(SRateInfo);
|
||||||
|
@ -4479,36 +4477,34 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// RATE functions
|
// rate functions
|
||||||
|
static double do_calc_rate(const SRateInfo* pRateInfo, double tickPerSec) {
|
||||||
static double do_calc_rate(const SRateInfo* pRateInfo) {
|
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) ||
|
||||||
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || (pRateInfo->firstKey >= pRateInfo->lastKey)) {
|
(pRateInfo->firstKey >= pRateInfo->lastKey)) {
|
||||||
return 0;
|
return 0.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t diff = 0;
|
double diff = 0;
|
||||||
|
|
||||||
if (pRateInfo->isIRate) {
|
if (pRateInfo->isIRate) {
|
||||||
|
// If the previous value of the last is greater than the last value, only keep the last point instead of the delta
|
||||||
|
// value between two values.
|
||||||
diff = pRateInfo->lastValue;
|
diff = pRateInfo->lastValue;
|
||||||
if (diff >= pRateInfo->firstValue) {
|
if (diff >= pRateInfo->firstValue) {
|
||||||
diff -= pRateInfo->firstValue;
|
diff -= pRateInfo->firstValue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
diff = pRateInfo->CorrectionValue + pRateInfo->lastValue - pRateInfo->firstValue;
|
diff = pRateInfo->correctionValue + pRateInfo->lastValue - pRateInfo->firstValue;
|
||||||
if (diff <= 0) {
|
if (diff <= 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
|
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
|
||||||
duration = (duration + 500) / 1000;
|
if (duration == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
double resultVal = ((double)diff) / duration;
|
return (duration > 0)? ((double)diff) / (duration/tickPerSec):0.0;
|
||||||
|
|
||||||
qDebug("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " resultVal:%f",
|
|
||||||
pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal);
|
|
||||||
|
|
||||||
return resultVal;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool rate_function_setup(SQLFunctionCtx *pCtx) {
|
static bool rate_function_setup(SQLFunctionCtx *pCtx) {
|
||||||
|
@ -4516,19 +4512,17 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->pOutput + pCtx->outputBytes;
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
SRateInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
|
||||||
|
|
||||||
pInfo->CorrectionValue = 0;
|
SRateInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
pInfo->correctionValue = 0;
|
||||||
pInfo->firstKey = INT64_MIN;
|
pInfo->firstKey = INT64_MIN;
|
||||||
pInfo->lastKey = INT64_MIN;
|
pInfo->lastKey = INT64_MIN;
|
||||||
pInfo->firstValue = INT64_MIN;
|
pInfo->firstValue = INT64_MIN;
|
||||||
pInfo->lastValue = INT64_MIN;
|
pInfo->lastValue = INT64_MIN;
|
||||||
pInfo->num = 0;
|
|
||||||
pInfo->sum = 0;
|
|
||||||
|
|
||||||
pInfo->hasResult = 0;
|
pInfo->hasResult = 0;
|
||||||
pInfo->isIRate = ((pCtx->functionId == TSDB_FUNC_IRATE) || (pCtx->functionId == TSDB_FUNC_SUM_IRATE) || (pCtx->functionId == TSDB_FUNC_AVG_IRATE));
|
pInfo->isIRate = (pCtx->functionId == TSDB_FUNC_IRATE);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4550,26 +4544,22 @@ static void rate_function(SQLFunctionCtx *pCtx) {
|
||||||
|
|
||||||
notNullElems++;
|
notNullElems++;
|
||||||
|
|
||||||
int64_t v = 0;
|
double v = 0;
|
||||||
GET_TYPED_DATA(v, int64_t, pCtx->inputType, pData);
|
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
|
||||||
|
|
||||||
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
|
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
|
||||||
pRateInfo->firstValue = v;
|
pRateInfo->firstValue = v;
|
||||||
pRateInfo->firstKey = primaryKey[i];
|
pRateInfo->firstKey = primaryKey[i];
|
||||||
|
|
||||||
qDebug("firstValue:%" PRId64 " firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (INT64_MIN == pRateInfo->lastValue) {
|
if (INT64_MIN == pRateInfo->lastValue) {
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
} else if (v < pRateInfo->lastValue) {
|
} else if (v < pRateInfo->lastValue) {
|
||||||
pRateInfo->CorrectionValue += pRateInfo->lastValue;
|
pRateInfo->correctionValue += pRateInfo->lastValue;
|
||||||
qDebug("CorrectionValue:%" PRId64, pRateInfo->CorrectionValue);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
pRateInfo->lastKey = primaryKey[i];
|
pRateInfo->lastKey = primaryKey[i];
|
||||||
qDebug("lastValue:%" PRId64 " lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pCtx->hasNull) {
|
if (!pCtx->hasNull) {
|
||||||
|
@ -4600,8 +4590,8 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
TSKEY *primaryKey = GET_TS_LIST(pCtx);
|
TSKEY *primaryKey = GET_TS_LIST(pCtx);
|
||||||
|
|
||||||
int64_t v = 0;
|
double v = 0;
|
||||||
GET_TYPED_DATA(v, int64_t, pCtx->inputType, pData);
|
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
|
||||||
|
|
||||||
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
|
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
|
||||||
pRateInfo->firstValue = v;
|
pRateInfo->firstValue = v;
|
||||||
|
@ -4611,14 +4601,12 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
if (INT64_MIN == pRateInfo->lastValue) {
|
if (INT64_MIN == pRateInfo->lastValue) {
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
} else if (v < pRateInfo->lastValue) {
|
} else if (v < pRateInfo->lastValue) {
|
||||||
pRateInfo->CorrectionValue += pRateInfo->lastValue;
|
pRateInfo->correctionValue += pRateInfo->lastValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
pRateInfo->lastKey = primaryKey[index];
|
pRateInfo->lastKey = primaryKey[index];
|
||||||
|
|
||||||
qDebug("====%p rate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " CorrectionValue:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->CorrectionValue);
|
|
||||||
|
|
||||||
SET_VAL(pCtx, 1, 1);
|
SET_VAL(pCtx, 1, 1);
|
||||||
|
|
||||||
// set has result flag
|
// set has result flag
|
||||||
|
@ -4637,27 +4625,18 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes);
|
memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes);
|
||||||
pResInfo->hasResult = ((SRateInfo*)pCtx->pInput)->hasResult;
|
pResInfo->hasResult = ((SRateInfo*)pCtx->pInput)->hasResult;
|
||||||
|
|
||||||
SRateInfo* pRateInfo = (SRateInfo*)pCtx->pInput;
|
|
||||||
qDebug("%p rate_func_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d",
|
|
||||||
pCtx, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rate_finalizer(SQLFunctionCtx *pCtx) {
|
static void rate_finalizer(SQLFunctionCtx *pCtx) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
qDebug("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d",
|
|
||||||
pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult);
|
|
||||||
|
|
||||||
if (pRateInfo->hasResult != DATA_SET_FLAG) {
|
if (pRateInfo->hasResult != DATA_SET_FLAG) {
|
||||||
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
*(double*)pCtx->pOutput = do_calc_rate(pRateInfo);
|
*(double*) pCtx->pOutput = do_calc_rate(pRateInfo, TSDB_TICK_PER_SECOND(pCtx->param[0].i64));
|
||||||
|
|
||||||
qDebug("rate_finalizer() output result:%f", *(double *)pCtx->pOutput);
|
|
||||||
|
|
||||||
// cannot set the numOfIteratedElems again since it is set during previous iteration
|
// cannot set the numOfIteratedElems again since it is set during previous iteration
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
|
@ -4667,44 +4646,32 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void irate_function(SQLFunctionCtx *pCtx) {
|
static void irate_function(SQLFunctionCtx *pCtx) {
|
||||||
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
int32_t notNullElems = 0;
|
int32_t notNullElems = 0;
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
TSKEY *primaryKey = GET_TS_LIST(pCtx);
|
||||||
TSKEY *primaryKey = GET_TS_LIST(pCtx);
|
|
||||||
|
|
||||||
qDebug("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
|
|
||||||
|
|
||||||
if (pCtx->size < 1) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
|
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
|
||||||
char *pData = GET_INPUT_DATA(pCtx, i);
|
char *pData = GET_INPUT_DATA(pCtx, i);
|
||||||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||||
qDebug("%p irate_function() index of null data:%d", pCtx, i);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
notNullElems++;
|
notNullElems++;
|
||||||
|
|
||||||
int64_t v = 0;
|
double v = 0;
|
||||||
GET_TYPED_DATA(v, int64_t, pCtx->inputType, pData);
|
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
|
||||||
|
|
||||||
// TODO: calc once if only call this function once ????
|
if ((INT64_MIN == pRateInfo->lastKey) || primaryKey[i] > pRateInfo->lastKey) {
|
||||||
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->lastValue)) {
|
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
pRateInfo->lastKey = primaryKey[i];
|
pRateInfo->lastKey = primaryKey[i];
|
||||||
|
|
||||||
qDebug("%p irate_function() lastValue:%" PRId64 " lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((INT64_MIN == pRateInfo->firstKey) || (INT64_MIN == pRateInfo->firstValue)){
|
if ((INT64_MIN == pRateInfo->firstKey) || primaryKey[i] > pRateInfo->firstKey) {
|
||||||
pRateInfo->firstValue = v;
|
pRateInfo->firstValue = v;
|
||||||
pRateInfo->firstKey = primaryKey[i];
|
pRateInfo->firstKey = primaryKey[i];
|
||||||
|
|
||||||
qDebug("%p irate_function() firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4733,8 +4700,8 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
TSKEY *primaryKey = GET_TS_LIST(pCtx);
|
TSKEY *primaryKey = GET_TS_LIST(pCtx);
|
||||||
|
|
||||||
int64_t v = 0;
|
double v = 0;
|
||||||
GET_TYPED_DATA(v, int64_t, pCtx->inputType, pData);
|
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
|
||||||
|
|
||||||
pRateInfo->firstKey = pRateInfo->lastKey;
|
pRateInfo->firstKey = pRateInfo->lastKey;
|
||||||
pRateInfo->firstValue = pRateInfo->lastValue;
|
pRateInfo->firstValue = pRateInfo->lastValue;
|
||||||
|
@ -4742,8 +4709,7 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
pRateInfo->lastKey = primaryKey[index];
|
pRateInfo->lastKey = primaryKey[index];
|
||||||
|
|
||||||
qDebug("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey);
|
// qDebug("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey);
|
||||||
|
|
||||||
SET_VAL(pCtx, 1, 1);
|
SET_VAL(pCtx, 1, 1);
|
||||||
|
|
||||||
// set has result flag
|
// set has result flag
|
||||||
|
@ -4756,68 +4722,6 @@ static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void do_sumrate_merge(SQLFunctionCtx *pCtx) {
|
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
|
||||||
assert(pCtx->stableQuery);
|
|
||||||
|
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
|
||||||
char * input = GET_INPUT_DATA_LIST(pCtx);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) {
|
|
||||||
SRateInfo *pInput = (SRateInfo *)input;
|
|
||||||
|
|
||||||
qDebug("%p do_sumrate_merge() hasResult:%d input num:%" PRId64 " input sum:%f total num:%" PRId64 " total sum:%f", pCtx, pInput->hasResult, pInput->num, pInput->sum, pRateInfo->num, pRateInfo->sum);
|
|
||||||
|
|
||||||
if (pInput->hasResult != DATA_SET_FLAG) {
|
|
||||||
continue;
|
|
||||||
} else if (pInput->num == 0) {
|
|
||||||
pRateInfo->sum += do_calc_rate(pInput);
|
|
||||||
pRateInfo->num++;
|
|
||||||
} else {
|
|
||||||
pRateInfo->sum += pInput->sum;
|
|
||||||
pRateInfo->num += pInput->num;
|
|
||||||
}
|
|
||||||
pRateInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the data set hasResult is not set, the result is null
|
|
||||||
if (DATA_SET_FLAG == pRateInfo->hasResult) {
|
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
SET_VAL(pCtx, pRateInfo->num, 1);
|
|
||||||
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void sumrate_func_merge(SQLFunctionCtx *pCtx) {
|
|
||||||
qDebug("%p sumrate_func_merge() process ...", pCtx);
|
|
||||||
do_sumrate_merge(pCtx);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
|
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
|
||||||
|
|
||||||
qDebug("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pCtx->stableQuery, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult);
|
|
||||||
|
|
||||||
if (pRateInfo->hasResult != DATA_SET_FLAG) {
|
|
||||||
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRateInfo->num == 0) {
|
|
||||||
// from meter
|
|
||||||
*(double*)pCtx->pOutput = do_calc_rate(pRateInfo);
|
|
||||||
} else if (pCtx->functionId == TSDB_FUNC_SUM_RATE || pCtx->functionId == TSDB_FUNC_SUM_IRATE) {
|
|
||||||
*(double*)pCtx->pOutput = pRateInfo->sum;
|
|
||||||
} else {
|
|
||||||
*(double*)pCtx->pOutput = pRateInfo->sum / pRateInfo->num;
|
|
||||||
}
|
|
||||||
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
doFinalizer(pCtx);
|
|
||||||
}
|
|
||||||
|
|
||||||
void blockInfo_func(SQLFunctionCtx* pCtx) {
|
void blockInfo_func(SQLFunctionCtx* pCtx) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
|
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
@ -4983,12 +4887,12 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
|
||||||
int32_t functionCompatList[] = {
|
int32_t functionCompatList[] = {
|
||||||
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
|
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
|
||||||
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
|
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
|
||||||
// last_row,top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z
|
// last_row,top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
|
||||||
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
|
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
|
||||||
// tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp rate irate
|
// tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, stddev_dst, interp rate irate
|
||||||
1, 1, 1, 1, -1, 1, 1, 5, 1, 1,
|
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
|
||||||
// sum_rate, sum_irate, avg_rate, avg_irate, tid_tag, blk_info
|
// tid_tag, blk_info
|
||||||
1, 1, 1, 1, 6, 7
|
6, 7
|
||||||
};
|
};
|
||||||
|
|
||||||
SAggFunctionInfo aAggs[] = {{
|
SAggFunctionInfo aAggs[] = {{
|
||||||
|
@ -5400,58 +5304,6 @@ SAggFunctionInfo aAggs[] = {{
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// 31
|
// 31
|
||||||
"sum_rate",
|
|
||||||
TSDB_FUNC_SUM_RATE,
|
|
||||||
TSDB_FUNC_SUM_RATE,
|
|
||||||
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
|
|
||||||
rate_function_setup,
|
|
||||||
rate_function,
|
|
||||||
rate_function_f,
|
|
||||||
sumrate_finalizer,
|
|
||||||
sumrate_func_merge,
|
|
||||||
dataBlockRequired,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// 32
|
|
||||||
"sum_irate",
|
|
||||||
TSDB_FUNC_SUM_IRATE,
|
|
||||||
TSDB_FUNC_SUM_IRATE,
|
|
||||||
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
|
|
||||||
rate_function_setup,
|
|
||||||
irate_function,
|
|
||||||
irate_function_f,
|
|
||||||
sumrate_finalizer,
|
|
||||||
sumrate_func_merge,
|
|
||||||
dataBlockRequired,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// 33
|
|
||||||
"avg_rate",
|
|
||||||
TSDB_FUNC_AVG_RATE,
|
|
||||||
TSDB_FUNC_AVG_RATE,
|
|
||||||
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
|
|
||||||
rate_function_setup,
|
|
||||||
rate_function,
|
|
||||||
rate_function_f,
|
|
||||||
sumrate_finalizer,
|
|
||||||
sumrate_func_merge,
|
|
||||||
dataBlockRequired,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// 34
|
|
||||||
"avg_irate",
|
|
||||||
TSDB_FUNC_AVG_IRATE,
|
|
||||||
TSDB_FUNC_AVG_IRATE,
|
|
||||||
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
|
|
||||||
rate_function_setup,
|
|
||||||
irate_function,
|
|
||||||
irate_function_f,
|
|
||||||
sumrate_finalizer,
|
|
||||||
sumrate_func_merge,
|
|
||||||
dataBlockRequired,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// 35
|
|
||||||
"tbid", // return table id and the corresponding tags for join match and subscribe
|
"tbid", // return table id and the corresponding tags for join match and subscribe
|
||||||
TSDB_FUNC_TID_TAG,
|
TSDB_FUNC_TID_TAG,
|
||||||
TSDB_FUNC_TID_TAG,
|
TSDB_FUNC_TID_TAG,
|
||||||
|
@ -5464,15 +5316,15 @@ SAggFunctionInfo aAggs[] = {{
|
||||||
dataBlockRequired,
|
dataBlockRequired,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// 35
|
// 32
|
||||||
"_block_dist", // return table id and the corresponding tags for join match and subscribe
|
"_block_dist", // return table id and the corresponding tags for join match and subscribe
|
||||||
TSDB_FUNC_BLKINFO,
|
TSDB_FUNC_BLKINFO,
|
||||||
TSDB_FUNC_BLKINFO,
|
TSDB_FUNC_BLKINFO,
|
||||||
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
|
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
|
||||||
function_setup,
|
function_setup,
|
||||||
blockInfo_func,
|
blockInfo_func,
|
||||||
noop2,
|
noop2,
|
||||||
blockinfo_func_finalizer,
|
blockinfo_func_finalizer,
|
||||||
block_func_merge,
|
block_func_merge,
|
||||||
dataBlockRequired,
|
dataBlockRequired,
|
||||||
}};
|
}};
|
||||||
|
|
|
@ -1990,23 +1990,6 @@ void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELL
|
||||||
// return false;
|
// return false;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
|
|
||||||
static UNUSED_FUNC bool isSumAvgRateQuery(SQueryAttr *pQueryAttr) {
|
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
|
||||||
int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || functionId == TSDB_FUNC_AVG_RATE ||
|
|
||||||
functionId == TSDB_FUNC_AVG_IRATE) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) {
|
static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) {
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
||||||
int32_t functionID = pQueryAttr->pExpr1[i].base.functionId;
|
int32_t functionID = pQueryAttr->pExpr1[i].base.functionId;
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
|
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
|
||||||
#define _TD_TSDB_COMMIT_QUEUE_H_
|
#define _TD_TSDB_COMMIT_QUEUE_H_
|
||||||
|
|
||||||
typedef enum { COMMIT_REQ, COMPACT_REQ } TSDB_REQ_T;
|
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
|
||||||
|
|
||||||
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
|
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT
|
||||||
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
|
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
|
||||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||||
|
int tsdbSyncCommitConfig(STsdbRepo* pRepo);
|
||||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||||
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
|
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
|
||||||
void* tsdbCommitData(STsdbRepo* pRepo);
|
void* tsdbCommitData(STsdbRepo* pRepo);
|
||||||
|
|
|
@ -78,7 +78,6 @@ struct STsdbRepo {
|
||||||
bool config_changed; // config changed flag
|
bool config_changed; // config changed flag
|
||||||
pthread_mutex_t save_mutex; // protect save config
|
pthread_mutex_t save_mutex; // protect save config
|
||||||
|
|
||||||
uint8_t hasCachedLastRow;
|
|
||||||
uint8_t hasCachedLastColumn;
|
uint8_t hasCachedLastColumn;
|
||||||
|
|
||||||
STsdbAppH appH;
|
STsdbAppH appH;
|
||||||
|
|
|
@ -180,15 +180,14 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
req = ((SReq *)pNode->data)->req;
|
req = ((SReq *)pNode->data)->req;
|
||||||
pRepo = ((SReq *)pNode->data)->pRepo;
|
pRepo = ((SReq *)pNode->data)->pRepo;
|
||||||
|
|
||||||
// check if need to apply new config
|
|
||||||
if (pRepo->config_changed) {
|
|
||||||
tsdbApplyRepoConfig(pRepo);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (req == COMMIT_REQ) {
|
if (req == COMMIT_REQ) {
|
||||||
tsdbCommitData(pRepo);
|
tsdbCommitData(pRepo);
|
||||||
} else if (req == COMPACT_REQ) {
|
} else if (req == COMPACT_REQ) {
|
||||||
tsdbCompactImpl(pRepo);
|
tsdbCompactImpl(pRepo);
|
||||||
|
} else if (req == COMMIT_CONFIG_REQ) {
|
||||||
|
ASSERT(pRepo->config_changed);
|
||||||
|
tsdbApplyRepoConfig(pRepo);
|
||||||
|
tsem_post(&(pRepo->readyToCommit));
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,8 +270,8 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&repo->save_mutex);
|
pthread_mutex_unlock(&repo->save_mutex);
|
||||||
|
|
||||||
// schedule a commit msg then the new config will be applied immediatly
|
// schedule a commit msg and wait for the new config applied
|
||||||
tsdbAsyncCommit(repo);
|
tsdbSyncCommitConfig(repo);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -553,7 +553,6 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pRepo->config_changed = false;
|
pRepo->config_changed = false;
|
||||||
atomic_store_8(&pRepo->hasCachedLastRow, 0);
|
|
||||||
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
||||||
|
|
||||||
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
||||||
|
@ -857,9 +856,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
if (CACHE_LAST_ROW(pCfg)) {
|
|
||||||
atomic_store_8(&pRepo->hasCachedLastRow, 1);
|
|
||||||
}
|
|
||||||
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
||||||
}
|
}
|
||||||
|
@ -900,20 +897,16 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
|
||||||
|
|
||||||
// if close last option,need to free data
|
// if close last option,need to free data
|
||||||
if (need_free_last_row || need_free_last_col) {
|
if (need_free_last_row || need_free_last_col) {
|
||||||
if (need_free_last_row) {
|
|
||||||
atomic_store_8(&pRepo->hasCachedLastRow, 0);
|
|
||||||
}
|
|
||||||
if (need_free_last_col) {
|
if (need_free_last_col) {
|
||||||
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
||||||
}
|
}
|
||||||
tsdbInfo("free cache last data since cacheLast option changed");
|
tsdbInfo("free cache last data since cacheLast option changed");
|
||||||
for (int i = 1; i < maxTableIdx; i++) {
|
for (int i = 1; i <= maxTableIdx; i++) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
if (pTable == NULL) continue;
|
if (pTable == NULL) continue;
|
||||||
if (need_free_last_row) {
|
if (need_free_last_row) {
|
||||||
taosTZfree(pTable->lastRow);
|
taosTZfree(pTable->lastRow);
|
||||||
pTable->lastRow = NULL;
|
pTable->lastRow = NULL;
|
||||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
|
||||||
}
|
}
|
||||||
if (need_free_last_col) {
|
if (need_free_last_col) {
|
||||||
tsdbFreeLastColumns(pTable);
|
tsdbFreeLastColumns(pTable);
|
||||||
|
@ -983,9 +976,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
|
||||||
|
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
|
|
||||||
if (cacheLastRow) {
|
|
||||||
atomic_store_8(&pRepo->hasCachedLastRow, 1);
|
|
||||||
}
|
|
||||||
if (cacheLastCol) {
|
if (cacheLastCol) {
|
||||||
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,10 +271,34 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
|
||||||
|
ASSERT(pRepo->config_changed == true);
|
||||||
|
tsem_wait(&(pRepo->readyToCommit));
|
||||||
|
|
||||||
|
if (pRepo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbWarn("vgId:%d try to commit config when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||||
|
tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ);
|
||||||
|
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||||
|
|
||||||
|
tsem_wait(&(pRepo->readyToCommit));
|
||||||
|
tsem_post(&(pRepo->readyToCommit));
|
||||||
|
|
||||||
|
if (pRepo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = pRepo->code;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||||
tsem_wait(&(pRepo->readyToCommit));
|
tsem_wait(&(pRepo->readyToCommit));
|
||||||
|
|
||||||
//ASSERT(pRepo->imem == NULL);
|
ASSERT(pRepo->imem == NULL);
|
||||||
if (pRepo->mem == NULL) {
|
if (pRepo->mem == NULL) {
|
||||||
tsem_post(&(pRepo->readyToCommit));
|
tsem_post(&(pRepo->readyToCommit));
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1015,7 +1039,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
|
||||||
taosTZfree(pTable->lastRow);
|
taosTZfree(pTable->lastRow);
|
||||||
TSDB_WLOCK_TABLE(pTable);
|
TSDB_WLOCK_TABLE(pTable);
|
||||||
pTable->lastRow = NULL;
|
pTable->lastRow = NULL;
|
||||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
|
||||||
TSDB_WUNLOCK_TABLE(pTable);
|
TSDB_WUNLOCK_TABLE(pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2469,7 +2469,6 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
|
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
|
||||||
tfree(pRow);
|
tfree(pRow);
|
||||||
|
|
||||||
|
@ -2860,24 +2859,29 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL
|
* if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
|
||||||
* 2. has data but not loaded, just return lastKey but not set pRes
|
* else set pRes and return TSDB_CODE_SUCCESS and save lastKey
|
||||||
* 3. has data and loaded, return lastKey and set pRes
|
|
||||||
*/
|
*/
|
||||||
int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
|
int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
|
||||||
TSDB_RLOCK_TABLE(pTable);
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
*lastKey = pTable->lastKey;
|
|
||||||
|
|
||||||
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) {
|
TSDB_RLOCK_TABLE(pTable);
|
||||||
|
|
||||||
|
if (!pTable->lastRow) {
|
||||||
|
code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRes) {
|
||||||
*pRes = tdDataRowDup(pTable->lastRow);
|
*pRes = tdDataRowDup(pTable->lastRow);
|
||||||
if (*pRes == NULL) {
|
if (*pRes == NULL) {
|
||||||
TSDB_RUNLOCK_TABLE(pTable);
|
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
out:
|
||||||
TSDB_RUNLOCK_TABLE(pTable);
|
TSDB_RUNLOCK_TABLE(pTable);
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
|
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
|
||||||
|
@ -2887,7 +2891,6 @@ bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
|
||||||
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
|
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
|
||||||
assert(pQueryHandle != NULL && groupList != NULL);
|
assert(pQueryHandle != NULL && groupList != NULL);
|
||||||
|
|
||||||
SDataRow pRow = NULL;
|
|
||||||
TSKEY key = TSKEY_INITIAL_VAL;
|
TSKEY key = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
|
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
|
||||||
|
@ -2898,7 +2901,7 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (((STable*)pInfo->pTable)->lastRow) {
|
if (((STable*)pInfo->pTable)->lastRow) {
|
||||||
code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key);
|
code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pQueryHandle->cachelastrow = 0;
|
pQueryHandle->cachelastrow = 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2913,7 +2916,6 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
|
||||||
pQueryHandle->activeIndex = -1; // start from -1
|
pQueryHandle->activeIndex = -1; // start from -1
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pRow);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -402,10 +402,6 @@ void verify_prepare(TAOS* taos) {
|
||||||
taos_stmt_close(stmt);
|
taos_stmt_close(stmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void verify_prepare2(TAOS* taos) {
|
void verify_prepare2(TAOS* taos) {
|
||||||
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
|
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
|
@ -531,10 +527,9 @@ void verify_prepare2(TAOS* taos) {
|
||||||
params[9].is_null = is_null;
|
params[9].is_null = is_null;
|
||||||
params[9].num = 10;
|
params[9].num = 10;
|
||||||
|
|
||||||
|
sql = "insert into ? (ts, b, v1, v2, v4, v8, f4, f8, bin, blob) values(?,?,?,?,?,?,?,?,?,?)";
|
||||||
sql = "insert into ? values(?,?,?,?,?,?,?,?,?,?)";
|
|
||||||
code = taos_stmt_prepare(stmt, sql, 0);
|
code = taos_stmt_prepare(stmt, sql, 0);
|
||||||
if (code != 0){
|
if (code != 0) {
|
||||||
printf("\033[31mfailed to execute taos_stmt_prepare. code:0x%x\033[0m\n", code);
|
printf("\033[31mfailed to execute taos_stmt_prepare. code:0x%x\033[0m\n", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -577,10 +572,9 @@ void verify_prepare2(TAOS* taos) {
|
||||||
printf("\033[31mfailed to execute insert statement.\033[0m\n");
|
printf("\033[31mfailed to execute insert statement.\033[0m\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_stmt_close(stmt);
|
taos_stmt_close(stmt);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// query the records
|
// query the records
|
||||||
stmt = taos_stmt_init(taos);
|
stmt = taos_stmt_init(taos);
|
||||||
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
|
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
|
||||||
|
@ -623,10 +617,17 @@ void verify_prepare2(TAOS* taos) {
|
||||||
|
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
taos_stmt_close(stmt);
|
taos_stmt_close(stmt);
|
||||||
|
|
||||||
|
free(t8_len);
|
||||||
|
free(t16_len);
|
||||||
|
free(t32_len);
|
||||||
|
free(t64_len);
|
||||||
|
free(float_len);
|
||||||
|
free(double_len);
|
||||||
|
free(bin_len);
|
||||||
|
free(blob_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void verify_prepare3(TAOS* taos) {
|
void verify_prepare3(TAOS* taos) {
|
||||||
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
|
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
|
@ -810,7 +811,6 @@ void verify_prepare3(TAOS* taos) {
|
||||||
blob_len[i] = (int32_t)strlen(v.blob[i]);
|
blob_len[i] = (int32_t)strlen(v.blob[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
taos_stmt_bind_param_batch(stmt, params);
|
taos_stmt_bind_param_batch(stmt, params);
|
||||||
taos_stmt_add_batch(stmt);
|
taos_stmt_add_batch(stmt);
|
||||||
|
|
||||||
|
@ -852,10 +852,12 @@ void verify_prepare3(TAOS* taos) {
|
||||||
int rows = 0;
|
int rows = 0;
|
||||||
int num_fields = taos_num_fields(result);
|
int num_fields = taos_num_fields(result);
|
||||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||||
char temp[256];
|
char temp[256] = {0};
|
||||||
|
|
||||||
// fetch the records row by row
|
// fetch the records row by row
|
||||||
while ((row = taos_fetch_row(result))) {
|
while ((row = taos_fetch_row(result))) {
|
||||||
|
memset(temp, 0, sizeof(temp)/sizeof(temp[0]));
|
||||||
|
|
||||||
rows++;
|
rows++;
|
||||||
taos_print_row(temp, row, fields, num_fields);
|
taos_print_row(temp, row, fields, num_fields);
|
||||||
printf("%s\n", temp);
|
printf("%s\n", temp);
|
||||||
|
@ -863,10 +865,17 @@ void verify_prepare3(TAOS* taos) {
|
||||||
|
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
taos_stmt_close(stmt);
|
taos_stmt_close(stmt);
|
||||||
|
|
||||||
|
free(t8_len);
|
||||||
|
free(t16_len);
|
||||||
|
free(t32_len);
|
||||||
|
free(t64_len);
|
||||||
|
free(float_len);
|
||||||
|
free(double_len);
|
||||||
|
free(bin_len);
|
||||||
|
free(blob_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void retrieve_callback(void *param, TAOS_RES *tres, int numOfRows)
|
void retrieve_callback(void *param, TAOS_RES *tres, int numOfRows)
|
||||||
{
|
{
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
|
|
|
@ -0,0 +1,146 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def alterKeepCommunity(self):
|
||||||
|
## community accepts both 1 paramater, 2 parmaters and 3 paramaters
|
||||||
|
## but paramaters other than paramater 1 will be ignored
|
||||||
|
## only paramater 1 will be used
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'3650,3650,3650')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 10')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'10,10,10')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 50')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'50,50,50')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 20')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'20,20,20')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 100, 98 ,99')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'100,100,100')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 99, 100 ,101')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'99,99,99')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 200, 199 ,198')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'200,200,200')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 4000,4001')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'4000,4000,4000')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 5000,50')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'5000,5000,5000')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 50,5000')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'50,50,50')
|
||||||
|
|
||||||
|
|
||||||
|
def alterKeepEnterprise(self):
|
||||||
|
## enterprise only accept three inputs
|
||||||
|
## does not accept 1 paramaters nor 3 paramaters
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'3650,3650,3650')
|
||||||
|
|
||||||
|
tdSql.error('alter database db keep 10')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'3650,3650,3650')
|
||||||
|
|
||||||
|
## the order for altering keep is keep(D), keep0, keep1.
|
||||||
|
## if the order is changed, please modify the following test
|
||||||
|
## to make sure the the test is accurate
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 10, 10 ,10')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'10,10,10')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 100, 98 ,99')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'98,99,100')
|
||||||
|
|
||||||
|
tdSql.execute('alter database db keep 200, 200 ,200')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'200,200,200')
|
||||||
|
|
||||||
|
tdSql.error('alter database db keep 198, 199 ,200')
|
||||||
|
tdSql.query('show databases')
|
||||||
|
tdSql.checkData(0,7,'200,200,200')
|
||||||
|
|
||||||
|
# tdSql.execute('alter database db keep 3650,3650,3650')
|
||||||
|
# tdSql.error('alter database db keep 4000,3640')
|
||||||
|
# tdSql.error('alter database db keep 10,10')
|
||||||
|
# tdSql.query('show databases')
|
||||||
|
# tdSql.checkData(0,7,'3650,3650,3650')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
tdLog.debug('running enterprise test')
|
||||||
|
self.alterKeepEnterprise()
|
||||||
|
else:
|
||||||
|
tdLog.debug('running community test')
|
||||||
|
self.alterKeepCommunity()
|
||||||
|
|
||||||
|
|
||||||
|
##TODO: need to wait for TD-4445 to implement the following
|
||||||
|
## tests
|
||||||
|
# tdSql.prepare()
|
||||||
|
# tdSql.execute('create table tb (ts timestamp, speed int)')
|
||||||
|
# tdSql.execute('alter database db keep 10,10,10')
|
||||||
|
# tdSql.execute('insert into tb values (now, 10)')
|
||||||
|
# tdSql.execute('insert into tb values (now + 10m, 10)')
|
||||||
|
# tdSql.query('select * from tb')
|
||||||
|
# tdSql.checkRows(2)
|
||||||
|
# tdSql.execute('alter database db keep 40,40,40')
|
||||||
|
# tdSql.query('show databases')
|
||||||
|
# tdSql.checkData(0,7,'40,40,40')
|
||||||
|
# tdSql.error('insert into tb values (now-60d, 10)')
|
||||||
|
# tdSql.execute('insert into tb values (now-30d, 10)')
|
||||||
|
# tdSql.query('select * from tb')
|
||||||
|
# tdSql.checkRows(3)
|
||||||
|
# tdSql.execute('alter database db keep 20,20,20')
|
||||||
|
# tdSql.query('show databases')
|
||||||
|
# tdSql.checkData(0,7,'20,20,20')
|
||||||
|
# tdSql.query('select * from tb')
|
||||||
|
# tdSql.checkRows(2)
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -338,6 +338,7 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
|
||||||
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
|
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
|
||||||
python3 ./test.py -f tag_lite/drop_auto_create.py
|
python3 ./test.py -f tag_lite/drop_auto_create.py
|
||||||
python3 test.py -f insert/insert_before_use_db.py
|
python3 test.py -f insert/insert_before_use_db.py
|
||||||
|
python3 test.py -f alter/alter_keep.py
|
||||||
python3 test.py -f alter/alter_cacheLastRow.py
|
python3 test.py -f alter/alter_cacheLastRow.py
|
||||||
python3 test.py -f alter/alter_keep_exception.py
|
python3 test.py -f alter/alter_keep_exception.py
|
||||||
#======================p4-end===============
|
#======================p4-end===============
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
|
|
||||||
self.tables = 10
|
self.tables = 10
|
||||||
self.rows = 20
|
self.rows = 20
|
||||||
self.columns = 50
|
self.columns = 5
|
||||||
self.perfix = 't'
|
self.perfix = 't'
|
||||||
self.ts = 1601481600000
|
self.ts = 1601481600000
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ class TDTestCase:
|
||||||
sql = "create table st(ts timestamp, "
|
sql = "create table st(ts timestamp, "
|
||||||
for i in range(self.columns - 1):
|
for i in range(self.columns - 1):
|
||||||
sql += "c%d int, " % (i + 1)
|
sql += "c%d int, " % (i + 1)
|
||||||
sql += "c50 int) tags(t1 int)"
|
sql += "c5 int) tags(t1 int)"
|
||||||
tdSql.execute(sql)
|
tdSql.execute(sql)
|
||||||
|
|
||||||
for i in range(self.tables):
|
for i in range(self.tables):
|
||||||
|
@ -152,11 +152,34 @@ class TDTestCase:
|
||||||
print("============== alter last cache")
|
print("============== alter last cache")
|
||||||
tdSql.execute("alter database test1 cachelast 1")
|
tdSql.execute("alter database test1 cachelast 1")
|
||||||
self.executeQueries2()
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test1 cachelast 2")
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test1 cachelast 3")
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
|
||||||
|
print("============== alter last cache")
|
||||||
|
tdSql.execute("alter database test1 cachelast 0")
|
||||||
|
self.executeQueries2()
|
||||||
tdDnodes.stop(1)
|
tdDnodes.stop(1)
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
self.executeQueries2()
|
self.executeQueries2()
|
||||||
|
|
||||||
tdSql.execute("alter database test1 cachelast 0")
|
tdSql.execute("alter database test1 cachelast 1")
|
||||||
|
self.executeQueries2()
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test1 cachelast 2")
|
||||||
|
self.executeQueries2()
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test1 cachelast 3")
|
||||||
self.executeQueries2()
|
self.executeQueries2()
|
||||||
tdDnodes.stop(1)
|
tdDnodes.stop(1)
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
|
@ -173,6 +196,18 @@ class TDTestCase:
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
self.executeQueries2()
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test2 cachelast 0")
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test2 cachelast 1")
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test2 cachelast 2")
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test2 cachelast 3")
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
tdSql.execute("alter database test2 cachelast 0")
|
tdSql.execute("alter database test2 cachelast 0")
|
||||||
self.executeQueries2()
|
self.executeQueries2()
|
||||||
tdDnodes.stop(1)
|
tdDnodes.stop(1)
|
||||||
|
@ -185,6 +220,21 @@ class TDTestCase:
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
self.executeQueries2()
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test2 cachelast 2")
|
||||||
|
self.executeQueries2()
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.execute("alter database test2 cachelast 3")
|
||||||
|
self.executeQueries2()
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
self.executeQueries2()
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from st group by tbname")
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
|
@ -814,3 +814,121 @@ if $data00 != 1 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ====================> TODO stddev + normal column filter
|
print ====================> TODO stddev + normal column filter
|
||||||
|
|
||||||
|
|
||||||
|
print ====================> irate
|
||||||
|
sql select irate(k) from t1
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != 0.000027778 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select irate(k) from t1 where ts>='2015-8-18 00:30:00.000'
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != 0.000000000 then
|
||||||
|
print expect 0.000000000, actual $data00
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select irate(k) from t1 where ts>='2015-8-18 00:06:00.000' and ts<='2015-8-18 00:12:000';
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != 0.005633334 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select irate(k) from t1 interval(10a)
|
||||||
|
if $rows != 6 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 0.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 0.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data51 != 0.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*),irate(k) from t1 interval(10m)
|
||||||
|
if $rows != 4 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @15-08-18 00:00:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 0.000144445 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data10 != @15-08-18 00:10:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 0.000272222 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data20 != @15-08-18 00:20:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data22 != 0.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data30 != @15-08-18 00:30:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != 0.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*),irate(k) from t1 interval(10m) order by ts desc
|
||||||
|
if $rows != 4 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data30 != @15-08-18 00:00:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != 0.000144445 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
|
@ -124,6 +124,27 @@ if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql select * from (select count(*) a, tbname f1 from nest_mt0 group by tbname) t where t.a<0 and f1 = 'nest_tb0';
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from (select count(*) a, tbname f1 from nest_mt0 group by tbname) t where t.a>0 and f1 = 'nest_tb0';
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != 10000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != @nest_tb0@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != @nest_tb0@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print ===================> nest query interval
|
print ===================> nest query interval
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue