Merge branch 'develop' into feature/TD-2942
This commit is contained in:
commit
ebb15e0e6a
|
@ -172,10 +172,10 @@ IF (TD_WINDOWS)
|
|||
ENDIF ()
|
||||
|
||||
IF (TD_MEMORY_SANITIZER)
|
||||
MESSAGE("memory sanitizer detected as true")
|
||||
MESSAGE("memory sanitizer detected as true")
|
||||
SET(DEBUG_FLAGS "/fsanitize=address /Zi /W3 /GL")
|
||||
ELSE ()
|
||||
MESSAGE("memory sanitizer detected as false")
|
||||
MESSAGE("memory sanitizer detected as false")
|
||||
SET(DEBUG_FLAGS "/Zi /W3 /GL")
|
||||
ENDIF ()
|
||||
SET(RELEASE_FLAGS "/W0 /O3 /GL")
|
||||
|
|
|
@ -15,6 +15,7 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专
|
|||
* [命令行程序TAOS](/getting-started#console):访问TDengine的简便方式
|
||||
* [极速体验](/getting-started#demo):运行示例程序,快速体验高效的数据插入、查询
|
||||
* [支持平台列表](/getting-started#platforms):TDengine服务器和客户端支持的平台列表
|
||||
* [Kubenetes部署](https://taosdata.github.io/TDengine-Operator/zh/index.html):TDengine在Kubenetes环境进行部署的详细说明
|
||||
|
||||
## [整体架构](/architecture)
|
||||
|
||||
|
|
|
@ -138,6 +138,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
|
|||
|
||||
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
|
||||
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscIsDiffDerivQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
|
||||
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
|
||||
|
|
|
@ -266,6 +266,7 @@ typedef struct SSqlObj {
|
|||
|
||||
typedef struct SSqlStream {
|
||||
SSqlObj *pSql;
|
||||
void * cqhandle; // stream belong to SCQContext handle
|
||||
const char* dstTable;
|
||||
uint32_t streamId;
|
||||
char listed;
|
||||
|
|
|
@ -107,14 +107,10 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
|
|||
return tscInvalidOperationMsg(error, "value expected in timestamp", sToken.z);
|
||||
}
|
||||
|
||||
if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
|
||||
if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (timePrec == TSDB_TIME_PRECISION_MILLI) {
|
||||
interval /= 1000;
|
||||
}
|
||||
|
||||
if (sToken.type == TK_PLUS) {
|
||||
useconds += interval;
|
||||
} else {
|
||||
|
@ -468,6 +464,10 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
|
||||
int32_t cnt = 0;
|
||||
int32_t j = 0;
|
||||
if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) {
|
||||
return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z);
|
||||
}
|
||||
|
||||
for (uint32_t k = 1; k < sToken.n - 1; ++k) {
|
||||
if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) {
|
||||
tmpTokenBuf[j] = sToken.z[k + 1];
|
||||
|
@ -711,7 +711,7 @@ static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char
|
|||
}
|
||||
|
||||
code = TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
char tmpTokenBuf[16*1024] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
|
||||
int32_t numOfRows = 0;
|
||||
code = tsParseValues(str, dataBuf, maxNumOfRows, pInsertParam, &numOfRows, tmpTokenBuf);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tcq.h"
|
||||
|
||||
#include "taos.h"
|
||||
|
||||
|
@ -294,24 +295,34 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
|||
return msgLen;
|
||||
}
|
||||
|
||||
// cqContext->dbconn is killed then call this callback
|
||||
void cqConnKilledNotify(void* handle, void* conn) {
|
||||
if (handle == NULL || conn == NULL){
|
||||
return ;
|
||||
}
|
||||
|
||||
SCqContext* pContext = (SCqContext*) handle;
|
||||
if (pContext->dbConn == conn){
|
||||
atomic_store_ptr(&(pContext->dbConn), NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void tscKillConnection(STscObj *pObj) {
|
||||
// get stream header by locked
|
||||
pthread_mutex_lock(&pObj->mutex);
|
||||
|
||||
SSqlObj *pSql = pObj->sqlList;
|
||||
while (pSql) {
|
||||
pSql = pSql->next;
|
||||
}
|
||||
|
||||
|
||||
SSqlStream *pStream = pObj->streamList;
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
while (pStream) {
|
||||
SSqlStream *tmp = pStream->next;
|
||||
// set associate variant to NULL
|
||||
cqConnKilledNotify(pStream->cqhandle, pObj);
|
||||
// taos_close_stream function call pObj->mutet lock , careful death-lock
|
||||
taos_close_stream(pStream);
|
||||
pStream = tmp;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
tscDebug("connection:%p is killed", pObj);
|
||||
taos_close(pObj);
|
||||
}
|
||||
|
||||
|
|
|
@ -438,7 +438,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
|
||||
pzName->n = strdequote(pzName->z);
|
||||
if (pzName->type == TK_STRING) {
|
||||
pzName->n = strdequote(pzName->z);
|
||||
}
|
||||
strncpy(pCmd->payload, pzName->z, pzName->n);
|
||||
} else { // drop user/account
|
||||
if (pzName->n >= TSDB_USER_LEN) {
|
||||
|
@ -516,7 +518,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
SStrToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0);
|
||||
id->n = strdequote(id->z);
|
||||
if (id->type == TK_STRING) {
|
||||
id->n = strdequote(id->z);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -921,18 +925,15 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
|
|||
|
||||
// interval is not null
|
||||
SStrToken *t = &pSqlNode->interval.interval;
|
||||
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) {
|
||||
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval,
|
||||
&pQueryInfo->interval.intervalUnit, tinfo.precision) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
|
||||
// if the unit of time window value is millisecond, change the value from microsecond
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->interval.interval = pQueryInfo->interval.interval / 1000;
|
||||
}
|
||||
|
||||
// interval cannot be less than 10 milliseconds
|
||||
if (pQueryInfo->interval.interval < tsMinIntervalTime) {
|
||||
if (convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MILLI) < tsMinIntervalTime) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
}
|
||||
|
@ -1008,6 +1009,8 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
|
|||
const char* msg3 = "invalid column name";
|
||||
const char* msg4 = "invalid time window";
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
// no session window
|
||||
if (!TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1017,7 +1020,7 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
|
|||
SStrToken* gap = &pSqlNode->sessionVal.gap;
|
||||
|
||||
char timeUnit = 0;
|
||||
if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit) != TSDB_CODE_SUCCESS) {
|
||||
if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit, tinfo.precision) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
|
@ -1025,13 +1028,6 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
|
|||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
// if the unit of time window value is millisecond, change the value from microsecond
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->sessionWindow.gap = pQueryInfo->sessionWindow.gap / 1000;
|
||||
}
|
||||
|
||||
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
@ -1068,7 +1064,8 @@ int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* of
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.offset, &pQueryInfo->interval.offsetUnit) != TSDB_CODE_SUCCESS) {
|
||||
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.offset,
|
||||
&pQueryInfo->interval.offsetUnit, tinfo.precision) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
|
@ -1077,10 +1074,6 @@ int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* of
|
|||
}
|
||||
|
||||
if (pQueryInfo->interval.offsetUnit != 'n' && pQueryInfo->interval.offsetUnit != 'y') {
|
||||
// if the unit of time window value is millisecond, change the value from microsecond
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->interval.offset = pQueryInfo->interval.offset / 1000;
|
||||
}
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
|
||||
if (pQueryInfo->interval.offset >= pQueryInfo->interval.interval) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
|
@ -1125,12 +1118,10 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSl
|
|||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
parseAbsoluteDuration(pSliding->z, pSliding->n, &pQueryInfo->interval.sliding);
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->interval.sliding /= 1000;
|
||||
}
|
||||
parseAbsoluteDuration(pSliding->z, pSliding->n, &pQueryInfo->interval.sliding, tinfo.precision);
|
||||
|
||||
if (pQueryInfo->interval.sliding < tsMinSlidingTime) {
|
||||
if (pQueryInfo->interval.sliding <
|
||||
convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, tinfo.precision)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
}
|
||||
|
||||
|
@ -2160,6 +2151,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
const char* msg9 = "diff/derivative can no be applied to unsigned numeric type";
|
||||
const char* msg10 = "derivative duration should be greater than 1 Second";
|
||||
const char* msg11 = "third parameter in derivative should be 0 or 1";
|
||||
const char* msg12 = "parameter is out of range [1, 100]";
|
||||
|
||||
switch (functionId) {
|
||||
case TSDB_FUNC_COUNT: {
|
||||
|
@ -2563,7 +2555,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
|
||||
int64_t nTop = GET_INT32_VAL(val);
|
||||
if (nTop <= 0 || nTop > 100) { // todo use macro
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12);
|
||||
}
|
||||
|
||||
// todo REFACTOR
|
||||
|
@ -3312,8 +3304,9 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
|
|||
return retVal;
|
||||
}
|
||||
} else if ((colType == TSDB_DATA_TYPE_TIMESTAMP) && (TSDB_DATA_TYPE_BIGINT == pRight->value.nType)) {
|
||||
if ((timePrecision == TSDB_TIME_PRECISION_MILLI) && (pRight->flags & (1 << EXPR_FLAG_US_TIMESTAMP))) {
|
||||
pRight->value.i64 /= 1000;
|
||||
if (pRight->flags & (1 << EXPR_FLAG_NS_TIMESTAMP)) {
|
||||
pRight->value.i64 =
|
||||
convertTimePrecision(pRight->value.i64, TSDB_TIME_PRECISION_NANO, timePrecision);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4856,7 +4849,7 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t
|
|||
|
||||
char* seg = strnchr(pRight->value.pz, '-', pRight->value.nLen, false);
|
||||
if (seg != NULL) {
|
||||
if (taosParseTime(pRight->value.pz, &val, pRight->value.nLen, TSDB_TIME_PRECISION_MICRO, tsDaylight) == TSDB_CODE_SUCCESS) {
|
||||
if (taosParseTime(pRight->value.pz, &val, pRight->value.nLen, timePrecision, tsDaylight) == TSDB_CODE_SUCCESS) {
|
||||
parsed = true;
|
||||
} else {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
|
@ -4869,18 +4862,6 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
}
|
||||
} else if (pRight->tokenId == TK_INTEGER && timePrecision == TSDB_TIME_PRECISION_MILLI) {
|
||||
/*
|
||||
* if the pRight->tokenId == TK_INTEGER/TK_FLOAT, the value is adaptive, we
|
||||
* need the time precision in metermeta to transfer the value in MICROSECOND
|
||||
*
|
||||
* Additional check to avoid data overflow
|
||||
*/
|
||||
if (pRight->value.i64 <= INT64_MAX / 1000) {
|
||||
pRight->value.i64 *= 1000;
|
||||
}
|
||||
} else if (pRight->tokenId == TK_FLOAT && timePrecision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pRight->value.dKey *= 1000;
|
||||
}
|
||||
|
||||
if (!parsed) {
|
||||
|
@ -4888,33 +4869,19 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t
|
|||
* failed to parse timestamp in regular formation, try next
|
||||
* it may be a epoch time in string format
|
||||
*/
|
||||
tVariantDump(&pRight->value, (char*)&val, TSDB_DATA_TYPE_BIGINT, true);
|
||||
|
||||
/*
|
||||
* transfer it into MICROSECOND format if it is a string, since for
|
||||
* TK_INTEGER/TK_FLOAT the value has been transferred
|
||||
*
|
||||
* additional check to avoid data overflow
|
||||
*/
|
||||
if (pRight->tokenId == TK_STRING && timePrecision == TSDB_TIME_PRECISION_MILLI) {
|
||||
if (val <= INT64_MAX / 1000) {
|
||||
val *= 1000;
|
||||
}
|
||||
if (pRight->flags & (1 << EXPR_FLAG_NS_TIMESTAMP)) {
|
||||
pRight->value.i64 = convertTimePrecision(pRight->value.i64, TSDB_TIME_PRECISION_NANO, timePrecision);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t delta = 1;
|
||||
/* for millisecond, delta is 1ms=1000us */
|
||||
if (timePrecision == TSDB_TIME_PRECISION_MILLI) {
|
||||
delta *= 1000;
|
||||
|
||||
tVariantDump(&pRight->value, (char*)&val, TSDB_DATA_TYPE_BIGINT, true);
|
||||
}
|
||||
|
||||
if (optr == TK_LE) {
|
||||
win->ekey = val;
|
||||
} else if (optr == TK_LT) {
|
||||
win->ekey = val - delta;
|
||||
win->ekey = val - 1;
|
||||
} else if (optr == TK_GT) {
|
||||
win->skey = val + delta;
|
||||
win->skey = val + 1;
|
||||
} else if (optr == TK_GE) {
|
||||
win->skey = val;
|
||||
} else if (optr == TK_EQ) {
|
||||
|
@ -5637,8 +5604,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
const char* msg0 = "sample interval can not be less than 10ms.";
|
||||
const char* msg1 = "functions not allowed in select clause";
|
||||
|
||||
if (pQueryInfo->interval.interval != 0 && pQueryInfo->interval.interval < 10 &&
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
if (pQueryInfo->interval.interval != 0 &&
|
||||
convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MILLI)< 10 &&
|
||||
pQueryInfo->interval.intervalUnit != 'n' &&
|
||||
pQueryInfo->interval.intervalUnit != 'y') {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
|
@ -6023,11 +5992,15 @@ static int32_t setTimePrecision(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDbInfo
|
|||
} else if (strncmp(pToken->z, TSDB_TIME_PRECISION_MICRO_STR, pToken->n) == 0 &&
|
||||
strlen(TSDB_TIME_PRECISION_MICRO_STR) == pToken->n) {
|
||||
pMsg->precision = TSDB_TIME_PRECISION_MICRO;
|
||||
} else if (strncmp(pToken->z, TSDB_TIME_PRECISION_NANO_STR, pToken->n) == 0 &&
|
||||
strlen(TSDB_TIME_PRECISION_NANO_STR) == pToken->n) {
|
||||
pMsg->precision = TSDB_TIME_PRECISION_NANO;
|
||||
} else {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -6293,7 +6266,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
|
|||
|
||||
int16_t functionId = pExpr->base.functionId;
|
||||
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
|
||||
functionId == TSDB_FUNC_ARITHM) {
|
||||
functionId == TSDB_FUNC_ARITHM || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -6399,9 +6372,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
|
|||
size_t size = tscNumOfExprs(pQueryInfo);
|
||||
|
||||
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
|
||||
int32_t f = TSDB_FUNC_TAG;
|
||||
if (tscIsDiffDerivQuery(pQueryInfo)) {
|
||||
f = TSDB_FUNC_TAGPRJ;
|
||||
}
|
||||
|
||||
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, s->type, s->bytes,
|
||||
getNewResColId(pCmd), s->bytes, true);
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, &index, s->type, s->bytes, getNewResColId(pCmd), s->bytes, true);
|
||||
|
||||
memset(pExpr->base.aliasName, 0, sizeof(pExpr->base.aliasName));
|
||||
tstrncpy(pExpr->base.aliasName, s->name, sizeof(pExpr->base.aliasName));
|
||||
|
@ -6537,7 +6515,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
|
|||
}
|
||||
|
||||
// projection query on super table does not compatible with "group by" syntax
|
||||
if (tscIsProjectionQuery(pQueryInfo)) {
|
||||
if (tscIsProjectionQuery(pQueryInfo) && !(tscIsDiffDerivQuery(pQueryInfo))) {
|
||||
return invalidOperationMsg(msg, msg3);
|
||||
}
|
||||
|
||||
|
@ -6664,9 +6642,10 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) {
|
|||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
|
||||
if (pCreate->precision != TSDB_TIME_PRECISION_MILLI && pCreate->precision != TSDB_TIME_PRECISION_MICRO) {
|
||||
snprintf(msg, tListLen(msg), "invalid db option timePrecision: %d valid value: [%d, %d]", pCreate->precision,
|
||||
TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO);
|
||||
if (pCreate->precision != TSDB_TIME_PRECISION_MILLI && pCreate->precision != TSDB_TIME_PRECISION_MICRO &&
|
||||
pCreate->precision != TSDB_TIME_PRECISION_NANO) {
|
||||
snprintf(msg, tListLen(msg), "invalid db option timePrecision: %d valid value: [%d, %d, %d]", pCreate->precision,
|
||||
TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
|
||||
|
@ -7065,6 +7044,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
// project query primary column must be timestamp type
|
||||
if (tscIsProjectionQuery(pQueryInfo)) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
|
||||
if (pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
|
@ -7073,7 +7053,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
} else {
|
||||
if (pQueryInfo->interval.interval == 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set the created table[stream] name
|
||||
|
@ -7744,7 +7724,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
const char* msg2 = "too many tables in from clause";
|
||||
const char* msg3 = "start(end) time of query range required or time range too large";
|
||||
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
|
||||
const char* msg9 = "only tag query not compatible with normal column filter";
|
||||
const char* msg5 = "only tag query not compatible with normal column filter";
|
||||
const char* msg6 = "not support stddev/percentile in outer query yet";
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -7785,17 +7766,20 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
// todo NOT support yet
|
||||
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||
int32_t f = pExpr->base.functionId;
|
||||
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
}
|
||||
|
||||
// validate the query filter condition info
|
||||
if (pSqlNode->pWhere != NULL) {
|
||||
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||
if (pTableMeta->tableInfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
|
||||
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
|
||||
}
|
||||
}
|
||||
|
||||
// validate the interval info
|
||||
|
@ -7849,18 +7833,12 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
}
|
||||
|
||||
// set where info
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
if (pSqlNode->pWhere != NULL) {
|
||||
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
pSqlNode->pWhere = NULL;
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
|
||||
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
|
||||
}
|
||||
} else {
|
||||
if (taosArrayGetSize(pSqlNode->from->list) > 1) { // Cross join not allowed yet
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "cross join not supported yet");
|
||||
|
@ -7898,7 +7876,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCols = taosArrayGetP(pQueryInfo->colList, i);
|
||||
if (pCols->info.flist.numOfFilters > 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,9 +53,7 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in
|
|||
|
||||
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
|
||||
// change to ms
|
||||
if (prec == TSDB_TIME_PRECISION_MICRO) {
|
||||
slidingTime = slidingTime / 1000;
|
||||
}
|
||||
slidingTime = convertTimePrecision(slidingTime, pStream->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
||||
if (slidingTime < retryDelta) {
|
||||
return slidingTime;
|
||||
|
@ -139,8 +137,13 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
|
||||
pStream->numOfRes = 0; // reset the numOfRes.
|
||||
SSqlObj *pSql = pStream->pSql;
|
||||
|
||||
// pSql == NULL maybe killStream already called
|
||||
if(pSql == NULL) {
|
||||
return ;
|
||||
}
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
||||
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
|
||||
tscDebug("0x%"PRIx64" add into timer", pSql->self);
|
||||
|
||||
if (pStream->isProject) {
|
||||
/*
|
||||
|
@ -157,11 +160,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
pQueryInfo->window.skey = pStream->stime;
|
||||
int64_t etime = taosGetTimestamp(pStream->precision);
|
||||
// delay to wait all data in last time window
|
||||
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
etime -= tsMaxStreamComputDelay * 1000l;
|
||||
} else {
|
||||
etime -= tsMaxStreamComputDelay;
|
||||
}
|
||||
etime -= convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
|
||||
if (etime > pStream->etime) {
|
||||
etime = pStream->etime;
|
||||
} else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') {
|
||||
|
@ -178,8 +177,8 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
int64_t timer = pStream->interval.sliding;
|
||||
if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') {
|
||||
timer = 86400 * 1000l;
|
||||
} else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
timer /= 1000l;
|
||||
} else {
|
||||
timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
}
|
||||
tscSetRetryTimer(pStream, pSql, timer);
|
||||
return;
|
||||
|
@ -339,8 +338,12 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
|
|||
if (pStream->isProject) {
|
||||
int64_t now = taosGetTimestamp(pStream->precision);
|
||||
int64_t etime = now > pStream->etime ? pStream->etime : now;
|
||||
|
||||
if (pStream->etime < now && now - pStream->etime > tsMaxRetentWindow) {
|
||||
int64_t maxRetent = tsMaxRetentWindow * 1000;
|
||||
if(pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
maxRetent *= 1000;
|
||||
}
|
||||
|
||||
if (pStream->etime < now && now - pStream->etime > maxRetent) {
|
||||
/*
|
||||
* current time window will be closed, since it too early to exceed the maxRetentWindow value
|
||||
*/
|
||||
|
@ -369,9 +372,8 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
|
|||
}
|
||||
|
||||
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
|
||||
int64_t maxDelay =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
|
||||
|
||||
int64_t maxDelay = convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
|
||||
|
||||
int64_t delayDelta = maxDelay;
|
||||
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
|
||||
delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio);
|
||||
|
@ -438,16 +440,14 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
|||
|
||||
timer += getLaunchTimeDelay(pStream);
|
||||
|
||||
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
timer = timer / 1000L;
|
||||
}
|
||||
timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
||||
tscSetRetryTimer(pStream, pSql, timer);
|
||||
}
|
||||
|
||||
static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
||||
int64_t minIntervalTime =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
|
||||
convertTimePrecision(tsMinIntervalTime, TSDB_TIME_PRECISION_MILLI, pStream->precision);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
||||
|
||||
|
@ -471,7 +471,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
|||
}
|
||||
|
||||
int64_t minSlidingTime =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
|
||||
convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, pStream->precision);
|
||||
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.sliding < minSlidingTime) {
|
||||
tscWarn("0x%"PRIx64" stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
|
||||
|
@ -539,13 +539,12 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
|
|||
timer = pStream->stime - now;
|
||||
}
|
||||
|
||||
int64_t startDelay =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
|
||||
|
||||
int64_t startDelay = convertTimePrecision(tsStreamCompStartDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
|
||||
|
||||
timer += getLaunchTimeDelay(pStream);
|
||||
timer += startDelay;
|
||||
|
||||
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
|
||||
return convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
}
|
||||
|
||||
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
||||
|
@ -664,7 +663,7 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
|
|||
}
|
||||
|
||||
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *)) {
|
||||
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) return NULL;
|
||||
|
||||
|
@ -697,6 +696,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
|
|||
pStream->callback = callback;
|
||||
pStream->param = param;
|
||||
pStream->pSql = pSql;
|
||||
pStream->cqhandle = cqhandle;
|
||||
pSql->pStream = pStream;
|
||||
pSql->param = pStream;
|
||||
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||
|
@ -745,7 +745,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
|
|||
|
||||
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *)) {
|
||||
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
|
||||
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
|
||||
}
|
||||
|
||||
void taos_close_stream(TAOS_STREAM *handle) {
|
||||
|
|
|
@ -255,10 +255,14 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
|
|||
size_t size = tscNumOfExprs(pQueryInfo);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId;
|
||||
int32_t f = tscExprGet(pQueryInfo, i)->base.functionId;
|
||||
if (f == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
|
||||
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
|
||||
if (f != TSDB_FUNC_PRJ && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_TAG &&
|
||||
f != TSDB_FUNC_TS && f != TSDB_FUNC_ARITHM && f != TSDB_FUNC_DIFF &&
|
||||
f != TSDB_FUNC_DERIVATIVE) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -266,6 +270,24 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool tscIsDiffDerivQuery(SQueryInfo* pQueryInfo) {
|
||||
size_t size = tscNumOfExprs(pQueryInfo);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
int32_t f = tscExprGet(pQueryInfo, i)->base.functionId;
|
||||
if (f == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bool tscHasColumnFilter(SQueryInfo* pQueryInfo) {
|
||||
// filter on primary timestamp column
|
||||
if (pQueryInfo->window.skey != INT64_MIN || pQueryInfo->window.ekey != INT64_MAX) {
|
||||
|
|
|
@ -831,6 +831,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "precision";
|
||||
cfg.ptr = &tsTimePrecision;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT8;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = TSDB_MIN_PRECISION;
|
||||
cfg.maxValue = TSDB_MAX_PRECISION;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "comp";
|
||||
cfg.ptr = &tsCompression;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT8;
|
||||
|
@ -901,6 +911,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "cachelast";
|
||||
cfg.ptr = &tsCacheLastRow;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT8;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = TSDB_MIN_DB_CACHE_LAST_ROW;
|
||||
cfg.maxValue = TSDB_MAX_DB_CACHE_LAST_ROW;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "mqttHostName";
|
||||
cfg.ptr = tsMqttHostName;
|
||||
cfg.valType = TAOS_CFG_VTYPE_STRING;
|
||||
|
|
|
@ -74,7 +74,7 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
|
|||
|
||||
case TSDB_DATA_TYPE_BINARY: {
|
||||
pVar->pz = strndup(token->z, token->n);
|
||||
pVar->nLen = strdequote(pVar->pz);
|
||||
pVar->nLen = strRmquote(pVar->pz, token->n);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -303,7 +303,7 @@ public class TSDBPreparedStatementTest {
|
|||
stmt.execute("create database dbtest");
|
||||
Assert.assertThrows(SQLException.class, () -> stmt.execute("create database dbtest"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void setBoolean() throws SQLException {
|
||||
// given
|
||||
|
|
|
@ -177,7 +177,8 @@ public class TSDBResultSetTest {
|
|||
rs.getAsciiStream("f1");
|
||||
}
|
||||
|
||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test(expected = SQLFeatureNotSupportedException.class)
|
||||
public void getUnicodeStream() throws SQLException {
|
||||
rs.getUnicodeStream("f1");
|
||||
}
|
||||
|
|
|
@ -38,21 +38,6 @@
|
|||
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
||||
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t master;
|
||||
int32_t num; // number of continuous streams
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_KEY_LEN];
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
FCqWrite cqWrite;
|
||||
struct SCqObj *pHead;
|
||||
void *dbConn;
|
||||
void *tmrCtrl;
|
||||
pthread_mutex_t mutex;
|
||||
int32_t delete;
|
||||
int32_t cqObjNum;
|
||||
} SCqContext;
|
||||
|
||||
typedef struct SCqObj {
|
||||
tmr_h tmrId;
|
||||
|
@ -439,7 +424,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
|
|||
|
||||
// inner implement in tscStream.c
|
||||
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *));
|
||||
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
|
||||
|
||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||
pObj->pContext = pContext;
|
||||
|
@ -453,7 +438,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
|||
pObj->tmrId = 0;
|
||||
|
||||
if (pObj->pStream == NULL) {
|
||||
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
|
||||
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
|
||||
INT64_MIN, (void *)pObj->rid, NULL, pContext);
|
||||
|
||||
// TODO the pObj->pStream may be released if error happens
|
||||
if (pObj->pStream) {
|
||||
|
|
|
@ -31,6 +31,23 @@ typedef struct {
|
|||
FCqWrite cqWrite;
|
||||
} SCqCfg;
|
||||
|
||||
// SCqContext
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t master;
|
||||
int32_t num; // number of continuous streams
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_KEY_LEN];
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
FCqWrite cqWrite;
|
||||
struct SCqObj *pHead;
|
||||
void *dbConn;
|
||||
void *tmrCtrl;
|
||||
pthread_mutex_t mutex;
|
||||
int32_t delete;
|
||||
int32_t cqObjNum;
|
||||
} SCqContext;
|
||||
|
||||
// the following API shall be called by vnode
|
||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
|
||||
void cqClose(void *handle);
|
||||
|
|
|
@ -398,7 +398,10 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
|
||||
time_t tt;
|
||||
int32_t ms = 0;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
tt = (time_t)(val / 1000000000);
|
||||
ms = val % 1000000000;
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tt = (time_t)(val / 1000000);
|
||||
ms = val % 1000000;
|
||||
} else {
|
||||
|
@ -419,7 +422,9 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
#endif
|
||||
if (tt <= 0 && ms < 0) {
|
||||
tt--;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
ms += 1000000000;
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
ms += 1000000;
|
||||
} else {
|
||||
ms += 1000;
|
||||
|
@ -427,9 +432,11 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
}
|
||||
|
||||
struct tm* ptm = localtime(&tt);
|
||||
size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", ms);
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
sprintf(buf + pos, ".%06d", ms);
|
||||
} else {
|
||||
sprintf(buf + pos, ".%03d", ms);
|
||||
|
@ -778,6 +785,8 @@ static int calcColWidth(TAOS_FIELD* field, int precision) {
|
|||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (args.is_raw_time) {
|
||||
return MAX(14, width);
|
||||
} if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
return MAX(29, width);
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
return MAX(26, width); // '2020-01-01 00:00:00.000000'
|
||||
} else {
|
||||
|
|
|
@ -1852,7 +1852,9 @@ static void printfQueryMeta() {
|
|||
|
||||
static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
||||
time_t tt;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
tt = (time_t)(val / 1000000000);
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tt = (time_t)(val / 1000000);
|
||||
} else {
|
||||
tt = (time_t)(val / 1000);
|
||||
|
@ -1873,7 +1875,9 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
struct tm* ptm = localtime(&tt);
|
||||
size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
sprintf(buf + pos, ".%06d", (int)(val % 1000000));
|
||||
} else {
|
||||
sprintf(buf + pos, ".%03d", (int)(val % 1000));
|
||||
|
@ -6253,9 +6257,11 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
if (0 != precision[0]) {
|
||||
if (0 == strncasecmp(precision, "ms", 2)) {
|
||||
timePrec = TSDB_TIME_PRECISION_MILLI;
|
||||
} else if (0 == strncasecmp(precision, "us", 2)) {
|
||||
} else if (0 == strncasecmp(precision, "us", 2)) {
|
||||
timePrec = TSDB_TIME_PRECISION_MICRO;
|
||||
} else {
|
||||
} else if (0 == strncasecmp(precision, "ns", 2)) {
|
||||
timePrec = TSDB_TIME_PRECISION_NANO;
|
||||
} else {
|
||||
errorPrint("Not support precision: %s\n", precision);
|
||||
exit(-1);
|
||||
}
|
||||
|
|
|
@ -834,8 +834,13 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
|
|||
#endif
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
char *prec = (pDb->cfg.precision == TSDB_TIME_PRECISION_MILLI) ? TSDB_TIME_PRECISION_MILLI_STR
|
||||
: TSDB_TIME_PRECISION_MICRO_STR;
|
||||
char *prec = NULL;
|
||||
switch (pDb->cfg.precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI: prec = TSDB_TIME_PRECISION_MILLI_STR; break;
|
||||
case TSDB_TIME_PRECISION_MICRO: prec = TSDB_TIME_PRECISION_MICRO_STR; break;
|
||||
case TSDB_TIME_PRECISION_NANO: prec = TSDB_TIME_PRECISION_NANO_STR; break;
|
||||
default: assert(false); break;
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||
cols++;
|
||||
|
||||
|
|
|
@ -55,6 +55,13 @@ static FORCE_INLINE int64_t taosGetTimestampUs() {
|
|||
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
|
||||
}
|
||||
|
||||
//@return timestamp in nanosecond
|
||||
static FORCE_INLINE int64_t taosGetTimestampNs() {
|
||||
struct timespec systemTime = {0};
|
||||
clock_gettime(CLOCK_REALTIME, &systemTime);
|
||||
return (int64_t)systemTime.tv_sec * 1000000000L + (int64_t)systemTime.tv_nsec;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return timestamp decided by global conf variable, tsTimePrecision
|
||||
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
|
||||
|
@ -63,7 +70,9 @@ static FORCE_INLINE int64_t taosGetTimestampUs() {
|
|||
static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
return taosGetTimestampUs();
|
||||
} else {
|
||||
} else if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
return taosGetTimestampNs();
|
||||
}else {
|
||||
return taosGetTimestampMs();
|
||||
}
|
||||
}
|
||||
|
@ -88,12 +97,13 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
|||
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
|
||||
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
|
||||
|
||||
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* ts);
|
||||
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit);
|
||||
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* ts, int32_t timePrecision);
|
||||
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
|
||||
|
||||
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
|
||||
void deltaToUtcInitOnce();
|
||||
|
||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -14,7 +14,13 @@
|
|||
*/
|
||||
|
||||
#define _BSD_SOURCE
|
||||
|
||||
#ifdef DARWIN
|
||||
#define _XOPEN_SOURCE
|
||||
#else
|
||||
#define _XOPEN_SOURCE 500
|
||||
#endif
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
|
@ -119,8 +125,9 @@ int64_t parseFraction(char* str, char** end, int32_t timePrec) {
|
|||
|
||||
const int32_t MILLI_SEC_FRACTION_LEN = 3;
|
||||
const int32_t MICRO_SEC_FRACTION_LEN = 6;
|
||||
const int32_t NANO_SEC_FRACTION_LEN = 9;
|
||||
|
||||
int32_t factor[6] = {1, 10, 100, 1000, 10000, 100000};
|
||||
int32_t factor[9] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000};
|
||||
int32_t times = 1;
|
||||
|
||||
while (str[i] >= '0' && str[i] <= '9') {
|
||||
|
@ -140,12 +147,17 @@ int64_t parseFraction(char* str, char** end, int32_t timePrec) {
|
|||
}
|
||||
|
||||
times = MILLI_SEC_FRACTION_LEN - i;
|
||||
} else {
|
||||
assert(timePrec == TSDB_TIME_PRECISION_MICRO);
|
||||
} else if (timePrec == TSDB_TIME_PRECISION_MICRO) {
|
||||
if (i >= MICRO_SEC_FRACTION_LEN) {
|
||||
i = MICRO_SEC_FRACTION_LEN;
|
||||
}
|
||||
times = MICRO_SEC_FRACTION_LEN - i;
|
||||
} else {
|
||||
assert(timePrec == TSDB_TIME_PRECISION_NANO);
|
||||
if (i >= NANO_SEC_FRACTION_LEN) {
|
||||
i = NANO_SEC_FRACTION_LEN;
|
||||
}
|
||||
times = NANO_SEC_FRACTION_LEN - i;
|
||||
}
|
||||
|
||||
fraction = strnatoi(str, i) * factor[times];
|
||||
|
@ -202,7 +214,9 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
|
|||
* 2013-04-12T15:52:01.123+0800
|
||||
*/
|
||||
int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec) {
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : 1000000;
|
||||
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||
int64_t tzOffset = 0;
|
||||
|
||||
struct tm tm = {0};
|
||||
|
@ -287,7 +301,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
|
|||
}
|
||||
}
|
||||
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : 1000000;
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||
*time = factor * seconds + fraction;
|
||||
|
||||
return 0;
|
||||
|
@ -315,37 +330,50 @@ int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
|
|||
}
|
||||
}
|
||||
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : 1000000;
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||
*time = factor * seconds + fraction;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t getDurationInUs(int64_t val, char unit, int64_t* result) {
|
||||
*result = val;
|
||||
|
||||
int64_t factor = 1000L;
|
||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
||||
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI ||
|
||||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
assert(toPrecision == TSDB_TIME_PRECISION_MILLI ||
|
||||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
toPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
static double factors[3][3] = { {1., 1000., 1000000.},
|
||||
{1.0 / 1000, 1., 1000.},
|
||||
{1.0 / 1000000, 1.0 / 1000, 1.} };
|
||||
return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
|
||||
}
|
||||
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
|
||||
|
||||
switch (unit) {
|
||||
case 's':
|
||||
(*result) *= MILLISECOND_PER_SECOND*factor;
|
||||
(*result) = convertTimePrecision(val * MILLISECOND_PER_SECOND, TSDB_TIME_PRECISION_MILLI, timePrecision);
|
||||
break;
|
||||
case 'm':
|
||||
(*result) *= MILLISECOND_PER_MINUTE*factor;
|
||||
(*result) = convertTimePrecision(val * MILLISECOND_PER_MINUTE, TSDB_TIME_PRECISION_MILLI, timePrecision);
|
||||
break;
|
||||
case 'h':
|
||||
(*result) *= MILLISECOND_PER_HOUR*factor;
|
||||
(*result) = convertTimePrecision(val * MILLISECOND_PER_HOUR, TSDB_TIME_PRECISION_MILLI, timePrecision);
|
||||
break;
|
||||
case 'd':
|
||||
(*result) *= MILLISECOND_PER_DAY*factor;
|
||||
(*result) = convertTimePrecision(val * MILLISECOND_PER_DAY, TSDB_TIME_PRECISION_MILLI, timePrecision);
|
||||
break;
|
||||
case 'w':
|
||||
(*result) *= MILLISECOND_PER_WEEK*factor;
|
||||
(*result) = convertTimePrecision(val * MILLISECOND_PER_WEEK, TSDB_TIME_PRECISION_MILLI, timePrecision);
|
||||
break;
|
||||
case 'a':
|
||||
(*result) *= factor;
|
||||
(*result) = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, timePrecision);
|
||||
break;
|
||||
case 'u':
|
||||
(*result) = convertTimePrecision(val, TSDB_TIME_PRECISION_MICRO, timePrecision);
|
||||
break;
|
||||
case 'b':
|
||||
(*result) = convertTimePrecision(val, TSDB_TIME_PRECISION_NANO, timePrecision);
|
||||
break;
|
||||
default: {
|
||||
return -1;
|
||||
|
@ -357,6 +385,8 @@ static int32_t getDurationInUs(int64_t val, char unit, int64_t* result) {
|
|||
}
|
||||
|
||||
/*
|
||||
* b - nanoseconds;
|
||||
* u - microseconds;
|
||||
* a - Millionseconds
|
||||
* s - Seconds
|
||||
* m - Minutes
|
||||
|
@ -366,7 +396,7 @@ static int32_t getDurationInUs(int64_t val, char unit, int64_t* result) {
|
|||
* n - Months (30 days)
|
||||
* y - Years (365 days)
|
||||
*/
|
||||
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration) {
|
||||
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration, int32_t timePrecision) {
|
||||
errno = 0;
|
||||
char* endPtr = NULL;
|
||||
|
||||
|
@ -382,10 +412,10 @@ int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration)
|
|||
return -1;
|
||||
}
|
||||
|
||||
return getDurationInUs(timestamp, unit, duration);
|
||||
return getDuration(timestamp, unit, duration, timePrecision);
|
||||
}
|
||||
|
||||
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit) {
|
||||
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision) {
|
||||
errno = 0;
|
||||
|
||||
/* get the basic numeric value */
|
||||
|
@ -399,7 +429,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
|
|||
return 0;
|
||||
}
|
||||
|
||||
return getDurationInUs(*duration, *unit, duration);
|
||||
return getDuration(*duration, *unit, duration, timePrecision);
|
||||
}
|
||||
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
||||
|
|
|
@ -64,8 +64,8 @@ void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len);
|
|||
void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen);
|
||||
void httpJsonInt64(JsonBuf* buf, int64_t num);
|
||||
void httpJsonUInt64(JsonBuf* buf, uint64_t num);
|
||||
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us);
|
||||
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us);
|
||||
void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision);
|
||||
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision);
|
||||
void httpJsonInt(JsonBuf* buf, int32_t num);
|
||||
void httpJsonUInt(JsonBuf* buf, uint32_t num);
|
||||
void httpJsonFloat(JsonBuf* buf, float num);
|
||||
|
|
|
@ -262,42 +262,92 @@ void httpJsonUInt64(JsonBuf* buf, uint64_t num) {
|
|||
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num);
|
||||
}
|
||||
|
||||
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) {
|
||||
void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
|
||||
char ts[35] = {0};
|
||||
struct tm* ptm;
|
||||
int32_t precision = 1000;
|
||||
if (us) {
|
||||
precision = 1000000;
|
||||
|
||||
int32_t fractionLen;
|
||||
char* format = NULL;
|
||||
time_t quot = 0;
|
||||
long mod = 0;
|
||||
|
||||
switch (timePrecision) {
|
||||
case TSDB_TIME_PRECISION_MILLI: {
|
||||
quot = t / 1000;
|
||||
fractionLen = 5;
|
||||
format = ".%03" PRId64;
|
||||
mod = t % 1000;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_TIME_PRECISION_MICRO: {
|
||||
quot = t / 1000000;
|
||||
fractionLen = 8;
|
||||
format = ".%06" PRId64;
|
||||
mod = t % 1000000;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_TIME_PRECISION_NANO: {
|
||||
quot = t / 1000000000;
|
||||
fractionLen = 11;
|
||||
format = ".%09" PRId64;
|
||||
mod = t % 1000000000;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
assert(false);
|
||||
}
|
||||
|
||||
time_t tt = t / precision;
|
||||
ptm = localtime(&tt);
|
||||
ptm = localtime(");
|
||||
int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
if (us) {
|
||||
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision);
|
||||
} else {
|
||||
length += snprintf(ts + length, 5, ".%03" PRId64, t % precision);
|
||||
}
|
||||
length += snprintf(ts + length, fractionLen, format, mod);
|
||||
|
||||
httpJsonString(buf, ts, length);
|
||||
}
|
||||
|
||||
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us) {
|
||||
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
|
||||
char ts[40] = {0};
|
||||
struct tm* ptm;
|
||||
int32_t precision = 1000;
|
||||
if (us) {
|
||||
precision = 1000000;
|
||||
|
||||
int32_t fractionLen;
|
||||
char* format = NULL;
|
||||
time_t quot = 0;
|
||||
long mod = 0;
|
||||
|
||||
switch (timePrecision) {
|
||||
case TSDB_TIME_PRECISION_MILLI: {
|
||||
quot = t / 1000;
|
||||
fractionLen = 5;
|
||||
format = ".%03" PRId64;
|
||||
mod = t % 1000;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_TIME_PRECISION_MICRO: {
|
||||
quot = t / 1000000;
|
||||
fractionLen = 8;
|
||||
format = ".%06" PRId64;
|
||||
mod = t % 1000000;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_TIME_PRECISION_NANO: {
|
||||
quot = t / 1000000000;
|
||||
fractionLen = 11;
|
||||
format = ".%09" PRId64;
|
||||
mod = t % 1000000000;
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
assert(false);
|
||||
}
|
||||
|
||||
time_t tt = t / precision;
|
||||
ptm = localtime(&tt);
|
||||
ptm = localtime(");
|
||||
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
|
||||
if (us) {
|
||||
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision);
|
||||
} else {
|
||||
length += snprintf(ts + length, 5, ".%03" PRId64, t % precision);
|
||||
}
|
||||
length += snprintf(ts + length, fractionLen, format, mod);
|
||||
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
|
||||
|
||||
httpJsonString(buf, ts, length);
|
||||
|
|
|
@ -186,13 +186,11 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
|
|||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) {
|
||||
httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]),
|
||||
taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO);
|
||||
httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result));
|
||||
} else if (timestampFormat == REST_TIMESTAMP_FMT_TIMESTAMP) {
|
||||
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
|
||||
} else {
|
||||
httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]),
|
||||
taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO);
|
||||
httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -46,7 +46,7 @@ enum SQL_NODE_FROM_TYPE {
|
|||
|
||||
enum SQL_EXPR_FLAG {
|
||||
EXPR_FLAG_TS_ERROR = 1,
|
||||
EXPR_FLAG_US_TIMESTAMP = 2,
|
||||
EXPR_FLAG_NS_TIMESTAMP = 2,
|
||||
EXPR_FLAG_TIMESTAMP_VAR = 3,
|
||||
};
|
||||
|
||||
|
|
|
@ -5394,7 +5394,7 @@ SAggFunctionInfo aAggs[] = {{
|
|||
"diff",
|
||||
TSDB_FUNC_DIFF,
|
||||
TSDB_FUNC_INVALID_ID,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
|
||||
diff_function_setup,
|
||||
diff_function,
|
||||
diff_function_f,
|
||||
|
@ -5498,7 +5498,7 @@ SAggFunctionInfo aAggs[] = {{
|
|||
"derivative", // return table id and the corresponding tags for join match and subscribe
|
||||
TSDB_FUNC_DERIVATIVE,
|
||||
TSDB_FUNC_INVALID_ID,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
|
||||
deriv_function_setup,
|
||||
deriv_function,
|
||||
noop2,
|
||||
|
|
|
@ -131,10 +131,10 @@ static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) {
|
|||
return;
|
||||
}
|
||||
|
||||
int64_t key = tw->skey / 1000, interval = pQueryAttr->interval.interval;
|
||||
if (pQueryAttr->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
key /= 1000;
|
||||
}
|
||||
int64_t key = tw->skey, interval = pQueryAttr->interval.interval;
|
||||
//convert key to second
|
||||
key = convertTimePrecision(key, pQueryAttr->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
|
||||
|
||||
if (pQueryAttr->interval.intervalUnit == 'y') {
|
||||
interval *= 12;
|
||||
}
|
||||
|
@ -146,17 +146,13 @@ static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) {
|
|||
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
tw->skey = mktime(&tm) * 1000L;
|
||||
tw->skey = convertTimePrecision(mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pQueryAttr->precision);
|
||||
|
||||
mon = (int)(mon + interval);
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
tw->ekey = mktime(&tm) * 1000L;
|
||||
tw->ekey = convertTimePrecision(mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pQueryAttr->precision);
|
||||
|
||||
if (pQueryAttr->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tw->skey *= 1000L;
|
||||
tw->ekey *= 1000L;
|
||||
}
|
||||
tw->ekey -= 1;
|
||||
}
|
||||
|
||||
|
@ -1687,8 +1683,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||
pRuntimeEnv->enableGroupData = false;
|
||||
|
||||
pRuntimeEnv->pQueryAttr = pQueryAttr;
|
||||
|
||||
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
@ -4123,6 +4117,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
|
|||
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
|
||||
|
||||
pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (pQueryAttr->stableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0);
|
||||
pRuntimeEnv->enableGroupData = false;
|
||||
|
||||
pRuntimeEnv->pQueryAttr = pQueryAttr;
|
||||
pRuntimeEnv->pTsBuf = pTsBuf;
|
||||
|
|
|
@ -139,19 +139,20 @@ tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType) {
|
|||
pSqlExpr->tokenId = optrType;
|
||||
pSqlExpr->type = SQL_NODE_VALUE;
|
||||
} else if (optrType == TK_NOW) {
|
||||
// use microsecond by default
|
||||
pSqlExpr->value.i64 = taosGetTimestamp(TSDB_TIME_PRECISION_MICRO);
|
||||
// use nanosecond by default TODO set value after getting database precision
|
||||
pSqlExpr->value.i64 = taosGetTimestamp(TSDB_TIME_PRECISION_NANO);
|
||||
pSqlExpr->value.nType = TSDB_DATA_TYPE_BIGINT;
|
||||
pSqlExpr->tokenId = TK_TIMESTAMP; // TK_TIMESTAMP used to denote the time value is in microsecond
|
||||
pSqlExpr->type = SQL_NODE_VALUE;
|
||||
pSqlExpr->flags |= 1 << EXPR_FLAG_US_TIMESTAMP;
|
||||
pSqlExpr->flags |= 1 << EXPR_FLAG_NS_TIMESTAMP;
|
||||
} else if (optrType == TK_VARIABLE) {
|
||||
int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSqlExpr->value.i64);
|
||||
// use nanosecond by default TODO set value after getting database precision
|
||||
int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSqlExpr->value.i64, TSDB_TIME_PRECISION_NANO);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
pSqlExpr->flags |= 1 << EXPR_FLAG_US_TIMESTAMP;
|
||||
pSqlExpr->flags |= 1 << EXPR_FLAG_NS_TIMESTAMP;
|
||||
pSqlExpr->flags |= 1 << EXPR_FLAG_TIMESTAMP_VAR;
|
||||
pSqlExpr->value.nType = TSDB_DATA_TYPE_BIGINT;
|
||||
pSqlExpr->tokenId = TK_TIMESTAMP;
|
||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
#include "taosdef.h"
|
||||
|
||||
int32_t strdequote(char *src);
|
||||
int32_t strRmquote(char *z, int32_t len);
|
||||
size_t strtrim(char *src);
|
||||
char * strnchr(char *haystack, char needle, int32_t len, bool skipquote);
|
||||
char ** strsplit(char *src, const char *delim, int32_t *num);
|
||||
|
|
|
@ -494,9 +494,9 @@ uint32_t tGetToken(char* z, uint32_t* tokenId) {
|
|||
}
|
||||
|
||||
/* here is the 1u/1a/2s/3m/9y */
|
||||
if ((z[i] == 'u' || z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' ||
|
||||
if ((z[i] == 'b' || z[i] == 'u' || z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' ||
|
||||
z[i] == 'y' || z[i] == 'w' ||
|
||||
z[i] == 'U' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
|
||||
z[i] == 'B' || z[i] == 'U' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
|
||||
z[i] == 'Y' || z[i] == 'W') &&
|
||||
(isIdChar[(uint8_t)z[i + 1]] == 0)) {
|
||||
*tokenId = TK_VARIABLE;
|
||||
|
|
|
@ -52,6 +52,36 @@ int32_t strdequote(char *z) {
|
|||
return j + 1; // only one quote, do nothing
|
||||
}
|
||||
|
||||
|
||||
int32_t strRmquote(char *z, int32_t len){
|
||||
// delete escape character: \\, \', \"
|
||||
char delim = z[0];
|
||||
if (delim != '\'' && delim != '\"') {
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t cnt = 0;
|
||||
int32_t j = 0;
|
||||
for (uint32_t k = 1; k < len - 1; ++k) {
|
||||
if (z[k] == '\\' || (z[k] == delim && z[k + 1] == delim)) {
|
||||
z[j] = z[k + 1];
|
||||
|
||||
cnt++;
|
||||
j++;
|
||||
k++;
|
||||
continue;
|
||||
}
|
||||
|
||||
z[j] = z[k];
|
||||
j++;
|
||||
}
|
||||
|
||||
z[j] = 0;
|
||||
|
||||
return len - 2 - cnt;
|
||||
}
|
||||
|
||||
|
||||
size_t strtrim(char *z) {
|
||||
int32_t i = 0;
|
||||
int32_t j = 0;
|
||||
|
|
|
@ -21,7 +21,7 @@ def pre_test(){
|
|||
cmake .. > /dev/null
|
||||
make > /dev/null
|
||||
make install > /dev/null
|
||||
pip3 install ${WKC}/src/connector/python/linux/python3/
|
||||
pip3 install ${WKC}/src/connector/python/ || echo 0
|
||||
'''
|
||||
return 1
|
||||
}
|
||||
|
|
|
@ -23,7 +23,8 @@ class Node:
|
|||
self.hostIP = hostIP
|
||||
self.hostName = hostName
|
||||
self.homeDir = homeDir
|
||||
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
|
||||
self.corePath = '/coredump'
|
||||
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
|
||||
|
||||
def buildTaosd(self):
|
||||
try:
|
||||
|
@ -126,21 +127,37 @@ class Node:
|
|||
except Exception as e:
|
||||
print("remove taosd error for node %d " % self.index)
|
||||
logging.exception(e)
|
||||
|
||||
|
||||
def detectCoredumpFile(self):
|
||||
try:
|
||||
result = self.conn.run("find /coredump -name 'core_*' ", hide=True)
|
||||
output = result.stdout
|
||||
print("output: %s" % output)
|
||||
return output
|
||||
except Exception as e:
|
||||
print("find coredump file error on node %d " % self.index)
|
||||
logging.exception(e)
|
||||
|
||||
|
||||
class Nodes:
|
||||
def __init__(self):
|
||||
self.tdnodes = []
|
||||
self.tdnodes.append(Node(0, 'root', '52.143.103.7', 'node1', 'a', '/root/'))
|
||||
self.tdnodes.append(Node(1, 'root', '52.250.48.222', 'node2', 'a', '/root/'))
|
||||
self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
|
||||
self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
|
||||
self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
|
||||
self.tdnodes.append(Node(0, 'root', '192.168.17.194', 'taosdata', 'r', '/root/'))
|
||||
# self.tdnodes.append(Node(1, 'root', '52.250.48.222', 'node2', 'a', '/root/'))
|
||||
# self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
|
||||
# self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
|
||||
# self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
|
||||
|
||||
def stopOneNode(self, index):
|
||||
self.tdnodes[index].stopTaosd()
|
||||
self.tdnodes[index].forceStopOneTaosd()
|
||||
|
||||
def startOneNode(self, index):
|
||||
self.tdnodes[index].startOneTaosd()
|
||||
|
||||
def detectCoredumpFile(self, index):
|
||||
return self.tdnodes[index].detectCoredumpFile()
|
||||
|
||||
def stopAllTaosd(self):
|
||||
for i in range(len(self.tdnodes)):
|
||||
|
@ -166,14 +183,32 @@ class Nodes:
|
|||
for i in range(len(self.tdnodes)):
|
||||
self.tdnodes[i].removeData()
|
||||
|
||||
# kill taosd randomly every 10 mins
|
||||
nodes = Nodes()
|
||||
loop = 0
|
||||
while True:
|
||||
loop = loop + 1
|
||||
index = random.randint(0, 4)
|
||||
print("loop: %d, kill taosd on node%d" %(loop, index))
|
||||
nodes.stopOneNode(index)
|
||||
time.sleep(60)
|
||||
nodes.startOneNode(index)
|
||||
time.sleep(600)
|
||||
class Test:
|
||||
def __init__(self):
|
||||
self.nodes = Nodes()
|
||||
|
||||
# kill taosd randomly every 10 mins
|
||||
def randomlyKillDnode(self):
|
||||
loop = 0
|
||||
while True:
|
||||
index = random.randint(0, 4)
|
||||
print("loop: %d, kill taosd on node%d" %(loop, index))
|
||||
self.nodes.stopOneNode(index)
|
||||
time.sleep(60)
|
||||
self.nodes.startOneNode(index)
|
||||
time.sleep(600)
|
||||
loop = loop + 1
|
||||
|
||||
def detectCoredump(self):
|
||||
loop = 0
|
||||
while True:
|
||||
for i in range(len(self.nodes.tdnodes)):
|
||||
result = self.nodes.detectCoredumpFile(i)
|
||||
print("core file path is %s" % result)
|
||||
if result and not result.isspace():
|
||||
self.nodes.stopAllTaosd()
|
||||
print("sleep for 10 mins")
|
||||
time.sleep(600)
|
||||
|
||||
test = Test()
|
||||
test.detectCoredump()
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
|||
|
||||
def run(self):
|
||||
tdSql.query("show variables")
|
||||
tdSql.checkData(51, 1, 864000)
|
||||
tdSql.checkData(53, 1, 864000)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
@ -33,4 +33,4 @@ class TDTestCase:
|
|||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
|
|
|
@ -135,6 +135,7 @@ run general/parser/tags_dynamically_specifiy.sim
|
|||
run general/parser/set_tag_vals.sim
|
||||
#unsupport run general/parser/repeatAlter.sim
|
||||
#unsupport run general/parser/slimit_alter_tags.sim
|
||||
run general/parser/precision_ns.sim
|
||||
run general/stable/disk.sim
|
||||
run general/stable/dnode3.sim
|
||||
run general/stable/metrics.sim
|
||||
|
|
|
@ -147,8 +147,57 @@ if $data02 != @nest_tb0@ then
|
|||
endi
|
||||
|
||||
print ===================> nest query interval
|
||||
sql_error select ts, avg(c1) from (select ts, c1 from nest_tb0);
|
||||
|
||||
sql select avg(c1) from (select * from nest_tb0) interval(3d)
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @20-09-14 00:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 49.222222222 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != @20-09-17 00:00:00.000@ then
|
||||
print expect 20-09-17 00:00:00.000, actual: $data10
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 49.685185185 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data20 != @20-09-20 00:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 49.500000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#define TSDB_FUNC_APERCT 7
|
||||
#define TSDB_FUNC_LAST_ROW 10
|
||||
#define TSDB_FUNC_TWA 14
|
||||
#define TSDB_FUNC_LEASTSQR 15
|
||||
#define TSDB_FUNC_ARITHM 23
|
||||
#define TSDB_FUNC_DIFF 24
|
||||
#define TSDB_FUNC_INTERP 28
|
||||
#define TSDB_FUNC_RATE 29
|
||||
#define TSDB_FUNC_IRATE 30
|
||||
#define TSDB_FUNC_DERIVATIVE 32
|
||||
|
||||
sql_error select stddev(c1) from (select c1 from nest_tb0);
|
||||
sql_error select percentile(c1, 20) from (select * from nest_tb0);
|
||||
|
||||
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
|
||||
|
||||
sql select top(x, 20) from (select c1 x from nest_tb0);
|
||||
|
||||
sql select bottom(x, 20) from (select c1 x from nest_tb0)
|
||||
|
||||
print ===================> complex query
|
||||
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 1000
|
||||
sql connect
|
||||
|
||||
$dbPrefix = m_di_db_ns
|
||||
$tbPrefix = m_di_tb
|
||||
$mtPrefix = m_di_mt
|
||||
$ntPrefix = m_di_nt
|
||||
$tbNum = 2
|
||||
$rowNum = 200
|
||||
$futureTs = 300000000000
|
||||
|
||||
print =============== step1: create database and tables and insert data
|
||||
$i = 0
|
||||
$db = $dbPrefix . $i
|
||||
$mt = $mtPrefix . $i
|
||||
$nt = $ntPrefix . $i
|
||||
|
||||
sql drop database $db -x step1
|
||||
step1:
|
||||
sql create database $db precision 'ns'
|
||||
sql use $db
|
||||
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
|
||||
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using $mt tags( $i )
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$cc = $futureTs + $x * 100 + 43
|
||||
$ns = $cc . b
|
||||
sql insert into $tb values (now + $ns , $x )
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
sql create table $nt (ts timestamp, tbcol int)
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$cc = $futureTs + $x * 100 + 43
|
||||
$ns = $cc . b
|
||||
sql insert into $nt values (now + $ns , $x )
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
sleep 100
|
||||
|
||||
print =============== step2: select count(*) from tables
|
||||
$i = 0
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
sql select count(*) from $tb
|
||||
|
||||
if $data00 != $rowNum then
|
||||
print expect $rowNum, actual:$data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
$i = 0
|
||||
$mt = $mtPrefix . $i
|
||||
sql select count(*) from $mt
|
||||
|
||||
$mtRowNum = $tbNum * $rowNum
|
||||
if $data00 != $mtRowNum then
|
||||
print expect $mtRowNum, actual:$data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
$i = 0
|
||||
$nt = $ntPrefix . $i
|
||||
|
||||
sql select count(*) from $nt
|
||||
|
||||
if $data00 != $rowNum then
|
||||
print expect $rowNum, actual:$data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step3: check nano second timestamp
|
||||
$i = 0
|
||||
$mt = $mtPrefix . $i
|
||||
$tb = $tbPrefix . $i
|
||||
sql insert into $tb values (now-43b , $x )
|
||||
sql select count(*) from $tb where ts<now
|
||||
if $data00 != 1 then
|
||||
print expected 1, actual: $data00
|
||||
endi
|
||||
|
||||
|
||||
print =============== step4: check interval/sliding nano second
|
||||
$i = 0
|
||||
$mt = $mtPrefix . $i
|
||||
sql_error select count(*) from $mt interval(1000b) sliding(100b)
|
||||
sql_error select count(*) from $mt interval(10000000b) sliding(99999b)
|
||||
|
||||
sql select count(*) from $mt interval(100000000b) sliding(100000000b)
|
||||
|
||||
print =============== clear
|
||||
sql drop database $db
|
||||
sql show databases
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -62,4 +62,5 @@ run general/parser/binary_escapeCharacter.sim
|
|||
run general/parser/between_and.sim
|
||||
run general/parser/last_cache.sim
|
||||
run general/parser/nestquery.sim
|
||||
run general/parser/precision_ns.sim
|
||||
|
||||
|
|
|
@ -134,6 +134,7 @@ run general/parser/bug.sim
|
|||
run general/parser/tags_dynamically_specifiy.sim
|
||||
run general/parser/set_tag_vals.sim
|
||||
run general/parser/repeatAlter.sim
|
||||
run general/parser/precision_ns.sim
|
||||
##unsupport run general/parser/slimit_alter_tags.sim
|
||||
run general/stable/disk.sim
|
||||
run general/stable/dnode3.sim
|
||||
|
|
|
@ -813,8 +813,15 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
|
|||
value[length[i]] = 0;
|
||||
// snprintf(value, fields[i].bytes, "%s", (char *)row[i]);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
tt = *(int64_t *)row[i] / 1000;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||
int32_t precision = taos_result_precision(pSql);
|
||||
if (precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
tt = (*(int64_t *)row[i]) / 1000;
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tt = (*(int64_t *)row[i]) / 1000000;
|
||||
} else {
|
||||
tt = (*(int64_t *)row[i]) / 1000000000;
|
||||
}
|
||||
/* comment out as it make testcases like select_with_tags.sim fail.
|
||||
but in windows, this may cause the call to localtime crash if tt < 0,
|
||||
need to find a better solution.
|
||||
|
@ -829,9 +836,16 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
|
|||
|
||||
tp = localtime(&tt);
|
||||
strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp);
|
||||
sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000));
|
||||
if (precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000));
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
sprintf(value, "%s.%06d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000000));
|
||||
} else {
|
||||
sprintf(value, "%s.%09d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000000000));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
} // end of switch
|
||||
|
|
Loading…
Reference in New Issue