|
|
|
@ -99,6 +99,16 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
|
|
|
|
|
uint32_t v = rand();
|
|
|
|
|
if (v % 5 <= 1) {
|
|
|
|
|
return NULL;
|
|
|
|
|
} else {
|
|
|
|
|
return calloc(num, __size);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#define calloc u_calloc
|
|
|
|
|
#define malloc u_malloc
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
@ -108,6 +118,7 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
|
|
|
|
|
|
|
|
|
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
|
|
|
|
|
|
|
|
|
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
|
|
|
|
|
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
|
|
|
|
|
|
|
|
|
// todo move to utility
|
|
|
|
@ -313,6 +324,24 @@ static bool isTopBottomQuery(SQuery *pQuery) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool hasTagValOutput(SQuery* pQuery) {
|
|
|
|
|
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
|
|
|
|
|
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
|
|
|
|
|
return true;
|
|
|
|
|
} else { // set tag value, by which the results are aggregated.
|
|
|
|
|
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
|
|
|
|
SExprInfo *pLocalExprInfo = &pQuery->pSelectExpr[idx];
|
|
|
|
|
|
|
|
|
|
// ts_comp column required the tag value for join filter
|
|
|
|
|
if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) {
|
|
|
|
|
// for a tag column, no corresponding field info
|
|
|
|
|
SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo;
|
|
|
|
@ -786,8 +815,8 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
char *dataBlock = NULL;
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
|
|
|
|
|
|
|
|
|
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
|
|
|
|
@ -806,6 +835,10 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
|
|
|
|
sas->numOfCols = pQuery->numOfCols;
|
|
|
|
|
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
|
|
|
|
|
|
|
|
|
if (sas->data == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// here the pQuery->colList and sas->colList are identical
|
|
|
|
|
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
|
|
|
@ -859,6 +892,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
|
|
|
|
if (sasArray == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
|
|
|
|
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
|
|
|
@ -866,7 +902,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
|
|
|
|
if (isIntervalQuery(pQuery) && tsCols != NULL) {
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) && tsCols != NULL) {
|
|
|
|
|
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
|
|
|
|
|
TSKEY ts = tsCols[offset];
|
|
|
|
|
|
|
|
|
@ -1083,8 +1119,12 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|
|
|
|
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
|
|
|
|
|
|
|
|
|
|
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
|
|
|
|
|
bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
|
|
|
|
bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol;
|
|
|
|
|
|
|
|
|
|
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
|
|
|
|
if (sasArray == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int16_t type = 0;
|
|
|
|
|
int16_t bytes = 0;
|
|
|
|
@ -1230,7 +1270,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|
|
|
|
STableQueryInfo* pTableQInfo = pQuery->current;
|
|
|
|
|
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
|
|
|
|
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
|
|
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
|
|
|
|
|
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
|
|
|
|
} else {
|
|
|
|
|
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
|
|
|
@ -1351,14 +1391,16 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set the output buffer for the selectivity + tag query
|
|
|
|
|
static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
|
|
|
|
|
static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
|
|
|
|
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
if (isSelectivityWithTagsQuery(pQuery)) {
|
|
|
|
|
int32_t num = 0;
|
|
|
|
|
int16_t tagLen = 0;
|
|
|
|
|
|
|
|
|
|
SQLFunctionCtx *p = NULL;
|
|
|
|
|
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
|
|
|
|
|
|
|
|
|
@ -1385,7 +1427,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
|
|
|
|
|
static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
|
|
|
|
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
|
|
|
|
|
|
|
|
|
@ -1476,7 +1518,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|
|
|
|
resetCtxOutputBuf(pRuntimeEnv);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
|
|
|
|
|
setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
_clean:
|
|
|
|
@ -2116,7 +2158,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
|
|
|
|
|
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
|
|
|
|
|
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
|
|
|
|
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
|
|
|
|
|
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pQuery)) {
|
|
|
|
|
SResultRec *pRec = &pQuery->rec;
|
|
|
|
|
|
|
|
|
|
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
|
|
|
|
@ -2170,7 +2212,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
|
|
|
|
|
|
|
|
|
|
// todo extract methods
|
|
|
|
|
if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
|
|
|
|
STimeWindow realWin = TSWINDOW_INITIALIZER, w = TSWINDOW_INITIALIZER;
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
|
|
|
|
@ -2216,7 +2258,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
|
|
|
|
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
|
|
|
|
int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
|
|
|
|
|
|
|
|
|
@ -2637,7 +2679,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|
|
|
|
tfree(pTableList);
|
|
|
|
|
|
|
|
|
|
qError("QInfo:%p failed alloc memory", pQInfo);
|
|
|
|
|
longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// todo opt for the case of one table per group
|
|
|
|
@ -2645,7 +2687,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|
|
|
|
for (int32_t i = 0; i < size; ++i) {
|
|
|
|
|
STableQueryInfo *item = taosArrayGetP(pGroup, i);
|
|
|
|
|
|
|
|
|
|
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tsdbGetTableId(item->pTable).tid);
|
|
|
|
|
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
|
|
|
|
|
if (list.size > 0 && item->windowResInfo.size > 0) {
|
|
|
|
|
pTableList[numOfTables] = item;
|
|
|
|
|
numOfTables += 1;
|
|
|
|
@ -2668,6 +2710,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|
|
|
|
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
|
|
|
|
|
|
|
|
|
|
SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo));
|
|
|
|
|
if (pResultInfo == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
|
|
|
|
|
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
|
|
|
|
|
|
|
|
|
@ -2868,7 +2914,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
// group by normal columns and interval query on normal table
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
|
|
|
|
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
|
|
|
|
|
} else { // for simple result of table query,
|
|
|
|
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
|
|
|
|
@ -3043,7 +3089,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
bool toContinue = false;
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
|
|
|
|
// for each group result, call the finalize function for each column
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
|
|
|
|
@ -3235,10 +3281,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
|
|
|
|
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
|
|
|
|
// for each group result, call the finalize function for each column
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol) {
|
|
|
|
|
closeAllTimeWindow(pWindowResInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3280,10 +3326,10 @@ static bool hasMainOutput(SQuery *pQuery) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win) {
|
|
|
|
|
static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win, void* buf) {
|
|
|
|
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = buf;//calloc(1, sizeof(STableQueryInfo));
|
|
|
|
|
|
|
|
|
|
pTableQueryInfo->win = win;
|
|
|
|
|
pTableQueryInfo->lastKey = win.skey;
|
|
|
|
@ -3294,7 +3340,8 @@ static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, voi
|
|
|
|
|
int32_t initialSize = 1;
|
|
|
|
|
int32_t initialThreshold = 1;
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
|
|
|
// set more initial size of interval/groupby query
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
|
|
|
|
initialSize = 20;
|
|
|
|
|
initialThreshold = 100;
|
|
|
|
|
}
|
|
|
|
@ -3309,7 +3356,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols);
|
|
|
|
|
free(pTableQueryInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \
|
|
|
|
@ -3322,7 +3368,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* set output buffer for different group
|
|
|
|
|
* TODO opt performance if current group is identical to previous group
|
|
|
|
|
* @param pRuntimeEnv
|
|
|
|
|
* @param pDataBlockInfo
|
|
|
|
|
*/
|
|
|
|
@ -3333,7 +3378,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
|
|
|
|
|
|
|
|
|
|
// lastKey needs to be updated
|
|
|
|
|
pTableQueryInfo->lastKey = nextKey;
|
|
|
|
|
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
|
|
|
|
|
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
|
|
|
|
|
return;
|
|
|
|
@ -3522,7 +3570,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
|
|
|
|
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
|
|
|
|
|
|
|
|
int32_t totalSubset = 0;
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
|
|
|
|
|
if (pQInfo->runtimeEnv.groupbyNormalCol || (isIntervalQuery(pQuery))) {
|
|
|
|
|
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
|
|
|
|
} else {
|
|
|
|
|
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
|
|
|
|
@ -3653,7 +3701,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
|
|
|
|
|
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
|
|
|
|
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
|
|
|
|
|
|
|
|
|
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
|
|
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
|
|
|
|
|
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
|
|
|
|
} else {
|
|
|
|
|
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
|
|
|
@ -3695,7 +3743,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
|
|
|
|
} else {
|
|
|
|
|
// there are results waiting for returned to client.
|
|
|
|
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
|
|
|
|
|
(isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) &&
|
|
|
|
|
(pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) &&
|
|
|
|
|
(pRuntimeEnv->windowResInfo.size > 0)) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -4100,6 +4148,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|
|
|
|
pRuntimeEnv->cur.vgroupIndex = -1;
|
|
|
|
|
pRuntimeEnv->stableQuery = isSTableQuery;
|
|
|
|
|
pRuntimeEnv->prevGroupId = INT32_MIN;
|
|
|
|
|
pRuntimeEnv->groupbyNormalCol = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
|
|
|
|
|
|
|
|
|
if (pTsBuf != NULL) {
|
|
|
|
|
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
|
|
|
@ -4124,7 +4173,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|
|
|
|
if (pQuery->intervalTime == 0) {
|
|
|
|
|
int16_t type = TSDB_DATA_TYPE_NULL;
|
|
|
|
|
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
|
|
|
|
|
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
|
|
|
|
|
} else {
|
|
|
|
|
type = TSDB_DATA_TYPE_INT; // group id
|
|
|
|
@ -4133,7 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|
|
|
|
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
|
|
|
|
} else if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
|
|
|
|
|
int32_t rows = getInitialPageNum(pQInfo);
|
|
|
|
|
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
@ -4141,7 +4190,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int16_t type = TSDB_DATA_TYPE_NULL;
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol) {
|
|
|
|
|
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
|
|
|
|
|
} else {
|
|
|
|
|
type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
|
|
@ -4159,8 +4208,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|
|
|
|
|
|
|
|
|
// todo refactor
|
|
|
|
|
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -4201,7 +4251,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|
|
|
|
SDataStatis *pStatis = NULL;
|
|
|
|
|
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
|
|
|
|
|
|
|
|
|
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
|
|
|
if (!pRuntimeEnv->groupbyNormalCol) {
|
|
|
|
|
if (!isIntervalQuery(pQuery)) {
|
|
|
|
|
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
|
|
|
|
|
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
|
|
|
|
@ -4233,9 +4283,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
|
|
|
|
|
|
|
|
|
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
|
|
|
|
|
|
|
|
|
|
STableId id = tsdbGetTableId(pCheckInfo->pTable);
|
|
|
|
|
STableId* id = TSDB_TABLEID(pCheckInfo->pTable);
|
|
|
|
|
qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
|
|
|
|
|
id.uid, id.tid, pCheckInfo->lastKey, pCheckInfo->win.ekey);
|
|
|
|
|
id->uid, id->tid, pCheckInfo->lastKey, pCheckInfo->win.ekey);
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = {
|
|
|
|
|
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
|
|
|
|
@ -4364,7 +4414,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query
|
|
|
|
|
} else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query
|
|
|
|
|
while (pQInfo->groupIndex < numOfGroups) {
|
|
|
|
|
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
|
|
|
|
|
|
|
|
|
@ -4502,11 +4552,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
*/
|
|
|
|
|
pQInfo->tableIndex++;
|
|
|
|
|
|
|
|
|
|
STableIdInfo tidInfo;
|
|
|
|
|
STableId id = tsdbGetTableId(pQuery->current->pTable);
|
|
|
|
|
STableIdInfo tidInfo = {0};
|
|
|
|
|
|
|
|
|
|
tidInfo.uid = id.uid;
|
|
|
|
|
tidInfo.tid = id.tid;
|
|
|
|
|
STableId* id = TSDB_TABLEID(pQuery->current->pTable);
|
|
|
|
|
tidInfo.uid = id->uid;
|
|
|
|
|
tidInfo.tid = id->tid;
|
|
|
|
|
tidInfo.key = pQuery->current->lastKey;
|
|
|
|
|
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
|
|
|
|
|
|
|
|
|
@ -4680,7 +4730,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
|
|
|
|
|
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
|
|
|
|
|
copyResToQueryResultBuf(pQInfo, pQuery);
|
|
|
|
|
|
|
|
|
@ -4777,10 +4827,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|
|
|
|
pQuery->current->lastKey, pQuery->window.ekey);
|
|
|
|
|
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
|
|
|
|
STableIdInfo tidInfo;
|
|
|
|
|
STableId id = tsdbGetTableId(pQuery->current);
|
|
|
|
|
STableId* id = TSDB_TABLEID(pQuery->current);
|
|
|
|
|
|
|
|
|
|
tidInfo.uid = id.uid;
|
|
|
|
|
tidInfo.tid = id.tid;
|
|
|
|
|
tidInfo.uid = id->uid;
|
|
|
|
|
tidInfo.tid = id->tid;
|
|
|
|
|
tidInfo.key = pQuery->current->lastKey;
|
|
|
|
|
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
|
|
|
|
|
}
|
|
|
|
@ -4870,7 +4920,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// all data scanned, the group by normal column can return
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result
|
|
|
|
|
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
|
|
|
|
|
pQInfo->groupIndex = 0;
|
|
|
|
|
pQuery->rec.rows = 0;
|
|
|
|
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
|
|
|
@ -4931,7 +4981,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|
|
|
|
STableQueryInfo* item = taosArrayGetP(g, 0);
|
|
|
|
|
|
|
|
|
|
// group by normal column, sliding window query, interval query are handled by interval query processor
|
|
|
|
|
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation)
|
|
|
|
|
tableIntervalProcess(pQInfo, item);
|
|
|
|
|
} else if (isFixedOutputQuery(pQuery)) {
|
|
|
|
|
tableFixedOutputProcess(pQInfo, item);
|
|
|
|
@ -4946,18 +4996,19 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void stableQueryImpl(SQInfo *pQInfo) {
|
|
|
|
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
|
|
|
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
pQuery->rec.rows = 0;
|
|
|
|
|
|
|
|
|
|
int64_t st = taosGetTimestampUs();
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery) ||
|
|
|
|
|
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) &&
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
|
|
|
|
|
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol &&
|
|
|
|
|
!isFirstLastRowQuery(pQuery))) {
|
|
|
|
|
multiTableQueryProcess(pQInfo);
|
|
|
|
|
} else {
|
|
|
|
|
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
|
|
|
|
|
isFirstLastRowQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr));
|
|
|
|
|
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
|
|
|
|
|
|
|
|
|
|
sequentialTableProcess(pQInfo);
|
|
|
|
|
}
|
|
|
|
@ -5652,28 +5703,33 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|
|
|
|
STimeWindow window = pQueryMsg->window;
|
|
|
|
|
taosArraySort(pTableIdList, compareTableIdInfo);
|
|
|
|
|
|
|
|
|
|
// TODO optimize the STableQueryInfo malloc strategy
|
|
|
|
|
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
|
|
|
|
int32_t index = 0;
|
|
|
|
|
|
|
|
|
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
|
|
|
|
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
|
|
|
|
|
size_t s = taosArrayGetSize(pa);
|
|
|
|
|
|
|
|
|
|
size_t s = taosArrayGetSize(pa);
|
|
|
|
|
SArray* p1 = taosArrayInit(s, POINTER_BYTES);
|
|
|
|
|
|
|
|
|
|
for(int32_t j = 0; j < s; ++j) {
|
|
|
|
|
void* pTable = taosArrayGetP(pa, j);
|
|
|
|
|
STableId* id = TSDB_TABLEID(pTable);
|
|
|
|
|
|
|
|
|
|
// NOTE: compare STableIdInfo with STableId
|
|
|
|
|
STableId id = tsdbGetTableId(pTable);
|
|
|
|
|
STableIdInfo* pTableId = taosArraySearch(pTableIdList, &id, compareTableIdInfo);
|
|
|
|
|
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
|
|
|
|
|
if (pTableId != NULL ) {
|
|
|
|
|
window.skey = pTableId->key;
|
|
|
|
|
} else {
|
|
|
|
|
window.skey = pQueryMsg->window.skey;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window);
|
|
|
|
|
void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo);
|
|
|
|
|
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window, buf);
|
|
|
|
|
item->groupIndex = i;
|
|
|
|
|
taosArrayPush(p1, &item);
|
|
|
|
|
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
|
|
|
|
|
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
|
|
|
|
|
index += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
|
|
|
|
@ -5817,6 +5873,7 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|
|
|
|
taosArrayDestroy(p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tfree(pQInfo->pBuf);
|
|
|
|
|
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
|
|
|
|
|
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
|
|
|
|
|
tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
|
|
|
|
@ -6093,7 +6150,8 @@ void qTableQuery(qinfo_t qinfo) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t ret = setjmp(pQInfo->env);
|
|
|
|
|
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
|
|
|
|
|
|
|
|
|
|
// error occurs, record the error code and return to client
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
pQInfo->code = ret;
|
|
|
|
@ -6276,13 +6334,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|
|
|
|
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
|
|
|
|
|
|
|
|
|
output = varDataVal(output);
|
|
|
|
|
STableId id = tsdbGetTableId(item->pTable);
|
|
|
|
|
STableId* id = TSDB_TABLEID(item->pTable);
|
|
|
|
|
|
|
|
|
|
*(int64_t *)output = id.uid; // memory align problem, todo serialize
|
|
|
|
|
output += sizeof(id.uid);
|
|
|
|
|
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
|
|
|
|
output += sizeof(id->uid);
|
|
|
|
|
|
|
|
|
|
*(int32_t *)output = id.tid;
|
|
|
|
|
output += sizeof(id.tid);
|
|
|
|
|
*(int32_t *)output = id->tid;
|
|
|
|
|
output += sizeof(id->tid);
|
|
|
|
|
|
|
|
|
|
*(int32_t *)output = pQInfo->vgId;
|
|
|
|
|
output += sizeof(pQInfo->vgId);
|
|
|
|
|