commit
0062af553e
|
@ -110,8 +110,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
|||
//todo tags value as well as the table id structure needs refactor
|
||||
char *tsGetTagsValue(STableMeta *pMeta);
|
||||
|
||||
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "qast.h"
|
||||
#include "qextbuffer.h"
|
||||
#include "qfill.h"
|
||||
#include "qhistogram.h"
|
||||
|
@ -23,6 +22,7 @@
|
|||
#include "qtsbuf.h"
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "qast.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscSubquery.h"
|
||||
#include "tscompression.h"
|
||||
|
|
|
@ -18,19 +18,19 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
#include "qast.h"
|
||||
#include "taos.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tstoken.h"
|
||||
#include "tstrbuild.h"
|
||||
#include "ttime.h"
|
||||
#include "qast.h"
|
||||
#include "tcompare.h"
|
||||
#include "tname.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscUtil.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsclient.h"
|
||||
#include "tstoken.h"
|
||||
#include "tstrbuild.h"
|
||||
#include "ttime.h"
|
||||
#include "ttokendef.h"
|
||||
#include "tname.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
|
||||
|
||||
|
|
|
@ -215,25 +215,3 @@ __attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char*
|
|||
return len;
|
||||
}
|
||||
|
||||
/*
|
||||
* tablePrefix.columnName
|
||||
* extract table name and save it in pTable, with only column name in pToken
|
||||
*/
|
||||
void extractTableNameFromToken(SSQLToken* pToken, SSQLToken* pTable) {
|
||||
const char sep = TS_PATH_DELIMITER[0];
|
||||
|
||||
if (pToken == pTable || pToken == NULL || pTable == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
char* r = strnchr(pToken->z, sep, pToken->n, false);
|
||||
|
||||
if (r != NULL) { // record the table name token
|
||||
pTable->n = r - pToken->z;
|
||||
pTable->z = pToken->z;
|
||||
|
||||
r += 1;
|
||||
pToken->n -= (r - pToken->z);
|
||||
pToken->z = r;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,12 +14,12 @@
|
|||
*/
|
||||
#include "os.h"
|
||||
|
||||
#include "tscSubquery.h"
|
||||
#include "qtsbuf.h"
|
||||
#include "qast.h"
|
||||
#include "tcompare.h"
|
||||
#include "tschemautil.h"
|
||||
#include "qtsbuf.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscSubquery.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsclient.h"
|
||||
|
||||
typedef struct SInsertSupporter {
|
||||
|
@ -57,10 +57,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
|||
pSubQueryInfo1->tsBuf = output1;
|
||||
pSubQueryInfo2->tsBuf = output2;
|
||||
|
||||
// no result generated, return directly
|
||||
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter1->pTSBuf);
|
||||
tsBufResetPos(pSupporter2->pTSBuf);
|
||||
|
||||
// TODO add more details information
|
||||
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
@ -210,6 +215,7 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
|
|||
pSupporter->f = NULL;
|
||||
}
|
||||
|
||||
tfree(pSupporter->pIdTagList);
|
||||
tscTagCondRelease(&pSupporter->tagCond);
|
||||
free(pSupporter);
|
||||
}
|
||||
|
@ -420,43 +426,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
|
|||
pQueryInfo->window = *win;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
|
||||
SSqlObj* pParentSql = pSupporter->pObj;
|
||||
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
|
||||
|
||||
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
|
||||
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
// assert(pQueryInfo->numOfTables == 1);
|
||||
//
|
||||
// // for projection query, need to try next vnode
|
||||
//// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
|
||||
// int32_t totalVnode = 0;
|
||||
// if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
|
||||
// tscDebug("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
|
||||
// pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
|
||||
//
|
||||
// pSql->cmd.command = TSDB_SQL_SELECT;
|
||||
// pSql->fp = tscJoinQueryCallback;
|
||||
// tscProcessSql(pSql);
|
||||
//
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
|
||||
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
||||
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
||||
|
||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
|
||||
if (num <= 0) { // no result during ts intersect
|
||||
tscDebug("%p free all sub SqlObj and quit", pParentSql);
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
} else {
|
||||
updateQueryTimeRange(pParentQueryInfo, &win);
|
||||
tscLaunchRealSubqueries(pParentSql);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tscCompareTidTags(const void* p1, const void* p2) {
|
||||
const STidTags* t1 = (const STidTags*) varDataVal(p1);
|
||||
const STidTags* t2 = (const STidTags*) varDataVal(p2);
|
||||
|
@ -713,9 +682,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
SArray *s1 = NULL, *s2 = NULL;
|
||||
getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
|
||||
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
|
||||
tscDebug("%p free all sub SqlObj and quit", pParentSql);
|
||||
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
|
||||
// set no result command
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
|
||||
} else {
|
||||
// proceed to for ts_comp query
|
||||
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
|
||||
|
@ -847,6 +819,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
|
||||
// set no result command
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,21 +14,21 @@
|
|||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "qast.h"
|
||||
#include "hash.h"
|
||||
#include "tscUtil.h"
|
||||
#include "taosmsg.h"
|
||||
#include "qast.h"
|
||||
#include "tcache.h"
|
||||
#include "tkey.h"
|
||||
#include "tmd5.h"
|
||||
#include "tscProfile.h"
|
||||
#include "tscLocalMerge.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscProfile.h"
|
||||
#include "tscSubquery.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsclient.h"
|
||||
#include "ttimer.h"
|
||||
#include "ttokendef.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscUtil.h"
|
||||
#include "hash.h"
|
||||
|
||||
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
|
||||
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tstoken.h"
|
||||
|
||||
typedef struct SDataStatis {
|
||||
int16_t colId;
|
||||
|
@ -23,6 +24,8 @@ void extractTableName(const char *tableId, char *name);
|
|||
|
||||
char* extractDBName(const char *tableId, char *name);
|
||||
|
||||
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
|
||||
|
||||
SSchema tGetTableNameColumnSchema();
|
||||
|
||||
bool tscValidateTableNameLength(size_t len);
|
||||
|
|
|
@ -81,7 +81,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
|
|||
return startTime;
|
||||
}
|
||||
|
||||
int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime;
|
||||
int64_t start = ((startTime - slidingTime) / slidingTime + 1) * slidingTime;
|
||||
if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
|
||||
/*
|
||||
* here we revised the start time of day according to the local time zone,
|
||||
|
@ -105,3 +105,26 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
|
|||
}
|
||||
return start;
|
||||
}
|
||||
|
||||
/*
|
||||
* tablePrefix.columnName
|
||||
* extract table name and save it in pTable, with only column name in pToken
|
||||
*/
|
||||
void extractTableNameFromToken(SSQLToken* pToken, SSQLToken* pTable) {
|
||||
const char sep = TS_PATH_DELIMITER[0];
|
||||
|
||||
if (pToken == pTable || pToken == NULL || pTable == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
char* r = strnchr(pToken->z, sep, pToken->n, false);
|
||||
|
||||
if (r != NULL) { // record the table name token
|
||||
pTable->n = r - pToken->z;
|
||||
pTable->z = pToken->z;
|
||||
|
||||
r += 1;
|
||||
pToken->n -= (r - pToken->z);
|
||||
pToken->z = r;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -235,9 +235,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
|
|||
* Get current data block information
|
||||
*
|
||||
* @param pQueryHandle
|
||||
* @param pBlockInfo
|
||||
* @return
|
||||
*/
|
||||
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle);
|
||||
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo* pBlockInfo);
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -16,16 +16,16 @@
|
|||
#ifndef TDENGINE_TAST_H
|
||||
#define TDENGINE_TAST_H
|
||||
|
||||
#include <tbuffer.h>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <tskiplist.h>
|
||||
#include "os.h"
|
||||
|
||||
#include "taosmsg.h"
|
||||
#include "taosdef.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tbuffer.h"
|
||||
#include "tvariant.h"
|
||||
|
||||
struct tExprNode;
|
||||
|
@ -75,10 +75,6 @@ typedef struct tExprNode {
|
|||
};
|
||||
} tExprNode;
|
||||
|
||||
void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len);
|
||||
|
||||
void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len);
|
||||
|
||||
void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
|
||||
|
||||
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
|
||||
|
@ -86,9 +82,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
|||
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
||||
char *(*cb)(void *, const char*, int32_t));
|
||||
|
||||
// todo refactor: remove it
|
||||
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res);
|
||||
|
||||
uint8_t getBinaryExprOptr(SSQLToken *pToken);
|
||||
|
||||
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *));
|
||||
|
|
|
@ -18,18 +18,18 @@
|
|||
#include "qfill.h"
|
||||
#include "taosmsg.h"
|
||||
|
||||
#include "exception.h"
|
||||
#include "hash.h"
|
||||
#include "qExecutor.h"
|
||||
#include "qUtil.h"
|
||||
#include "qast.h"
|
||||
#include "qresultBuf.h"
|
||||
#include "query.h"
|
||||
#include "queryLog.h"
|
||||
#include "qast.h"
|
||||
#include "tfile.h"
|
||||
#include "tlosertree.h"
|
||||
#include "exception.h"
|
||||
#include "tscompression.h"
|
||||
#include "ttime.h"
|
||||
#include "tfile.h"
|
||||
|
||||
/**
|
||||
* check if the primary column is load by default, otherwise, the program will
|
||||
|
@ -49,6 +49,8 @@
|
|||
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||
|
||||
/* get the qinfo struct address from the query struct address */
|
||||
#define GET_COLUMN_BYTES(query, colidx) \
|
||||
((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
|
||||
|
@ -120,7 +122,6 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
|
|||
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
||||
|
||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
|
||||
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
||||
|
||||
// todo move to utility
|
||||
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
|
||||
|
@ -397,13 +398,14 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataSt
|
|||
}
|
||||
|
||||
static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData,
|
||||
int16_t bytes) {
|
||||
int16_t bytes, bool masterscan) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes);
|
||||
if (p1 != NULL) {
|
||||
pWindowResInfo->curIndex = *p1;
|
||||
} else { // more than the capacity, reallocate the resources
|
||||
} else {
|
||||
if (masterscan) { // more than the capacity, reallocate the resources
|
||||
if (pWindowResInfo->size >= pWindowResInfo->capacity) {
|
||||
int64_t newCap = pWindowResInfo->capacity * 2;
|
||||
|
||||
|
@ -425,6 +427,9 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
|||
// add a new result set for a new group
|
||||
pWindowResInfo->curIndex = pWindowResInfo->size++;
|
||||
taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex);
|
||||
|
@ -511,15 +516,19 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
|
|||
}
|
||||
|
||||
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid,
|
||||
STimeWindow *win) {
|
||||
STimeWindow *win, bool masterscan, bool* newWind) {
|
||||
assert(win->skey <= win->ekey);
|
||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||
|
||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE);
|
||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey,
|
||||
TSDB_KEYSIZE, masterscan);
|
||||
if (pWindowRes == NULL) {
|
||||
return -1;
|
||||
*newWind = false;
|
||||
|
||||
return masterscan? -1:0;
|
||||
}
|
||||
|
||||
*newWind = true;
|
||||
// not assign result buffer yet, add new result buffer
|
||||
if (pWindowRes->pos.pageId == -1) {
|
||||
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage);
|
||||
|
@ -563,7 +572,7 @@ static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t sea
|
|||
*/
|
||||
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) {
|
||||
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!QUERY_IS_INTERVAL_QUERY(pQuery))) {
|
||||
return pWindowResInfo->size;
|
||||
}
|
||||
|
||||
|
@ -686,6 +695,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
|
|||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
|
||||
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
|
||||
|
||||
|
@ -707,12 +717,14 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
|
||||
int32_t offset) {
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
|
||||
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
pCtx[k].nStartQueryTimestamp = pWin->skey;
|
||||
|
||||
|
@ -722,6 +734,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin,
|
||||
SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys,
|
||||
|
@ -880,6 +893,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo,
|
||||
__block_search_fn_t searchFn, SArray *pDataBlock) {
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
TSKEY *tsCols = NULL;
|
||||
|
@ -903,18 +917,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
|
||||
TSKEY ts = tsCols[offset];
|
||||
|
||||
bool hasTimeWindow = false;
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) {
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
|
||||
tfree(sasArray);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasTimeWindow) {
|
||||
TSKEY ekey = reviseWindowEkey(pQuery, &win);
|
||||
int32_t forwardStep =
|
||||
getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
|
||||
|
||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows);
|
||||
}
|
||||
|
||||
int32_t index = pWindowResInfo->curIndex;
|
||||
STimeWindow nextWin = win;
|
||||
|
@ -926,14 +943,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
}
|
||||
|
||||
// null data, failed to allocate more memory buffer
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) {
|
||||
bool hasTimeWindow = false;
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
ekey = reviseWindowEkey(pQuery, &nextWin);
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
|
||||
if (!hasTimeWindow) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
|
||||
int32_t forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
|
||||
|
||||
SWindowStatus* pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
|
||||
}
|
||||
|
||||
|
@ -983,7 +1005,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
|
|||
}
|
||||
|
||||
// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
|
||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
|
||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true);
|
||||
if (pWindowRes == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1113,6 +1135,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
|||
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
STableQueryInfo* item = pQuery->current;
|
||||
|
@ -1179,16 +1202,21 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
}
|
||||
|
||||
// interval window query
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
// decide the time window according to the primary timestamp
|
||||
int64_t ts = tsCols[offset];
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win);
|
||||
bool hasTimeWindow = false;
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!hasTimeWindow) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset);
|
||||
|
||||
|
@ -1208,13 +1236,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
}
|
||||
|
||||
// null data, failed to allocate more memory buffer
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) {
|
||||
bool hasTimeWindow = false;
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (hasTimeWindow) {
|
||||
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, offset);
|
||||
}
|
||||
}
|
||||
|
||||
pWindowResInfo->curIndex = index;
|
||||
} else { // other queries
|
||||
|
@ -1283,7 +1314,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
|
||||
// interval query with limit applied
|
||||
int32_t numOfRes = 0;
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
||||
} else {
|
||||
numOfRes = getNumOfResult(pRuntimeEnv);
|
||||
|
@ -1869,7 +1900,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
|
|||
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||
num = 128;
|
||||
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
|
||||
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { // time window query, allocate one page for each table
|
||||
size_t s = pQInfo->tableqinfoGroupInfo.numOfTables;
|
||||
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
|
||||
} else { // for super table query, one page for each subset
|
||||
|
@ -2019,7 +2050,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
|
|||
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
|
||||
}
|
||||
|
||||
if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) {
|
||||
if (pRuntimeEnv->pTSBuf > 0 || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
r |= BLK_DATA_ALL_NEEDED;
|
||||
}
|
||||
}
|
||||
|
@ -2173,6 +2204,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
|
|||
if (tmp == NULL) { // todo handle the oom
|
||||
assert(0);
|
||||
} else {
|
||||
memset(tmp + sizeof(tFilePage) + bytes * pRec->rows, 0, (newSize - pRec->rows) * bytes);
|
||||
pQuery->sdata[i] = (tFilePage *)tmp;
|
||||
}
|
||||
|
||||
|
@ -2203,13 +2235,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
pQuery->order.order);
|
||||
|
||||
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
|
||||
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
summary->totalBlocks += 1;
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
|
||||
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
|
||||
|
||||
// todo extract methods
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
||||
|
@ -2914,7 +2948,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
|||
|
||||
// group by normal columns and interval query on normal table
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
||||
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
|
||||
} else { // for simple result of table query,
|
||||
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
|
||||
|
@ -3089,7 +3123,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
bool toContinue = false;
|
||||
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
||||
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
// for each group result, call the finalize function for each column
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
|
||||
|
@ -3281,7 +3315,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
|||
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
||||
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
// for each group result, call the finalize function for each column
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
if (pRuntimeEnv->groupbyNormalCol) {
|
||||
|
@ -3329,7 +3363,7 @@ static bool hasMainOutput(SQuery *pQuery) {
|
|||
static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win, void* buf) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
STableQueryInfo *pTableQueryInfo = buf;//calloc(1, sizeof(STableQueryInfo));
|
||||
STableQueryInfo *pTableQueryInfo = buf;
|
||||
|
||||
pTableQueryInfo->win = win;
|
||||
pTableQueryInfo->lastKey = win.skey;
|
||||
|
@ -3337,16 +3371,14 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
|
|||
pTableQueryInfo->pTable = pTable;
|
||||
pTableQueryInfo->cur.vgroupIndex = -1;
|
||||
|
||||
int32_t initialSize = 1;
|
||||
int32_t initialThreshold = 1;
|
||||
|
||||
// set more initial size of interval/groupby query
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
||||
initialSize = 20;
|
||||
initialThreshold = 100;
|
||||
int32_t initialSize = 20;
|
||||
int32_t initialThreshold = 100;
|
||||
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
|
||||
} else { // in other aggregate query, do not initialize the windowResInfo
|
||||
}
|
||||
|
||||
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
|
||||
return pTableQueryInfo;
|
||||
}
|
||||
|
||||
|
@ -3388,7 +3420,8 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
|
|||
}
|
||||
|
||||
int32_t GROUPRESULTID = 1;
|
||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex));
|
||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
|
||||
sizeof(groupIndex), true);
|
||||
if (pWindowRes == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -3570,7 +3603,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
|
|||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
int32_t totalSubset = 0;
|
||||
if (pQInfo->runtimeEnv.groupbyNormalCol || (isIntervalQuery(pQuery))) {
|
||||
if (pQInfo->runtimeEnv.groupbyNormalCol || (QUERY_IS_INTERVAL_QUERY(pQuery))) {
|
||||
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
||||
} else {
|
||||
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||
|
@ -3735,7 +3768,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
|||
} else {
|
||||
// there are results waiting for returned to client.
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
|
||||
(pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) &&
|
||||
(pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) &&
|
||||
(pRuntimeEnv->windowResInfo.size > 0)) {
|
||||
return true;
|
||||
}
|
||||
|
@ -3918,12 +3951,13 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
STableQueryInfo* pTableQueryInfo = pQuery->current;
|
||||
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
return;
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
|
||||
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
|
||||
|
||||
if (pQuery->limit.offset > blockInfo.rows) {
|
||||
pQuery->limit.offset -= blockInfo.rows;
|
||||
|
@ -3960,8 +3994,9 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
||||
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
|
||||
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle);
|
||||
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
|
||||
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
||||
|
@ -4067,7 +4102,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (isSTableQuery && (!isIntervalQuery(pQuery)) && (!isFixedOutputQuery(pQuery))) {
|
||||
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -4081,7 +4116,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
|||
if (!isSTableQuery
|
||||
&& (pQInfo->tableqinfoGroupInfo.numOfTables == 1)
|
||||
&& (cond.order == TSDB_ORDER_ASC)
|
||||
&& (!isIntervalQuery(pQuery))
|
||||
&& (!QUERY_IS_INTERVAL_QUERY(pQuery))
|
||||
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
|
||||
&& (!isFixedOutputQuery(pQuery))
|
||||
) {
|
||||
|
@ -4174,7 +4209,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
|
||||
}
|
||||
|
||||
} else if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
||||
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
int32_t rows = getInitialPageNum(pQInfo);
|
||||
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4225,13 +4260,15 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
summary->totalBlocks += 1;
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
break;
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
|
||||
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
|
||||
STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
|
||||
if(pTableQueryInfo == NULL) {
|
||||
break;
|
||||
|
@ -4244,7 +4281,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
||||
|
||||
if (!pRuntimeEnv->groupbyNormalCol) {
|
||||
if (!isIntervalQuery(pQuery)) {
|
||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
|
||||
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
|
||||
} else { // interval query
|
||||
|
@ -4655,7 +4692,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
|||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||
|
@ -4681,7 +4718,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
* if the groupIndex > 0, the query process must be completed yet, we only need to
|
||||
* copy the data into output buffer
|
||||
*/
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
copyResToQueryResultBuf(pQInfo, pQuery);
|
||||
#ifdef _DEBUG_VIEW
|
||||
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
|
||||
|
@ -4890,7 +4927,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|||
while (1) {
|
||||
tableIntervalProcessImpl(pRuntimeEnv, newStartKey);
|
||||
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
pQInfo->groupIndex = 0; // always start from 0
|
||||
pQuery->rec.rows = 0;
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
|
|
|
@ -190,8 +190,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
|
|||
}
|
||||
|
||||
// get the result order
|
||||
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)?
|
||||
TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? 1:-1;
|
||||
|
||||
if (order != resultOrder) {
|
||||
return;
|
||||
|
|
|
@ -13,25 +13,26 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
#include "os.h"
|
||||
#include "tulog.h"
|
||||
#include "tutil.h"
|
||||
#include "tbuffer.h"
|
||||
|
||||
#include "tname.h"
|
||||
#include "qast.h"
|
||||
#include "tcompare.h"
|
||||
#include "tsdb.h"
|
||||
#include "exception.h"
|
||||
#include "qsqlparser.h"
|
||||
#include "qsyntaxtreefunction.h"
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tarray.h"
|
||||
#include "tbuffer.h"
|
||||
#include "tcompare.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "tstoken.h"
|
||||
#include "ttokendef.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tarray.h"
|
||||
#include "tskiplist.h"
|
||||
#include "queryLog.h"
|
||||
#include "tsdbMain.h"
|
||||
#include "exception.h"
|
||||
#include "tulog.h"
|
||||
#include "tutil.h"
|
||||
|
||||
/*
|
||||
*
|
||||
|
@ -327,104 +328,6 @@ static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *st
|
|||
}
|
||||
}
|
||||
|
||||
void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len) {
|
||||
*pExpr = NULL;
|
||||
|
||||
if (len <= 0 || src == NULL || pSchema == NULL || numOfCols <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t pos = 0;
|
||||
|
||||
*pExpr = createSyntaxTree(pSchema, numOfCols, src, &pos);
|
||||
if (*pExpr != NULL) {
|
||||
assert((*pExpr)->nodeType == TSQL_NODE_EXPR);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tSQLBinaryExprToStringImpl(tExprNode *pNode, char *dst, uint8_t type) {
|
||||
int32_t len = 0;
|
||||
if (type == TSQL_NODE_EXPR) {
|
||||
*dst = '(';
|
||||
tSQLBinaryExprToString(pNode, dst + 1, &len);
|
||||
len += 2;
|
||||
*(dst + len - 1) = ')';
|
||||
} else if (type == TSQL_NODE_COL) {
|
||||
len = sprintf(dst, "%s", pNode->pSchema->name);
|
||||
} else {
|
||||
len = tVariantToString(pNode->pVal, dst);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
// TODO REFACTOR WITH SQL PARSER
|
||||
static char *tSQLOptrToString(uint8_t optr, char *dst) {
|
||||
switch (optr) {
|
||||
case TSDB_RELATION_LESS: {
|
||||
*dst = '<';
|
||||
dst += 1;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_LESS_EQUAL: {
|
||||
*dst = '<';
|
||||
*(dst + 1) = '=';
|
||||
dst += 2;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_EQUAL: {
|
||||
*dst = '=';
|
||||
dst += 1;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_GREATER: {
|
||||
*dst = '>';
|
||||
dst += 1;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_GREATER_EQUAL: {
|
||||
*dst = '>';
|
||||
*(dst + 1) = '=';
|
||||
dst += 2;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_NOT_EQUAL: {
|
||||
*dst = '<';
|
||||
*(dst + 1) = '>';
|
||||
dst += 2;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_OR: {
|
||||
memcpy(dst, "or", 2);
|
||||
dst += 2;
|
||||
break;
|
||||
}
|
||||
case TSDB_RELATION_AND: {
|
||||
memcpy(dst, "and", 3);
|
||||
dst += 3;
|
||||
break;
|
||||
}
|
||||
default:;
|
||||
}
|
||||
return dst;
|
||||
}
|
||||
|
||||
void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len) {
|
||||
if (pExpr == NULL) {
|
||||
*dst = 0;
|
||||
*len = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t lhs = tSQLBinaryExprToStringImpl(pExpr->_node.pLeft, dst, pExpr->_node.pLeft->nodeType);
|
||||
dst += lhs;
|
||||
*len = lhs;
|
||||
|
||||
char *start = tSQLOptrToString(pExpr->_node.optr, dst);
|
||||
*len += (start - dst);
|
||||
|
||||
*len += tSQLBinaryExprToStringImpl(pExpr->_node.pRight, start, pExpr->_node.pRight->nodeType);
|
||||
}
|
||||
|
||||
static void UNUSED_FUNC destroySyntaxTree(tExprNode *pNode) { tExprNodeDestroy(pNode, NULL); }
|
||||
|
||||
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
|
||||
|
@ -773,8 +676,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
|
|||
SSkipListNode *pNode = tSkipListIterGet(iter);
|
||||
char * pData = SL_GET_NODE_DATA(pNode);
|
||||
|
||||
// todo refactor:
|
||||
tstr *name = (*(STable **)pData)->name;
|
||||
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData);
|
||||
// todo speed up by using hash
|
||||
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
if (pQueryInfo->optr == TSDB_RELATION_IN) {
|
||||
|
@ -976,27 +878,27 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
|
|||
free(pRightOutput);
|
||||
}
|
||||
|
||||
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) {
|
||||
if (pExprs == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tExprNode *pLeft = pExprs->_node.pLeft;
|
||||
tExprNode *pRight = pExprs->_node.pRight;
|
||||
|
||||
// recursive traverse left child branch
|
||||
if (pLeft->nodeType == TSQL_NODE_EXPR) {
|
||||
tSQLBinaryExprTrv(pLeft, res);
|
||||
} else if (pLeft->nodeType == TSQL_NODE_COL) {
|
||||
taosArrayPush(res, &pLeft->pSchema->colId);
|
||||
}
|
||||
|
||||
if (pRight->nodeType == TSQL_NODE_EXPR) {
|
||||
tSQLBinaryExprTrv(pRight, res);
|
||||
} else if (pRight->nodeType == TSQL_NODE_COL) {
|
||||
taosArrayPush(res, &pRight->pSchema->colId);
|
||||
}
|
||||
}
|
||||
//void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) {
|
||||
// if (pExprs == NULL) {
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// tExprNode *pLeft = pExprs->_node.pLeft;
|
||||
// tExprNode *pRight = pExprs->_node.pRight;
|
||||
//
|
||||
// // recursive traverse left child branch
|
||||
// if (pLeft->nodeType == TSQL_NODE_EXPR) {
|
||||
// tSQLBinaryExprTrv(pLeft, res);
|
||||
// } else if (pLeft->nodeType == TSQL_NODE_COL) {
|
||||
// taosArrayPush(res, &pLeft->pSchema->colId);
|
||||
// }
|
||||
//
|
||||
// if (pRight->nodeType == TSQL_NODE_EXPR) {
|
||||
// tSQLBinaryExprTrv(pRight, res);
|
||||
// } else if (pRight->nodeType == TSQL_NODE_COL) {
|
||||
// taosArrayPush(res, &pRight->pSchema->colId);
|
||||
// }
|
||||
//}
|
||||
|
||||
static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
|
||||
tbufWriteUint8(bw, expr->nodeType);
|
||||
|
|
|
@ -192,11 +192,6 @@ char *tsdbGetTableName(void* pTable) {
|
|||
}
|
||||
}
|
||||
|
||||
STableId tsdbGetTableId(void *pTable) {
|
||||
assert(pTable);
|
||||
return ((STable*)pTable)->tableId;
|
||||
}
|
||||
|
||||
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
||||
if (pMsg == NULL) return NULL;
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#include "exception.h"
|
||||
|
||||
#include "../../../query/inc/qast.h" // todo move to common module
|
||||
#include "../../../query/inc/tlosertree.h" // todo move to util module
|
||||
#include "tlosertree.h"
|
||||
#include "tsdb.h"
|
||||
#include "tsdbMain.h"
|
||||
|
||||
|
@ -122,7 +122,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
|||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
||||
SArray* sa);
|
||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||
STsdbQueryHandle* pQueryHandle);
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
|
@ -249,8 +249,6 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
|||
pCheckInfo->initBuf = true;
|
||||
int32_t order = pHandle->order;
|
||||
|
||||
// tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
|
||||
|
||||
// no data in buffer, abort
|
||||
if (pHandle->mem == NULL && pHandle->imem == NULL) {
|
||||
return false;
|
||||
|
@ -393,7 +391,10 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
|||
STable* pTable = pCheckInfo->pTableObj;
|
||||
assert(pTable != NULL);
|
||||
|
||||
if (!pCheckInfo->initBuf) {
|
||||
initTableMemIterator(pHandle, pCheckInfo);
|
||||
}
|
||||
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo);
|
||||
if (row == NULL) {
|
||||
return false;
|
||||
|
@ -411,8 +412,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
|||
|
||||
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
|
||||
STimeWindow* win = &pHandle->cur.win;
|
||||
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey,
|
||||
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
|
||||
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
|
||||
|
||||
// update the last key value
|
||||
pCheckInfo->lastKey = win->ekey + step;
|
||||
|
@ -576,6 +576,8 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
|
|||
|
||||
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
|
||||
STsdbRepo *pRepo = pQueryHandle->pTsdb;
|
||||
|
||||
// TODO refactor
|
||||
SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
|
||||
|
||||
data->numOfCols = pBlock->numOfCols;
|
||||
|
@ -608,8 +610,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
|||
}
|
||||
|
||||
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
||||
assert(pCols->numOfRows != 0);
|
||||
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
|
||||
|
||||
pBlock->numOfRows = pCols->numOfRows;
|
||||
taosArrayDestroy(sa);
|
||||
tfree(data);
|
||||
|
||||
|
@ -639,7 +642,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
|||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
|
||||
|
||||
cur->rows = tsdbReadRowsFromCache(pCheckInfo, binfo.window.skey - step,
|
||||
pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle);
|
||||
pQueryHandle->outputCapacity, &cur->win, pQueryHandle);
|
||||
pQueryHandle->realNumOfRows = cur->rows;
|
||||
|
||||
// update the last key value
|
||||
|
@ -1240,7 +1243,6 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
|
|||
// assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
|
||||
if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
|
||||
pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
|
||||
// todo add more information
|
||||
tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
|
||||
}
|
||||
|
||||
|
@ -1481,6 +1483,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
assert(numOfTables > 0);
|
||||
|
||||
SDataBlockInfo blockInfo = {{0}, 0};
|
||||
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||
|
@ -1490,7 +1493,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
}
|
||||
|
||||
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle);
|
||||
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
|
||||
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, sa);
|
||||
|
||||
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
|
||||
|
@ -1561,7 +1564,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
|
||||
assert(ret);
|
||||
|
||||
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle);
|
||||
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
|
||||
/*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -1696,11 +1699,11 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
|
|||
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||
STsdbQueryHandle* pQueryHandle) {
|
||||
int numOfRows = 0;
|
||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||
*skey = TSKEY_INITIAL_VAL;
|
||||
win->skey = TSKEY_INITIAL_VAL;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
|
||||
|
@ -1720,11 +1723,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
break;
|
||||
}
|
||||
|
||||
if (*skey == INT64_MIN) {
|
||||
*skey = key;
|
||||
if (win->skey == INT64_MIN) {
|
||||
win->skey = key;
|
||||
}
|
||||
|
||||
*ekey = key;
|
||||
win->ekey = key;
|
||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
|
||||
|
||||
if (++numOfRows >= maxRowsToRead) {
|
||||
|
@ -1753,7 +1756,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
|
||||
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* pDataBlockInfo) {
|
||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
|
||||
SQueryFilePos* cur = &pHandle->cur;
|
||||
STable* pTable = NULL;
|
||||
|
@ -1767,15 +1770,11 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
|
|||
pTable = pCheckInfo->pTableObj;
|
||||
}
|
||||
|
||||
SDataBlockInfo blockInfo = {
|
||||
.uid = pTable->tableId.uid,
|
||||
.tid = pTable->tableId.tid,
|
||||
.rows = cur->rows,
|
||||
.window = cur->win,
|
||||
.numOfCols = QH_GET_NUM_OF_COLS(pHandle),
|
||||
};
|
||||
|
||||
return blockInfo;
|
||||
pDataBlockInfo->uid = pTable->tableId.uid;
|
||||
pDataBlockInfo->tid = pTable->tableId.tid;
|
||||
pDataBlockInfo->rows = cur->rows;
|
||||
pDataBlockInfo->window = cur->win;
|
||||
pDataBlockInfo->numOfCols = QH_GET_NUM_OF_COLS(pHandle);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1977,9 +1976,9 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
|||
int32_t type = 0;
|
||||
int32_t bytes = 0;
|
||||
|
||||
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor extract method , to queryExecutor to generate tags values
|
||||
f1 = (char*) pTable1->name;
|
||||
f2 = (char*) pTable2->name;
|
||||
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
f1 = (char*) TABLE_NAME(pTable1);
|
||||
f2 = (char*) TABLE_NAME(pTable2);
|
||||
type = TSDB_DATA_TYPE_BINARY;
|
||||
bytes = tGetTableNameColumnSchema().bytes;
|
||||
} else {
|
||||
|
@ -2088,13 +2087,17 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
|
|||
char* val = NULL;
|
||||
|
||||
if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
val = (char*) pTable->name;
|
||||
val = (char*) TABLE_NAME(pTable);
|
||||
} else {
|
||||
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
|
||||
}
|
||||
|
||||
//todo :the val is possible to be null, so check it out carefully
|
||||
int32_t ret = pInfo->compare(val, pInfo->q);
|
||||
int32_t ret = 0;
|
||||
if (val == NULL) { //the val is possible to be null, so check it out carefully
|
||||
ret = -1; // val is missing in table tags value pairs
|
||||
} else {
|
||||
ret = pInfo->compare(val, pInfo->q);
|
||||
}
|
||||
|
||||
switch (pInfo->optr) {
|
||||
case TSDB_RELATION_EQUAL: {
|
||||
|
|
|
@ -13,10 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tlosertree.h"
|
||||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tlosertree.h"
|
||||
#include "queryLog.h"
|
||||
#include "tulog.h"
|
||||
|
||||
// set initial value for loser tree
|
||||
void tLoserTreeInit(SLoserTreeInfo* pTree) {
|
||||
|
@ -45,7 +45,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
|
|||
|
||||
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries);
|
||||
if ((*pTree) == NULL) {
|
||||
qError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
|
||||
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
@ -94,10 +94,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
qinfo_t pQInfo = NULL;
|
||||
void** handle = NULL;
|
||||
|
||||
if (contLen != 0) {
|
||||
qinfo_t pQInfo = NULL;
|
||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
|
||||
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
|
@ -116,19 +116,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
|
||||
qKillQuery(pQInfo);
|
||||
qKillQuery(pQInfo);
|
||||
pQInfo = NULL;
|
||||
} else {
|
||||
assert(*handle == pQInfo);
|
||||
pRsp->qhandle = htobe64((uint64_t) (handle));
|
||||
}
|
||||
|
||||
pQInfo = NULL;
|
||||
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle);
|
||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
|
||||
// NOTE: there two refcount, needs to kill twice
|
||||
// query has not been put into qhandle pool, kill it directly.
|
||||
qKillQuery(pQInfo);
|
||||
qKillQuery(*handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||
return pRsp->code;
|
||||
}
|
||||
|
@ -139,16 +139,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
|
||||
} else {
|
||||
assert(pCont != NULL);
|
||||
pQInfo = *(void**)(pCont);
|
||||
handle = pCont;
|
||||
handle = qAcquireQInfo(pVnode->qMgmt, (void**) pCont);
|
||||
if (handle == NULL) {
|
||||
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", *(void**) pCont, pReadMsg->rpcMsg.handle);
|
||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
} else {
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, *(void**) pCont);
|
||||
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQInfo != NULL) {
|
||||
qTableQuery(pQInfo); // do execute query
|
||||
assert(handle != NULL);
|
||||
if (handle != NULL) {
|
||||
qTableQuery(*handle); // do execute query
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||
}
|
||||
|
||||
|
@ -160,22 +162,33 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
SRspRet *pRet = &pReadMsg->rspRet;
|
||||
|
||||
SRetrieveTableMsg *pRetrieve = pCont;
|
||||
void **pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
||||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
||||
pRetrieve->free = htons(pRetrieve->free);
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo);
|
||||
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *(void**) pRetrieve->qhandle);
|
||||
|
||||
memset(pRet, 0, sizeof(SRspRet));
|
||||
int32_t ret = 0;
|
||||
|
||||
void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo);
|
||||
if (handle == NULL || handle != pQInfo) {
|
||||
ret = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void** handle = qAcquireQInfo(pVnode->qMgmt, (void**) pRetrieve->qhandle);
|
||||
if (handle == NULL || handle != (void**) pRetrieve->qhandle) {
|
||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, *(void**) pRetrieve->qhandle);
|
||||
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
pRet->len = sizeof(SRetrieveTableRsp);
|
||||
|
||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||
SRetrieveTableRsp* pRsp = pRet->rsp;
|
||||
pRsp->numOfRows = 0;
|
||||
pRsp->completed = true;
|
||||
pRsp->useconds = 0;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pRetrieve->free == 1) {
|
||||
if (ret == TSDB_CODE_SUCCESS) {
|
||||
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
|
||||
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
|
@ -186,31 +199,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
pRsp->numOfRows = 0;
|
||||
pRsp->completed = true;
|
||||
pRsp->useconds = 0;
|
||||
} else { // todo handle error
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||
}
|
||||
return ret;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t code = qRetrieveQueryResultInfo(*pQInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) {
|
||||
//TODO
|
||||
bool freeHandle = true;
|
||||
code = qRetrieveQueryResultInfo(*handle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
//TODO handle malloc failure
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||
|
||||
} else {
|
||||
// todo check code and handle error in build result set
|
||||
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
|
||||
|
||||
} else { // if failed to dump result, free qhandle immediately
|
||||
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
|
||||
if (qHasMoreResultsToRetrieve(*handle)) {
|
||||
dnodePutItemIntoReadQueue(pVnode, handle);
|
||||
pRet->qhandle = handle;
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
} else { // no further execution invoked, release the ref to vnode
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||
freeHandle = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -447,4 +447,15 @@ sql insert into um2 using m2 tags(9) values(1000001, 10)(2000000, 20);
|
|||
|
||||
sql_error select count(*) from m1,m2 where m1.a=m2.a and m1.ts=m2.ts;
|
||||
|
||||
#empty table join test, add for no result join test
|
||||
sql create database ux1;
|
||||
sql use ux1;
|
||||
sql create table m1(ts timestamp, k int) tags(a binary(12), b int);
|
||||
sql create table tm0 using m1 tags('abc', 1);
|
||||
sql create table m2(ts timestamp, k int) tags(a int, b binary(12));
|
||||
sql create table tm2 using m2 tags(2, 'abc');
|
||||
sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts;
|
||||
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a
|
||||
sql drop database ux1;
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue