|
|
|
@ -25,7 +25,6 @@
|
|
|
|
|
#include "taosmsg.h"
|
|
|
|
|
#include "tlosertree.h"
|
|
|
|
|
#include "tscompression.h"
|
|
|
|
|
#include "tsdbMain.h" //todo use TableId instead of STable object
|
|
|
|
|
#include "ttime.h"
|
|
|
|
|
#include "tscUtil.h" // todo move the function to common module
|
|
|
|
|
|
|
|
|
@ -35,8 +34,6 @@
|
|
|
|
|
* check if the primary column is load by default, otherwise, the program will
|
|
|
|
|
* forced to load primary column explicitly.
|
|
|
|
|
*/
|
|
|
|
|
#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
|
|
|
|
|
|
|
|
|
|
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
|
|
|
|
|
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
|
|
|
|
|
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
|
|
|
|
@ -98,12 +95,18 @@ typedef struct {
|
|
|
|
|
STSCursor cur;
|
|
|
|
|
} SQueryStatusInfo;
|
|
|
|
|
|
|
|
|
|
typedef struct SGroupItem {
|
|
|
|
|
STableId id;
|
|
|
|
|
STableQueryInfo* info;
|
|
|
|
|
} SGroupItem;
|
|
|
|
|
|
|
|
|
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
|
|
|
|
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
|
|
|
|
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
|
|
|
|
|
|
|
|
|
// todo move to utility
|
|
|
|
|
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
|
|
|
|
|
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
|
|
|
|
|
|
|
|
|
|
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
|
|
|
|
|
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
|
|
|
|
|
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
|
|
|
|
|
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
|
|
|
|
@ -114,9 +117,10 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
|
|
|
|
static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
|
|
|
|
|
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
|
|
|
|
static bool hasMainOutput(SQuery *pQuery);
|
|
|
|
|
static void createTableDataInfo(SQInfo *pQInfo);
|
|
|
|
|
static void createTableQueryInfo(SQInfo *pQInfo);
|
|
|
|
|
static void buildTagQueryResult(SQInfo *pQInfo);
|
|
|
|
|
|
|
|
|
|
static int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo);
|
|
|
|
|
static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTaleId, STableQueryInfo *pTableQueryInfo);
|
|
|
|
|
static int32_t flushFromResultBuf(SQInfo *pQInfo);
|
|
|
|
|
|
|
|
|
|
bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) {
|
|
|
|
@ -926,7 +930,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|
|
|
|
TSKEY ts = primaryKeyCol[offset];
|
|
|
|
|
|
|
|
|
|
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &win) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -947,7 +951,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// null data, failed to allocate more memory buffer
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &nextWin) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1178,7 +1182,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|
|
|
|
int64_t ts = primaryKeyCol[offset];
|
|
|
|
|
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
|
|
|
|
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &win);
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -1206,7 +1210,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// null data, failed to allocate more memory buffer
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &nextWin) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1553,7 +1557,7 @@ static bool isQueryKilled(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_QUERY_CANCELLED; }
|
|
|
|
|
|
|
|
|
|
bool isFixedOutputQuery(SQuery *pQuery) {
|
|
|
|
|
static bool isFixedOutputQuery(SQuery *pQuery) {
|
|
|
|
|
if (pQuery->intervalTime != 0) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
@ -1584,7 +1588,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isPointInterpoQuery(SQuery *pQuery) {
|
|
|
|
|
static bool isPointInterpoQuery(SQuery *pQuery) {
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
|
|
|
|
|
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
|
|
|
|
@ -1596,7 +1600,7 @@ bool isPointInterpoQuery(SQuery *pQuery) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
|
|
|
|
|
bool isSumAvgRateQuery(SQuery *pQuery) {
|
|
|
|
|
static bool isSumAvgRateQuery(SQuery *pQuery) {
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
|
|
|
|
if (functionId == TSDB_FUNC_TS) {
|
|
|
|
@ -1612,7 +1616,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isFirstLastRowQuery(SQuery *pQuery) {
|
|
|
|
|
static bool isFirstLastRowQuery(SQuery *pQuery) {
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
|
|
|
|
|
if (functionID == TSDB_FUNC_LAST_ROW) {
|
|
|
|
@ -1623,7 +1627,7 @@ bool isFirstLastRowQuery(SQuery *pQuery) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool notHasQueryTimeRange(SQuery *pQuery) {
|
|
|
|
|
static bool notHasQueryTimeRange(SQuery *pQuery) {
|
|
|
|
|
return (pQuery->window.skey == 0 && pQuery->window.ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) ||
|
|
|
|
|
(pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
|
|
|
|
|
}
|
|
|
|
@ -1643,6 +1647,19 @@ static bool needReverseScan(SQuery *pQuery) {
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool onlyQueryTags(SQuery* pQuery) {
|
|
|
|
|
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
|
|
|
|
|
|
|
|
|
if (functionId != TSDB_FUNC_TAG) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
|
|
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, int64_t *realSkey,
|
|
|
|
@ -2238,22 +2255,6 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
|
|
|
|
|
|
|
|
|
if (functionId == TSDB_FUNC_SPREAD) {
|
|
|
|
|
pRuntimeEnv->pCtx[i].param[1].dKey = stime;
|
|
|
|
|
pRuntimeEnv->pCtx[i].param[2].dKey = etime;
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pCtx[i].param[1].nType = TSDB_DATA_TYPE_DOUBLE;
|
|
|
|
|
pRuntimeEnv->pCtx[i].param[2].nType = TSDB_DATA_TYPE_DOUBLE;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
|
|
|
|
|
int32_t numOfTotalPoints) {
|
|
|
|
|
if (pDataStatis == NULL) {
|
|
|
|
@ -2539,7 +2540,7 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
|
|
|
|
|
* set tag value in SQLFunctionCtx
|
|
|
|
|
* e.g.,tag information into input buffer
|
|
|
|
|
*/
|
|
|
|
|
static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVariant *param) {
|
|
|
|
|
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *param) {
|
|
|
|
|
tVariantDestroy(param);
|
|
|
|
|
|
|
|
|
|
char * val = NULL;
|
|
|
|
@ -2547,11 +2548,11 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
|
|
|
|
|
int16_t type = 0;
|
|
|
|
|
|
|
|
|
|
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
|
|
|
|
tsdbTableGetName(tsdb, id, &val);
|
|
|
|
|
tsdbTableGetName(tsdb, pTableId, &val);
|
|
|
|
|
bytes = TSDB_TABLE_NAME_LEN;
|
|
|
|
|
type = TSDB_DATA_TYPE_BINARY;
|
|
|
|
|
} else {
|
|
|
|
|
tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val);
|
|
|
|
|
tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tVariantCreateFromBinary(param, val, bytes, type);
|
|
|
|
@ -2561,13 +2562,13 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
|
|
|
|
|
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base;
|
|
|
|
|
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
|
|
|
|
|
assert(pFuncMsg->numOfParams == 1);
|
|
|
|
|
doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
|
|
|
|
|
doSetTagValueInParam(tsdb, pTableId, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
|
|
|
|
|
} else {
|
|
|
|
|
// set tag value, by which the results are aggregated.
|
|
|
|
|
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
|
|
|
@ -2579,7 +2580,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// todo use tag column index to optimize performance
|
|
|
|
|
doSetTagValueInParam(tsdb, id, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
|
|
|
|
|
doSetTagValueInParam(tsdb, pTableId, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set the join tag for first column
|
|
|
|
@ -2744,9 +2745,9 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SCompSupporter {
|
|
|
|
|
STableDataInfo **pTableDataInfo;
|
|
|
|
|
int32_t * position;
|
|
|
|
|
SQInfo * pQInfo;
|
|
|
|
|
STableQueryInfo **pTableQueryInfo;
|
|
|
|
|
int32_t * position;
|
|
|
|
|
SQInfo * pQInfo;
|
|
|
|
|
} SCompSupporter;
|
|
|
|
|
|
|
|
|
|
int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
|
|
|
|
@ -2769,13 +2770,13 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SWindowResInfo *pWindowResInfo1 = &supporter->pTableDataInfo[left]->pTableQInfo->windowResInfo;
|
|
|
|
|
SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
|
|
|
|
|
SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos);
|
|
|
|
|
|
|
|
|
|
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1);
|
|
|
|
|
TSKEY leftTimestamp = GET_INT64_VAL(b1);
|
|
|
|
|
|
|
|
|
|
SWindowResInfo *pWindowResInfo2 = &supporter->pTableDataInfo[right]->pTableQInfo->windowResInfo;
|
|
|
|
|
SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
|
|
|
|
|
SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos);
|
|
|
|
|
|
|
|
|
|
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2);
|
|
|
|
@ -2902,16 +2903,16 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|
|
|
|
tFilePage **buffer = (tFilePage **)pQuery->sdata;
|
|
|
|
|
int32_t * posList = calloc(size, sizeof(int32_t));
|
|
|
|
|
|
|
|
|
|
STableDataInfo **pTableList = malloc(POINTER_BYTES * size);
|
|
|
|
|
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
|
|
|
|
|
|
|
|
|
|
// todo opt for the case of one table per group
|
|
|
|
|
int32_t numOfTables = 0;
|
|
|
|
|
for (int32_t i = 0; i < size; ++i) {
|
|
|
|
|
SPair * p = taosArrayGet(pGroup, i);
|
|
|
|
|
STableDataInfo *pInfo = p->sec;
|
|
|
|
|
SGroupItem *item = taosArrayGet(pGroup, i);
|
|
|
|
|
STableQueryInfo *pInfo = item->info;
|
|
|
|
|
|
|
|
|
|
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->pTableQInfo->tid);
|
|
|
|
|
if (list.size > 0 && pInfo->pTableQInfo->windowResInfo.size > 0) {
|
|
|
|
|
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->id.tid);
|
|
|
|
|
if (list.size > 0 && pInfo->windowResInfo.size > 0) {
|
|
|
|
|
pTableList[numOfTables] = pInfo;
|
|
|
|
|
numOfTables += 1;
|
|
|
|
|
}
|
|
|
|
@ -2940,7 +2941,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|
|
|
|
while (1) {
|
|
|
|
|
int32_t pos = pTree->pNode[0].index;
|
|
|
|
|
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pTableList[pos]->pTableQInfo->windowResInfo;
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
|
|
|
|
|
SWindowResult * pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
|
|
|
|
|
|
|
|
|
|
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes);
|
|
|
|
@ -3074,9 +3075,9 @@ void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pRes
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void setTableDataInfo(STableDataInfo *pTableDataInfo, int32_t tableIndex, int32_t groupId) {
|
|
|
|
|
pTableDataInfo->groupIdx = groupId;
|
|
|
|
|
pTableDataInfo->tableIndex = tableIndex;
|
|
|
|
|
void setTableDataInfo(STableQueryInfo *pTableQueryInfo, int32_t tableIndex, int32_t groupId) {
|
|
|
|
|
pTableQueryInfo->groupIdx = groupId;
|
|
|
|
|
pTableQueryInfo->tableIndex = tableIndex;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) {
|
|
|
|
@ -3136,7 +3137,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery)) {
|
|
|
|
|
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
|
|
|
|
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
|
|
|
|
|
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
|
|
|
|
|
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
|
|
|
|
//
|
|
|
|
|
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
|
|
|
@ -3364,7 +3365,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
|
|
|
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
switchCtxOrder(pRuntimeEnv);
|
|
|
|
@ -3430,7 +3431,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
|
|
|
|
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
@ -3503,13 +3504,13 @@ static bool hasMainOutput(SQuery *pQuery) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid, STimeWindow win) {
|
|
|
|
|
STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableId tableId, STimeWindow win) {
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
|
|
|
|
|
|
|
|
|
|
pTableQueryInfo->win = win;
|
|
|
|
|
pTableQueryInfo->lastKey = win.skey;
|
|
|
|
|
|
|
|
|
|
pTableQueryInfo->tid = tid;
|
|
|
|
|
pTableQueryInfo->id = tableId;
|
|
|
|
|
pTableQueryInfo->cur.vnodeIndex = -1;
|
|
|
|
|
|
|
|
|
|
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT);
|
|
|
|
@ -3562,8 +3563,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
|
|
|
|
|
* @param pRuntimeEnv
|
|
|
|
|
* @param pDataBlockInfo
|
|
|
|
|
*/
|
|
|
|
|
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable *pTable, int32_t groupIdx,
|
|
|
|
|
TSKEY nextKey) {
|
|
|
|
|
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) {
|
|
|
|
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
int32_t GROUPRESULTID = 1;
|
|
|
|
@ -3588,7 +3588,7 @@ void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STabl
|
|
|
|
|
initCtxOutputBuf(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
pTableQueryInfo->lastKey = nextKey;
|
|
|
|
|
setAdditionalInfo(pQInfo, pTable, pTableQueryInfo);
|
|
|
|
|
setAdditionalInfo(pQInfo, pTableId, pTableQueryInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
|
|
|
|
@ -3616,11 +3616,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo) {
|
|
|
|
|
int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *pTableQueryInfo) {
|
|
|
|
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
assert(pTableQueryInfo->lastKey >= 0);
|
|
|
|
|
|
|
|
|
|
setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
|
|
|
|
|
setTagVal(pRuntimeEnv, pTableId, pQInfo->tsdb);
|
|
|
|
|
|
|
|
|
|
// both the master and supplement scan needs to set the correct ts comp start position
|
|
|
|
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
|
|
|
@ -3819,12 +3819,12 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
|
|
|
|
|
assert(pQuery->rec.rows <= pQuery->rec.capacity);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) {
|
|
|
|
|
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
// update the number of result for each, only update the number of rows for the corresponding window result.
|
|
|
|
|
if (pQuery->intervalTime == 0) {
|
|
|
|
|
int32_t g = pTableDataInfo->groupIdx;
|
|
|
|
|
int32_t g = pTableQueryInfo->groupIdx;
|
|
|
|
|
assert(pRuntimeEnv->windowResInfo.size > 0);
|
|
|
|
|
|
|
|
|
|
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
|
|
|
|
@ -3834,11 +3834,10 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo,
|
|
|
|
|
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo,
|
|
|
|
|
SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock,
|
|
|
|
|
__block_search_fn_t searchFn) {
|
|
|
|
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo;
|
|
|
|
|
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
|
|
|
|
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
|
|
|
|
|
|
|
|
|
@ -3848,7 +3847,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *
|
|
|
|
|
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updateWindowResNumOfRes(pRuntimeEnv, pTableDataInfo);
|
|
|
|
|
updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo);
|
|
|
|
|
updatelastkey(pQuery, pTableQueryInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -4226,7 +4225,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo);
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQInfo->tsdb = tsdb;
|
|
|
|
@ -4362,8 +4361,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
|
|
|
|
|
STableDataInfo *pTableDataInfo = NULL;
|
|
|
|
|
STable * pTable = NULL;
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = NULL;
|
|
|
|
|
|
|
|
|
|
// todo opt performance using hash table
|
|
|
|
|
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
|
|
|
@ -4372,20 +4370,19 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
size_t num = taosArrayGetSize(group);
|
|
|
|
|
for (int32_t j = 0; j < num; ++j) {
|
|
|
|
|
SPair * p = taosArrayGet(group, j);
|
|
|
|
|
STableDataInfo *pInfo = p->sec;
|
|
|
|
|
SGroupItem *item = taosArrayGet(group, j);
|
|
|
|
|
STableQueryInfo *pInfo = item->info;
|
|
|
|
|
|
|
|
|
|
if (pInfo->pTableQInfo->tid == blockInfo.sid) {
|
|
|
|
|
pTableDataInfo = p->sec;
|
|
|
|
|
pTable = p->first;
|
|
|
|
|
if (pInfo->id.tid == blockInfo.tid) {
|
|
|
|
|
assert(pInfo->id.uid == blockInfo.uid);
|
|
|
|
|
pTableQueryInfo = item->info;
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL);
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo;
|
|
|
|
|
|
|
|
|
|
assert(pTableQueryInfo != NULL && pTableQueryInfo != NULL);
|
|
|
|
|
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
|
|
|
|
|
|
|
|
|
|
SDataStatis *pStatis = NULL;
|
|
|
|
@ -4393,10 +4390,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
TSKEY nextKey = blockInfo.window.skey;
|
|
|
|
|
if (!isIntervalQuery(pQuery)) {
|
|
|
|
|
setExecutionContext(pQInfo, pTableQueryInfo, pTable, pTableDataInfo->groupIdx, nextKey);
|
|
|
|
|
setExecutionContext(pQInfo, pTableQueryInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey);
|
|
|
|
|
} else { // interval query
|
|
|
|
|
setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey);
|
|
|
|
|
int32_t ret = setAdditionalInfo(pQInfo, pTable, pTableQueryInfo);
|
|
|
|
|
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
|
|
|
|
|
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
pQInfo->code = ret;
|
|
|
|
@ -4404,7 +4401,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stableApplyFunctionsOnBlock(pRuntimeEnv, pTableDataInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
|
|
|
|
|
stableApplyFunctionsOnBlock(pRuntimeEnv, pTableQueryInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t et = taosGetTimestampMs();
|
|
|
|
@ -4417,27 +4414,24 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
|
|
|
|
|
SPair * p = taosArrayGet(group, index);
|
|
|
|
|
SGroupItem* item = taosArrayGet(group, index);
|
|
|
|
|
|
|
|
|
|
STable * pTable = p->first;
|
|
|
|
|
STableDataInfo *pInfo = p->sec;
|
|
|
|
|
|
|
|
|
|
setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
|
|
|
|
|
setTagVal(pRuntimeEnv, &item->id, pQInfo->tsdb);
|
|
|
|
|
|
|
|
|
|
qTrace("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
|
|
|
|
|
pTable->tableId.uid, pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey);
|
|
|
|
|
item->id.uid, item->info->lastKey, item->info->win.ekey);
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = {
|
|
|
|
|
.twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey},
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.twindow = {item->info->lastKey, item->info->win.ekey},
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.numOfCols = pQuery->numOfCols,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
|
|
|
|
SArray *tx = taosArrayInit(1, sizeof(SPair));
|
|
|
|
|
SArray *tx = taosArrayInit(1, sizeof(STableId));
|
|
|
|
|
|
|
|
|
|
taosArrayPush(tx, p);
|
|
|
|
|
taosArrayPush(tx, &item->info->id);
|
|
|
|
|
taosArrayPush(g1, &tx);
|
|
|
|
|
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
|
|
|
|
|
|
|
|
|
@ -4585,7 +4579,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
createTableDataInfo(pQInfo);
|
|
|
|
|
createTableQueryInfo(pQInfo);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
|
|
|
|
@ -4618,12 +4612,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SPair * p = taosArrayGet(group, pQInfo->tableIndex);
|
|
|
|
|
STableDataInfo *pInfo = p->sec;
|
|
|
|
|
|
|
|
|
|
TSKEY skey = pInfo->pTableQInfo->lastKey;
|
|
|
|
|
if (skey > 0) {
|
|
|
|
|
pQuery->window.skey = skey;
|
|
|
|
|
SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex);
|
|
|
|
|
|
|
|
|
|
STableQueryInfo *pInfo = item->info;
|
|
|
|
|
if (pInfo->lastKey > 0) {
|
|
|
|
|
pQuery->window.skey = pInfo->lastKey;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) {
|
|
|
|
@ -4665,7 +4658,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
* to ensure that, we can reset the query range once query on a meter is completed.
|
|
|
|
|
*/
|
|
|
|
|
pQInfo->tableIndex++;
|
|
|
|
|
pInfo->pTableQInfo->lastKey = pQuery->lastKey;
|
|
|
|
|
pInfo->lastKey = pQuery->lastKey;
|
|
|
|
|
|
|
|
|
|
// if the buffer is full or group by each table, we need to jump out of the loop
|
|
|
|
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
|
|
|
|
@ -4737,7 +4730,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
pQuery->limit.offset);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void createTableDataInfo(SQInfo *pQInfo) {
|
|
|
|
|
static void createTableQueryInfo(SQInfo *pQInfo) {
|
|
|
|
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
|
|
|
|
|
|
|
|
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
|
|
|
|
@ -4749,21 +4742,18 @@ static void createTableDataInfo(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
size_t s = taosArrayGetSize(group);
|
|
|
|
|
for (int32_t j = 0; j < s; ++j) {
|
|
|
|
|
SPair *p = (SPair *)taosArrayGet(group, j);
|
|
|
|
|
SGroupItem* item = (SGroupItem *)taosArrayGet(group, j);
|
|
|
|
|
|
|
|
|
|
// STableDataInfo has been created for each table
|
|
|
|
|
if (p->sec != NULL) { // todo refactor
|
|
|
|
|
// STableQueryInfo has been created for each table
|
|
|
|
|
if (item->info != NULL) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STableDataInfo *pInfo = calloc(1, sizeof(STableDataInfo));
|
|
|
|
|
|
|
|
|
|
setTableDataInfo(pInfo, index, i);
|
|
|
|
|
pInfo->pTableQInfo =
|
|
|
|
|
createTableQueryInfo(&pQInfo->runtimeEnv, ((STable *)(p->first))->tableId.tid, pQuery->window);
|
|
|
|
|
|
|
|
|
|
p->sec = pInfo;
|
|
|
|
|
|
|
|
|
|
STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window);
|
|
|
|
|
pInfo->groupIdx = i;
|
|
|
|
|
pInfo->tableIndex = index;
|
|
|
|
|
|
|
|
|
|
item->info = pInfo;
|
|
|
|
|
index += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -4773,7 +4763,7 @@ static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
|
|
|
|
|
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
|
|
|
|
|
|
|
|
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
|
|
|
|
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
|
|
|
|
|
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
|
|
|
|
|
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
@ -4811,20 +4801,14 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
|
|
|
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery)) {
|
|
|
|
|
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
|
|
|
|
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
|
|
|
|
|
// closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
|
|
|
|
|
// }
|
|
|
|
|
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
for (int32_t i = 0; i < numOfGroup; ++i) {
|
|
|
|
|
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
|
|
|
|
|
|
|
|
|
size_t num = taosArrayGetSize(group);
|
|
|
|
|
for (int32_t j = 0; j < num; ++j) {
|
|
|
|
|
SPair * p = taosArrayGet(group, j);
|
|
|
|
|
STableDataInfo *pInfo = p->sec;
|
|
|
|
|
|
|
|
|
|
closeAllTimeWindow(&pInfo->pTableQInfo->windowResInfo);
|
|
|
|
|
SGroupItem* item = taosArrayGet(group, j);
|
|
|
|
|
closeAllTimeWindow(&item->info->windowResInfo);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else { // close results for group result
|
|
|
|
@ -4865,7 +4849,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|
|
|
|
pQuery->window.skey, pQuery->window.ekey, pQuery->order.order);
|
|
|
|
|
|
|
|
|
|
// create the query support structures
|
|
|
|
|
createTableDataInfo(pQInfo);
|
|
|
|
|
createTableQueryInfo(pQInfo);
|
|
|
|
|
|
|
|
|
|
// do check all qualified data blocks
|
|
|
|
|
int64_t el = queryOnDataBlocks(pQInfo);
|
|
|
|
@ -5481,7 +5465,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
|
|
|
|
|
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
|
|
|
|
|
qTrace("qmsg:%p create arithmetic expr from binary string", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
|
|
|
|
|
|
|
|
|
|
tExprNode* pExprNode = exprTreeFromBinary(pArithExprInfo->base.arg[0].argValue.pz, pArithExprInfo->base.arg[0].argBytes);
|
|
|
|
@ -5494,12 +5478,12 @@ static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pArithExprInfo, SQuery
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExprInfo **pSqlFuncExpr,
|
|
|
|
|
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pSqlFuncExpr,
|
|
|
|
|
SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) {
|
|
|
|
|
*pSqlFuncExpr = NULL;
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
SArithExprInfo *pExprs = (SArithExprInfo *)calloc(1, sizeof(SArithExprInfo) * pQueryMsg->numOfOutput);
|
|
|
|
|
SExprInfo *pExprs = (SExprInfo *)calloc(1, sizeof(SExprInfo) * pQueryMsg->numOfOutput);
|
|
|
|
|
if (pExprs == NULL) {
|
|
|
|
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
@ -5713,7 +5697,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SArithExprInfo *pExprs,
|
|
|
|
|
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
|
|
|
|
|
STableGroupInfo *groupInfo, SColumnInfo* pTagCols) {
|
|
|
|
|
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
|
|
|
|
if (pQInfo == NULL) {
|
|
|
|
@ -5727,7 +5711,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|
|
|
|
int16_t numOfOutput = pQueryMsg->numOfOutput;
|
|
|
|
|
|
|
|
|
|
pQuery->numOfCols = numOfCols;
|
|
|
|
|
pQuery->numOfOutput = numOfOutput;
|
|
|
|
|
pQuery->numOfOutput = numOfOutput;
|
|
|
|
|
pQuery->limit.limit = pQueryMsg->limit;
|
|
|
|
|
pQuery->limit.offset = pQueryMsg->offset;
|
|
|
|
|
pQuery->order.order = pQueryMsg->order;
|
|
|
|
@ -5801,7 +5785,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|
|
|
|
|
|
|
|
|
// to make sure third party won't overwrite this structure
|
|
|
|
|
pQInfo->signature = pQInfo;
|
|
|
|
|
pQInfo->groupInfo = *groupInfo;
|
|
|
|
|
|
|
|
|
|
pQInfo->tableIdGroupInfo = *groupInfo;
|
|
|
|
|
size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList);
|
|
|
|
|
|
|
|
|
|
pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
|
|
|
|
|
pQInfo->groupInfo.numOfTables = groupInfo->numOfTables;
|
|
|
|
|
|
|
|
|
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
|
|
|
|
SArray* pa = taosArrayGetP(groupInfo->pGroupList, i);
|
|
|
|
|
size_t s = taosArrayGetSize(pa);
|
|
|
|
|
|
|
|
|
|
SArray* p1 = taosArrayInit(s, sizeof(SGroupItem));
|
|
|
|
|
|
|
|
|
|
for(int32_t j = 0; j < s; ++j) {
|
|
|
|
|
SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, };
|
|
|
|
|
taosArrayPush(p1, &item);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pQInfo->groupInfo.pGroupList, &p1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQuery->pos = -1;
|
|
|
|
|
|
|
|
|
@ -5853,7 +5856,8 @@ static bool isValidQInfo(void *param) {
|
|
|
|
|
return (sig == (uint64_t)pQInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void freeQInfo(SQInfo *pQInfo);
|
|
|
|
|
static void freeQInfo(SQInfo *pQInfo);
|
|
|
|
|
|
|
|
|
|
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, bool isSTable) {
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
|
|
@ -5918,12 +5922,11 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
if (pQuery->pSelectExpr != NULL) {
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
// SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].arithExprInfo;
|
|
|
|
|
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i];
|
|
|
|
|
|
|
|
|
|
// if (pBinExprInfo->numOfCols > 0) {
|
|
|
|
|
// tfree(pBinExprInfo->pReqColumns);
|
|
|
|
|
// tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL);
|
|
|
|
|
// }
|
|
|
|
|
if (pExprInfo->pExpr != NULL) {
|
|
|
|
|
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tfree(pQuery->pSelectExpr);
|
|
|
|
@ -5940,15 +5943,24 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
size_t num = taosArrayGetSize(p);
|
|
|
|
|
for(int32_t j = 0; j < num; ++j) {
|
|
|
|
|
SPair* pair = taosArrayGet(p, j);
|
|
|
|
|
if (pair->sec != NULL) {
|
|
|
|
|
destroyTableQueryInfo(((STableDataInfo*)pair->sec)->pTableQInfo, pQuery->numOfOutput);
|
|
|
|
|
tfree(pair->sec);
|
|
|
|
|
SGroupItem* item = taosArrayGet(p, j);
|
|
|
|
|
if (item->info != NULL) {
|
|
|
|
|
destroyTableQueryInfo(item->info, pQuery->numOfOutput);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
|
|
|
|
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
|
|
|
|
SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i);
|
|
|
|
|
taosArrayDestroy(p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList);
|
|
|
|
|
|
|
|
|
|
if (pQuery->pGroupbyExpr != NULL) {
|
|
|
|
|
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
|
|
|
|
|
tfree(pQuery->pGroupbyExpr);
|
|
|
|
@ -5959,7 +5971,6 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|
|
|
|
tfree(pQuery->colList);
|
|
|
|
|
tfree(pQuery->sdata);
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
tfree(pQuery);
|
|
|
|
|
|
|
|
|
|
qTrace("QInfo:%p QInfo is freed", pQInfo);
|
|
|
|
@ -6059,7 +6070,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
goto _query_over;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SArithExprInfo *pExprs = NULL;
|
|
|
|
|
SExprInfo *pExprs = NULL;
|
|
|
|
|
if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
goto _query_over;
|
|
|
|
|
}
|
|
|
|
@ -6069,7 +6080,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
goto _query_over;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isSTableQuery = false;
|
|
|
|
|
bool isSTableQuery = false;
|
|
|
|
|
STableGroupInfo groupInfo = {0};
|
|
|
|
|
|
|
|
|
|
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
|
|
|
|
@ -6085,7 +6096,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// todo handle the error
|
|
|
|
|
/*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
|
|
|
|
|
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
|
|
|
|
|
numOfGroupByCols);
|
|
|
|
|
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
|
|
|
|
|
code = TSDB_CODE_SUCCESS;
|
|
|
|
@ -6136,8 +6147,10 @@ void qTableQuery(qinfo_t qinfo) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qTrace("QInfo:%p query task is launched", pQInfo);
|
|
|
|
|
|
|
|
|
|
if (pQInfo->runtimeEnv.stableQuery) {
|
|
|
|
|
|
|
|
|
|
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
|
|
|
|
buildTagQueryResult(pQInfo);
|
|
|
|
|
} else if (pQInfo->runtimeEnv.stableQuery) {
|
|
|
|
|
stableQueryImpl(pQInfo);
|
|
|
|
|
} else {
|
|
|
|
|
tableQueryImpl(pQInfo);
|
|
|
|
@ -6229,3 +6242,33 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
|
|
|
|
|
// pObj->qhandle = NULL;
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void buildTagQueryResult(SQInfo* pQInfo) {
|
|
|
|
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
assert(num == 1); // only one group
|
|
|
|
|
|
|
|
|
|
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
|
|
|
|
|
|
|
|
|
|
num = taosArrayGetSize(pa);
|
|
|
|
|
assert(num == pQInfo->groupInfo.numOfTables);
|
|
|
|
|
|
|
|
|
|
int16_t type, bytes;
|
|
|
|
|
|
|
|
|
|
for(int32_t i = 0; i < num; ++i) {
|
|
|
|
|
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
|
|
|
|
char* data = NULL;
|
|
|
|
|
|
|
|
|
|
SGroupItem* item = taosArrayGet(pa, i);
|
|
|
|
|
|
|
|
|
|
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
|
|
|
|
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, j, &type, &bytes, &data);
|
|
|
|
|
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
|
|
|
|
|
|
|
|
|
|
memcpy(pQuery->sdata[j]->data + num * bytes, data, bytes);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|