Merge pull request #6655 from taosdata/feature/query

Feature/query
This commit is contained in:
Haojun Liao 2021-06-29 19:27:52 +08:00 committed by GitHub
commit 52d1ae7d8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 266 additions and 97 deletions

View File

@ -916,8 +916,8 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn
} }
int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) { int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
const char* msg1 = "sliding cannot be used without interval";
const char* msg2 = "interval cannot be less than 10 ms"; const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
@ -926,7 +926,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
if (!TPARSER_HAS_TOKEN(pSqlNode->interval.interval)) { if (!TPARSER_HAS_TOKEN(pSqlNode->interval.interval)) {
if (TPARSER_HAS_TOKEN(pSqlNode->sliding)) { if (TPARSER_HAS_TOKEN(pSqlNode->sliding)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -947,7 +947,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
// interval cannot be less than 10 milliseconds // interval cannot be less than 10 milliseconds
if (convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MILLI) < tsMinIntervalTime) { if (convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
} }
@ -7841,8 +7841,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->numOfTables = 0; pQueryInfo->numOfTables = 0;
// parse the subquery in the first place // parse the subquery in the first place
int32_t numOfSub = (int32_t) taosArrayGetSize(pSqlNode->from->list); int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list);
for(int32_t i = 0; i < numOfSub; ++i) { for (int32_t i = 0; i < numOfSub; ++i) {
code = doValidateSubquery(pSqlNode, i, pSql, pQueryInfo, tscGetErrorMsgPayload(pCmd)); code = doValidateSubquery(pSqlNode, i, pSql, pQueryInfo, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -7852,7 +7852,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
int32_t timeWindowQuery = int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap)); (TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) { if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
@ -7862,7 +7863,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
// todo NOT support yet // todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) { for (int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT || f == TSDB_FUNC_INTERP) { if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT || f == TSDB_FUNC_INTERP) {
@ -7876,22 +7877,19 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
// todo derivative function requires ts column exists in subquery
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0); SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo); if (pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
if (numOfExprs == 1) { int32_t numOfExprs = (int32_t)tscNumOfExprs(pQueryInfo);
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) { if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
} else {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 1);
int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
} }

View File

@ -155,7 +155,7 @@ extern char tsMnodeTmpDir[];
extern char tsDataDir[]; extern char tsDataDir[];
extern char tsLogDir[]; extern char tsLogDir[];
extern char tsScriptDir[]; extern char tsScriptDir[];
extern int64_t tsMsPerDay[3]; extern int64_t tsTickPerDay[3];
// system info // system info
extern char tsOsName[]; extern char tsOsName[];

View File

@ -84,8 +84,8 @@ int32_t tsMaxNumOfOrderedResults = 100000;
// 10 ms for sliding time, the value will changed in case of time precision changed // 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10; int32_t tsMinSlidingTime = 10;
// 10 ms for interval time range, changed accordingly // 1 us for interval time range, changed accordingly
int32_t tsMinIntervalTime = 10; int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly // 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000; int32_t tsMaxStreamComputDelay = 20000;
@ -204,7 +204,7 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS];
* TSDB_TIME_PRECISION_MICRO: 86400000000L * TSDB_TIME_PRECISION_MICRO: 86400000000L
* TSDB_TIME_PRECISION_NANO: 86400000000000L * TSDB_TIME_PRECISION_NANO: 86400000000000L
*/ */
int64_t tsMsPerDay[] = {86400000L, 86400000000L, 86400000000000L}; int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L};
// system info // system info
char tsOsName[10] = "Linux"; char tsOsName[10] = "Linux";

View File

@ -348,6 +348,7 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
{1.0 / 1000000, 1.0 / 1000, 1.} }; {1.0 / 1000000, 1.0 / 1000, 1.} };
return (int64_t)((double)time * factors[fromPrecision][toPrecision]); return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
} }
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) { static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
switch (unit) { switch (unit) {

View File

@ -4314,8 +4314,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol; return pFillCol;
} }
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
SArray* pOperator, void* param) { void* param) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;

View File

@ -350,8 +350,8 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
} }
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) { static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
*minKey = fid * days * tsMsPerDay[precision]; *minKey = fid * days * tsTickPerDay[precision];
*maxKey = *minKey + days * tsMsPerDay[precision] - 1; *maxKey = *minKey + days * tsTickPerDay[precision] - 1;
} }
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {

View File

@ -17,9 +17,9 @@
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) { if (key < 0) {
return (int)((key + 1) / tsMsPerDay[precision] / days - 1); return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else { } else {
return (int)((key / tsMsPerDay[precision] / days)); return (int)((key / tsTickPerDay[precision] / days));
} }
} }
@ -363,9 +363,9 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
TSKEY minKey, midKey, maxKey, now; TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision); now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision]; minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision]; midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision]; maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey; pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));

View File

@ -632,8 +632,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SDataRow row = NULL;
TSKEY now = taosGetTimestamp(pRepo->config.precision); TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);

View File

@ -39,6 +39,12 @@ enum {
TSDB_QUERY_TYPE_LAST = 2, TSDB_QUERY_TYPE_LAST = 2,
}; };
enum {
TSDB_CACHED_TYPE_NONE = 0,
TSDB_CACHED_TYPE_LASTROW = 1,
TSDB_CACHED_TYPE_LAST = 2,
};
typedef struct SQueryFilePos { typedef struct SQueryFilePos {
int32_t fid; int32_t fid;
int32_t slot; int32_t slot;
@ -280,9 +286,13 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
info.tableId.uid = info.pTableObj->tableId.uid; info.tableId.uid = info.pTableObj->tableId.uid;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(info.lastKey >= pQueryHandle->window.skey); if (info.lastKey == INT64_MIN || info.lastKey < pQueryHandle->window.skey) {
info.lastKey = pQueryHandle->window.skey;
}
assert(info.lastKey >= pQueryHandle->window.skey && info.lastKey <= pQueryHandle->window.ekey);
} else { } else {
assert(info.lastKey <= pQueryHandle->window.skey); assert(info.lastKey >= pQueryHandle->window.ekey && info.lastKey <= pQueryHandle->window.skey);
} }
taosArrayPush(pTableCheckInfo, &info); taosArrayPush(pTableCheckInfo, &info);
@ -339,14 +349,57 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s
return pNew; return pNew;
} }
static bool emptyQueryTimewindow(STsdbQueryHandle* pQueryHandle) {
assert(pQueryHandle != NULL);
STimeWindow* w = &pQueryHandle->window;
bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);
return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
}
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
STsdbCfg* pCfg = &pTsdb->config;
int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep);
}
static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) {
pQueryHandle->window = pCond->twindow;
bool updateTs = false;
int64_t startTs = getEarliestValidTimestamp(pQueryHandle->pTsdb);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
if (startTs > pQueryHandle->window.skey) {
pQueryHandle->window.skey = startTs;
pCond->twindow.skey = startTs;
updateTs = true;
}
} else {
if (startTs > pQueryHandle->window.ekey) {
pQueryHandle->window.ekey = startTs;
pCond->twindow.ekey = startTs;
updateTs = true;
}
}
if (updateTs) {
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64
", 0x%" PRIx64, pQueryHandle, pCond->twindow.skey, pCond->twindow.ekey, pQueryHandle->window.skey,
pQueryHandle->window.ekey, pQueryHandle->qId);
}
}
static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) { static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
goto out_of_memory; goto _end;
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN; pQueryHandle->cur.fid = INT32_MIN;
@ -354,36 +407,33 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
pQueryHandle->checkFiles = true; pQueryHandle->checkFiles = true;
pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qId = qId; pQueryHandle->qId = qId;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->allocSize = 0; pQueryHandle->allocSize = 0;
pQueryHandle->locateStart = false; pQueryHandle->locateStart = false;
pQueryHandle->pMemRef = pMemRef; pQueryHandle->pMemRef = pMemRef;
pQueryHandle->loadType = pCond->type;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->loadExternalRow = pCond->loadExternalRows; pQueryHandle->loadExternalRow = pCond->loadExternalRows;
pQueryHandle->currentLoadExternalRows = pCond->loadExternalRows; pQueryHandle->currentLoadExternalRows = pCond->loadExternalRows;
pQueryHandle->loadType = pCond->type;
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) { if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) {
goto out_of_memory; goto _end;
} }
assert(pCond != NULL && pMemRef != NULL); assert(pCond != NULL && pMemRef != NULL);
if (ASCENDING_TRAVERSE(pCond->order)) { setQueryTimewindow(pQueryHandle, pCond);
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
if (pCond->numOfCols > 0) { if (pCond->numOfCols > 0) {
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis)); pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
if (pQueryHandle->statis == NULL) { if (pQueryHandle->statis == NULL) {
goto out_of_memory; goto _end;
} }
pQueryHandle->pColumns = // todo: use list instead of array?
taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
if (pQueryHandle->pColumns == NULL) { if (pQueryHandle->pColumns == NULL) {
goto out_of_memory; goto _end;
} }
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
@ -392,14 +442,16 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
colInfo.info = pCond->colList[i]; colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
if (colInfo.pData == NULL) { if (colInfo.pData == NULL) {
goto out_of_memory; goto _end;
} }
taosArrayPush(pQueryHandle->pColumns, &colInfo); taosArrayPush(pQueryHandle->pColumns, &colInfo);
pQueryHandle->statis[i].colId = colInfo.info.colId; pQueryHandle->statis[i].colId = colInfo.info.colId;
} }
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
} }
STsdbMeta* pMeta = tsdbGetMeta(tsdb); STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL); assert(pMeta != NULL);
@ -407,7 +459,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
if (pQueryHandle->pDataCols == NULL) { if (pQueryHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId); tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto out_of_memory; goto _end;
} }
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
@ -415,7 +467,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
return (TsdbQueryHandleT) pQueryHandle; return (TsdbQueryHandleT) pQueryHandle;
out_of_memory: _end:
tsdbCleanupQueryHandle(pQueryHandle); tsdbCleanupQueryHandle(pQueryHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
@ -423,6 +475,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) { TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef); STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef);
if (emptyQueryTimewindow(pQueryHandle)) {
return (TsdbQueryHandleT*) pQueryHandle;
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb); STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL); assert(pMeta != NULL);
@ -446,6 +501,15 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
STsdbQueryHandle* pQueryHandle = queryHandle; STsdbQueryHandle* pQueryHandle = queryHandle;
if (emptyQueryTimewindow(pQueryHandle)) {
if (pCond->order != pQueryHandle->order) {
pQueryHandle->order = pCond->order;
SWAP(pQueryHandle->window.skey, pQueryHandle->window.ekey, int64_t);
}
return;
}
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
@ -864,10 +928,10 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
} }
if (key < 0) { if (key < 0) {
key -= (daysPerFile * tsMsPerDay[precision]); key -= (daysPerFile * tsTickPerDay[precision]);
} }
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision])); // set the starting fileId
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32 if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
fid = INT32_MIN; fid = INT32_MIN;
} }
@ -1171,8 +1235,9 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (asc) {
// query ended in/started from current block // query ended in/started from current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) { if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
@ -1193,7 +1258,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
assert(pCheckInfo->lastKey <= pBlock->keyLast); assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows - 1); cur->pos = asc? 0:(pBlock->numOfRows - 1);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
} else { //desc order, query ended in current block } else { //desc order, query ended in current block
@ -1213,7 +1278,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
assert(pCheckInfo->lastKey >= pBlock->keyFirst); assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1); cur->pos = asc? 0:(pBlock->numOfRows-1);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
} }
@ -2684,13 +2749,19 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) { bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
if (emptyQueryTimewindow(pQueryHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
return false;
}
int64_t stime = taosGetTimestampUs(); int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime; int64_t elapsedTime = stime;
// TODO refactor: remove "type"
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) { if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) {
if (pQueryHandle->cachelastrow == 1) { if (pQueryHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
return loadCachedLastRow(pQueryHandle); return loadCachedLastRow(pQueryHandle);
} else if (pQueryHandle->cachelastrow == 2) { } else if (pQueryHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
return loadCachedLast(pQueryHandle); return loadCachedLast(pQueryHandle);
} }
} }
@ -2896,7 +2967,7 @@ out:
} }
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) { bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
return ((STsdbQueryHandle *)pQueryHandle)->cachelastrow > 0; return ((STsdbQueryHandle *)pQueryHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE;
} }
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) { int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
@ -2914,9 +2985,9 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
if (((STable*)pInfo->pTable)->lastRow) { if (((STable*)pInfo->pTable)->lastRow) {
code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key); code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = 0; pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
} else { } else {
pQueryHandle->cachelastrow = 1; pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
} }
} }
@ -2936,12 +3007,11 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) {
int32_t code = 0; int32_t code = 0;
if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){ if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){
pQueryHandle->cachelastrow = 2; pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
} }
// update the tsdb query time range // update the tsdb query time range
if (pQueryHandle->cachelastrow) { if (pQueryHandle->cachelastrow) {
pQueryHandle->window = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = false; pQueryHandle->checkFiles = false;
pQueryHandle->activeIndex = -1; // start from -1 pQueryHandle->activeIndex = -1; // start from -1
} }
@ -3548,7 +3618,6 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab
taosArrayPush(group, &info); taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
@ -3637,15 +3706,21 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
return; return;
} }
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns); pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns);
taosArrayDestroy(pQueryHandle->defaultLoadColumn); taosArrayDestroy(pQueryHandle->defaultLoadColumn);
tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->pDataBlockInfo);
tfree(pQueryHandle->statis); tfree(pQueryHandle->statis);
// todo check error if (!emptyQueryTimewindow(pQueryHandle)) {
tsdbMayUnTakeMemSnapshot(pQueryHandle); tsdbMayUnTakeMemSnapshot(pQueryHandle);
} else {
assert(pQueryHandle->pTableCheckInfo == NULL);
}
if (pQueryHandle->pTableCheckInfo != NULL) {
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
}
tsdbDestroyReadH(&pQueryHandle->rhelper); tsdbDestroyReadH(&pQueryHandle->rhelper);

View File

@ -173,8 +173,9 @@ class TDTestCase:
tdSql.checkData(0,7,'10,10,10') tdSql.checkData(0,7,'10,10,10')
tdSql.error('insert into tb values (now-15d, 10)') tdSql.error('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb') tdSql.query('select * from tb')
tdSql.checkRows(rowNum) tdSql.checkRows(2)
rowNum = 2
tdLog.notice('testing keep will be altered if sudden change from small to big') tdLog.notice('testing keep will be altered if sudden change from small to big')
for i in range(30): for i in range(30):
tdSql.execute('alter database db keep 14,14,14') tdSql.execute('alter database db keep 14,14,14')
@ -182,14 +183,19 @@ class TDTestCase:
tdSql.execute('insert into tb values (now-15d, 10)') tdSql.execute('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb') tdSql.query('select * from tb')
rowNum += 1 rowNum += 1
tdSql.checkRows(rowNum ) tdSql.checkRows(rowNum)
tdLog.notice('testing keep will be altered if sudden change from big to small') tdLog.notice('testing keep will be altered if sudden change from big to small')
tdSql.execute('alter database db keep 16,16,16') tdSql.execute('alter database db keep 16,16,16')
tdSql.execute('alter database db keep 14,14,14') tdSql.execute('alter database db keep 14,14,14')
tdSql.error('insert into tb values (now-15d, 10)') tdSql.error('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb') tdSql.query('select * from tb')
tdSql.checkRows(rowNum) tdSql.checkRows(2)
tdLog.notice('testing data will show up again when keep is being changed to large value')
tdSql.execute('alter database db keep 40,40,40')
tdSql.query('select * from tb')
tdSql.checkRows(63)

View File

@ -71,13 +71,10 @@ class TDTestRetetion:
tdDnodes.start(1) tdDnodes.start(1)
tdLog.info(cmd) tdLog.info(cmd)
ttime = datetime.datetime.now()
tdSql.execute(cmd) tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test') self.queryRows=tdSql.query('select * from test')
if self.queryRows==4: self.checkRows(3,cmd)
self.checkRows(4,cmd)
return 0
else:
self.checkRows(5,cmd)
tdLog.info("=============== step3") tdLog.info("=============== step3")
tdDnodes.stop(1) tdDnodes.stop(1)
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48))) os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48)))
@ -92,7 +89,7 @@ class TDTestRetetion:
tdLog.info(cmd) tdLog.info(cmd)
tdSql.execute(cmd) tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test') self.queryRows=tdSql.query('select * from test')
self.checkRows(6,cmd) self.checkRows(3,cmd)
tdLog.info("=============== step4") tdLog.info("=============== step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.start(1) tdDnodes.start(1)
@ -100,7 +97,7 @@ class TDTestRetetion:
tdLog.info(cmd) tdLog.info(cmd)
tdSql.execute(cmd) tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test') self.queryRows=tdSql.query('select * from test')
self.checkRows(5,cmd) self.checkRows(4,cmd)
tdLog.info("=============== step5") tdLog.info("=============== step5")
tdDnodes.stop(1) tdDnodes.stop(1)
@ -109,6 +106,21 @@ class TDTestRetetion:
self.queryRows=tdSql.query('select * from test where ts > now-1d') self.queryRows=tdSql.query('select * from test where ts > now-1d')
self.checkRows(2,cmd) self.checkRows(2,cmd)
tdLog.info("=============== step6")
tdDnodes.stop(1)
os.system("date -s '%s'"%(ttime + datetime.timedelta(seconds=(72*60*60-7))))
tdDnodes.start(1)
while datetime.datetime.now() < (ttime + datetime.timedelta(seconds=(72*60*60-1))):
time.sleep(0.001)
cmd = 'select * from test'
self.queryRows=tdSql.query(cmd)
self.checkRows(4,cmd)
while datetime.datetime.now() < (ttime + datetime.timedelta(hours=72)):
time.sleep(0.001)
cmd = 'select * from test'
self.queryRows=tdSql.query(cmd)
self.checkRows(3,cmd)
def stop(self): def stop(self):
os.system("sudo timedatectl set-ntp true") os.system("sudo timedatectl set-ntp true")
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1))) os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1)))

View File

@ -959,14 +959,14 @@ endi
if $data31 != 9.000000000 then if $data31 != 9.000000000 then
return -1 return -1
endi endi
if $data41 != null then if $data41 != NULL then
print ===== $data41 print ===== $data41
return -1 return -1
endi endi
if $data51 != 16.000000000 then if $data51 != 16.000000000 then
return -1 return -1
endi endi
if $data61 != null then if $data61 != NULL then
print ===== $data61 print ===== $data61
return -1 return -1
endi endi

View File

@ -1,6 +1,6 @@
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0 system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start

View File

@ -179,14 +179,6 @@ if $data21 != 49.500000000 then
return -1 return -1
endi endi
#define TSDB_FUNC_APERCT 7
#define TSDB_FUNC_TWA 14
#define TSDB_FUNC_LEASTSQR 15
#define TSDB_FUNC_DIFF 24
#define TSDB_FUNC_INTERP 28
#define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_DERIVATIVE 32
sql_error select stddev(c1) from (select c1 from nest_tb0); sql_error select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0); sql_error select percentile(c1, 20) from (select * from nest_tb0);
sql_error select interp(c1) from (select * from nest_tb0); sql_error select interp(c1) from (select * from nest_tb0);
@ -197,9 +189,90 @@ sql_error select diff(c1), twa(c1) from (select * from nest_tb0);
sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0); sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0);
sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d) sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d)
if $rows != 7 then
return -1
endi
if $data00 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data01 != 47.571428571 then
return -1
endi
if $data10 != @20-09-16 00:00:00.000@ then
return -1
endi
if $data11 != 49.666666667 then
return -1
endi
if $data20 != @20-09-17 00:00:00.000@ then
return -1
endi
if $data21 != 49.000000000 then
return -1
endi
if $data30 != @20-09-18 00:00:00.000@ then
return -1
endi
if $data31 != 48.333333333 then
return -1
endi
sql select twa(c1) from (select * from nest_tb0); sql select twa(c1) from (select * from nest_tb0);
if $rows != 1 then
return -1
endi
if $data00 != 49.500000000 then
return -1
endi
sql select leastsquares(c1, 1, 1) from (select * from nest_tb0); sql select leastsquares(c1, 1, 1) from (select * from nest_tb0);
if $rows != 1 then
return -1
endi
if $data00 != @{slop:0.000100, intercept:49.000000}@ then
return -1
endi
sql select irate(c1) from (select * from nest_tb0); sql select irate(c1) from (select * from nest_tb0);
if $data00 != 0.016666667 then
return -1
endi
sql select derivative(c1, 1s, 0) from (select * from nest_tb0);
if $rows != 9999 then
return -1
endi
if $data00 != @20-09-15 00:01:00.000@ then
return -1
endi
if $data01 != 0.016666667 then
return -1
endi
if $data10 != @20-09-15 00:02:00.000@ then
return -1
endi
if $data11 != 0.016666667 then
return -1
endi
sql select diff(c1) from (select * from nest_tb0);
if $rows != 9999 then
return -1
endi
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d); sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
if $rows != 7 then if $rows != 7 then

View File

@ -193,3 +193,7 @@ endi
if $data04 != 1 then if $data04 != 1 then
return -1 return -1
endi endi
print ===============>safty check TD-4927
sql select first(ts, c1) from sr_stb where ts<1 group by t1;
sql select first(ts, c1) from sr_stb where ts>0 and ts<1;

View File

@ -12,7 +12,7 @@ run general/parser/create_tb.sim
run general/parser/dbtbnameValidate.sim run general/parser/dbtbnameValidate.sim
run general/parser/fill.sim run general/parser/fill.sim
run general/parser/fill_stb.sim run general/parser/fill_stb.sim
#run general/parser/fill_us.sim # run general/parser/fill_us.sim
run general/parser/first_last.sim run general/parser/first_last.sim
run general/parser/import_commit1.sim run general/parser/import_commit1.sim
run general/parser/import_commit2.sim run general/parser/import_commit2.sim