[td-225]refactor

This commit is contained in:
Haojun Liao 2021-02-08 15:29:35 +08:00
parent 428bd3b742
commit db12677c61
4 changed files with 132 additions and 113 deletions

View File

@ -269,6 +269,7 @@ typedef struct SQueryRuntimeEnv {
SSDataBlock *ouptputBuf;
int32_t groupIndex;
int32_t tableIndex;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
} SQueryRuntimeEnv;
@ -295,7 +296,6 @@ typedef struct SQInfo {
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
*/
int32_t tableIndex;
SGroupResInfo groupResInfo;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;

View File

@ -265,7 +265,9 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
/*
* the value of number of result needs to be update due to offset value upated.
*/
void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQuery* pQuery, int32_t numOfRes) {
void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
@ -854,7 +856,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
}
}
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQuery* pQuery, STimeWindow *pWin, int32_t offset) {
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
@ -1902,8 +1905,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t));
@ -1972,7 +1974,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pCtx->order = pQuery->order.order;
pCtx->functionId = pSqlFuncMsg->functionId;
pCtx->stableQuery = pRuntimeEnv->stableQuery;
pCtx->stableQuery = pQuery->stableQuery;
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
@ -2044,7 +2046,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// if it is group by normal column, do not set output buffer, the output buffer is pResult
// fixed output query/multi-output query for normal table
if (!pQuery->groupbyColumn && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
if (!pQuery->groupbyColumn && !pQuery->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
resetDefaultResInfoOutputBuf(pRuntimeEnv);
}
@ -2070,6 +2072,7 @@ _clean:
static void doFreeQueryHandle(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
@ -2077,7 +2080,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
pRuntimeEnv->pQueryHandle = NULL;
pRuntimeEnv->pSecQueryHandle = NULL;
SMemRef* pMemRef = &pQInfo->memRef;
SMemRef* pMemRef = &pQuery->memRef;
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL);
}
@ -2179,7 +2182,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
}
// Note:top/bottom query is fixed output query
if (pRuntimeEnv->topBotQuery || pQuery->groupbyColumn) {
if (pQuery->topBotQuery || pQuery->groupbyColumn) {
return true;
}
@ -2478,9 +2481,10 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
}
static int32_t getInitialPageNum(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t INITIAL_RESULT_ROWS_VALUE = 16;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t INITIAL_RESULT_ROWS_VALUE = 16;
int32_t num = 0;
if (isGroupbyColumn(pQuery->pGroupbyExpr)) {
@ -2500,7 +2504,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t MIN_ROWS_PER_PAGE = 4;
*rowsize = (int32_t)(pQuery->resultRowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
*rowsize = (int32_t)(pQuery->resultRowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery));
int32_t overhead = sizeof(tFilePage);
// one page contains at least two rows
@ -2518,7 +2522,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
int32_t numOfRows) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pRuntimeEnv->topBotQuery))) {
if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pQuery->topBotQuery))) {
return true;
}
@ -2577,7 +2581,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
}
}
if (pRuntimeEnv->topBotQuery) {
if (pQuery->topBotQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
@ -2644,7 +2648,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
SQueryCostInfo* pCost = &pRuntimeEnv->summary;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQueryCostInfo* pCost = &pQInfo->summary;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
*status = BLK_DATA_ALL_NEEDED;
@ -2855,7 +2860,9 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SQueryCostInfo* summary = &pRuntimeEnv->summary;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQueryCostInfo* summary = &pQInfo->summary;
qDebug("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey,
@ -2982,7 +2989,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pRuntimeEnv->stableQuery) {
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) {
assert(pExprInfo->base.numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
@ -3143,6 +3150,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType);
void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
@ -3160,7 +3168,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
if (!hasRemainData(pGroupResInfo)) {
cleanupGroupResInfo(pGroupResInfo);
if (!incNextGroup(pGroupResInfo)) {
SET_STABLE_QUERY_OVER(pQInfo);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
}
}
@ -3241,11 +3249,12 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
}
static void setupQueryRangeForReverseScan(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
SArray *tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
size_t t = taosArrayGetSize(group);
@ -3495,7 +3504,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
@ -3565,8 +3574,8 @@ static void handleInterpolationQuery(SQInfo* pQInfo) {
return;
}
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW);
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_NEXT_ROW);
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_PREV_ROW);
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_NEXT_ROW);
if (prev == NULL || next == NULL) {
return;
}
@ -3629,7 +3638,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
if (!pQuery->groupbyColumn && pRuntimeEnv->hasTagResults) {
if (!pQuery->groupbyColumn && pQuery->hasTagResults) {
setTagVal(pRuntimeEnv, pTableQueryInfo->pTable);
}
@ -3660,7 +3669,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
restoreTimeWindow(&pQInfo->tableGroupInfo, &cond);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
@ -4221,8 +4230,8 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
// Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (pQInfo->runtimeEnv.stableQuery) {
if (IS_STASBLE_QUERY_OVER(pQInfo)) {
if (pQInfo->query.stableQuery) {
if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) {
setQueryStatus(pQuery, QUERY_OVER);
}
} else {
@ -4279,7 +4288,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
SQueryCostInfo *pSummary = &pQInfo->summary;
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
@ -4619,23 +4628,23 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
&& (!isGroupbyColumn(pQuery->pGroupbyExpr))
&& (!isFixedOutputQuery(pRuntimeEnv))
) {
SArray* pa = GET_TABLEGROUP(pQInfo, 0);
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
cond.twindow = pCheckInfo->win;
}
terrno = TSDB_CODE_SUCCESS;
if (isFirstLastRowQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
// update the query time window
pQuery->window = cond.twindow;
if (pQInfo->tableGroupInfo.numOfTables == 0) {
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
} else {
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
@ -4647,9 +4656,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
}
} else if (isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
} else {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
}
return terrno;
@ -4687,8 +4696,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
pQuery->topBotQuery = isTopBottomQuery(pQuery);
pQuery->hasTagResults = hasTagValOutput(pQuery);
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
pRuntimeEnv->prevResult = prevResult;
@ -4699,14 +4708,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
return code;
}
pQInfo->tsdb = tsdb;
pQuery->tsdb = tsdb;
pQInfo->vgId = vgId;
pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pQInfo):0;
pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery;
pQuery->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
@ -4767,6 +4776,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
}
// create runtime environment
int32_t numOfTables = pQInfo->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQInfo->tableGroupInfo.numOfTables, pQuery->order.order, pQInfo->vgId);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -4806,7 +4817,7 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
setTagVal(pRuntimeEnv, pTableQueryInfo->pTable);
}
@ -4838,7 +4849,7 @@ static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTa
static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQueryCostInfo* summary = &pRuntimeEnv->summary;
SQueryCostInfo* summary = &pQInfo->summary;
int64_t st = taosGetTimestampMs();
@ -4867,7 +4878,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
}
if (pRuntimeEnv->stabledev) {
if (pQuery->stabledev) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) {
setParamValue(pRuntimeEnv);
@ -4914,10 +4925,10 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SArray *group = GET_TABLEGROUP(pQInfo, 0);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
setTagVal(pRuntimeEnv, pCheckInfo->pTable);
}
@ -4949,7 +4960,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
if (pRuntimeEnv->pQueryHandle == NULL) {
@ -5068,17 +5079,17 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED);
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
if (isPointInterpoQuery(pQuery)) {
resetDefaultResInfoOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pQInfo->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
while (pRuntimeEnv->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex);
qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo,
pQInfo->groupIndex, numOfGroups, group);
pRuntimeEnv->groupIndex, numOfGroups, group);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
@ -5093,7 +5104,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
@ -5110,7 +5121,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
taosArrayDestroy(s);
// here we simply set the first table as current table
SArray *first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
SArray *first = GET_TABLEGROUP(pRuntimeEnv, pRuntimeEnv->groupIndex);
pQuery->current = taosArrayGetP(first, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
@ -5122,7 +5133,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
skipResults(pRuntimeEnv);
pQInfo->groupIndex += 1;
pRuntimeEnv->groupIndex += 1;
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
@ -5133,10 +5144,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
} else if (pQuery->groupbyColumn) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
while (pRuntimeEnv->groupIndex < numOfGroups) {
SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex);
qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex,
qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pRuntimeEnv->groupIndex,
numOfGroups);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
@ -5154,7 +5165,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
// no need to update the lastkey for each table
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef);
taosArrayDestroy(g1);
taosArrayDestroy(tx);
@ -5169,7 +5180,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// here we simply set the first table as current table
scanMultiTableDataBlocks(pQInfo);
pQInfo->groupIndex += 1;
pRuntimeEnv->groupIndex += 1;
taosArrayDestroy(s);
@ -5190,7 +5201,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size,
pQInfo->groupIndex);
pRuntimeEnv->groupIndex);
pQuery->rec.rows = 0;
if (pWindowResInfo->size > pQuery->rec.capacity) {
@ -5205,19 +5216,19 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
cleanupGroupResInfo(&pQInfo->groupResInfo);
break;
}
} else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) {
} else if (pQuery->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) {
//super table projection query with identical query time range for all tables.
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
resetDefaultResInfoOutputBuf(pRuntimeEnv);
SArray *group = GET_TABLEGROUP(pQInfo, 0);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList));
void *pQueryHandle = pRuntimeEnv->pQueryHandle;
if (pQueryHandle == NULL) {
STsdbQueryCond con = createTsdbQueryCond(pQuery, &pQuery->window);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
pQueryHandle = pRuntimeEnv->pQueryHandle;
}
@ -5236,7 +5247,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
bool hasMoreBlock = true;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SQueryCostInfo *summary = &pRuntimeEnv->summary;
SQueryCostInfo *summary = &pQInfo->summary;
while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) {
summary->totalBlocks += 1;
@ -5254,7 +5265,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
if (pRuntimeEnv->hasTagResults) {
if (pQuery->hasTagResults) {
setTagVal(pRuntimeEnv, pQuery->current->pTable);
}
@ -5326,7 +5337,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// the limitation of output result is reached, set the query completed
skipResults(pRuntimeEnv);
if (limitOperator(pQuery, pQInfo)) {
SET_STABLE_QUERY_OVER(pQInfo);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
break;
}
}
@ -5339,7 +5350,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
if (!hasMoreBlock) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
}
} else {
/*
@ -5358,32 +5369,32 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
// all data have returned already
if (pQInfo->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
return;
}
resetDefaultResInfoOutputBuf(pRuntimeEnv);
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
SArray *group = GET_TABLEGROUP(pQInfo, 0);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0);
assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables &&
1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList));
while (pQInfo->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
while (pRuntimeEnv->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
pQuery->current = taosArrayGetP(group, pQInfo->tableIndex);
if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) {
pQInfo->tableIndex++;
pQuery->current = taosArrayGetP(group, pRuntimeEnv->tableIndex);
if (!multiTableMultioutputHelper(pQInfo, pRuntimeEnv->tableIndex)) {
pRuntimeEnv->tableIndex++;
continue;
}
// TODO handle the limit offset problem
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
pQInfo->tableIndex++;
pRuntimeEnv->tableIndex++;
continue;
}
}
@ -5393,7 +5404,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// the limitation of output result is reached, set the query completed
if (limitOperator(pQuery, pQInfo)) {
SET_STABLE_QUERY_OVER(pQInfo);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
break;
}
@ -5407,7 +5418,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* not the consecutive query on meter on which is aborted due to buffer limitation
* to ensure that, we can reset the query range once query on a meter is completed.
*/
pQInfo->tableIndex++;
pRuntimeEnv->tableIndex++;
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
// if the buffer is full or group by each table, we need to jump out of the loop
@ -5432,7 +5443,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
if (pQInfo->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
@ -5457,7 +5468,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64
" points returned, total:%" PRId64 ", offset:%" PRId64,
pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows,
pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pQuery->rec.rows,
pQuery->rec.total, pQuery->limit.offset);
}
}
@ -5487,7 +5498,7 @@ static int32_t doSaveContext(SQInfo *pQInfo) {
setupQueryRangeForReverseScan(pQInfo);
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef);
return (pRuntimeEnv->pSecQueryHandle == NULL)? -1:0;
}
@ -5508,11 +5519,12 @@ static void doRestoreContext(SQInfo *pQInfo) {
static void doCloseAllTimeWindow(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) {
@ -5681,6 +5693,8 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
return pBlock;
}
return NULL;
}
static SSDataBlock* doTableScan(void* param) {
@ -5701,8 +5715,8 @@ static SSDataBlock* doTableScan(void* param) {
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window);
pTableScanInfo->pQueryHandle =
tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef);
tsdbQueryTables(pTableScanInfo->pQInfo->query.tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef);
if (pTableScanInfo->pQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
@ -5727,8 +5741,8 @@ static SSDataBlock* doTableScan(void* param) {
STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window);
pTableScanInfo->pQueryHandle =
tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef);
tsdbQueryTables(pTableScanInfo->pQInfo->query.tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef);
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
@ -5810,8 +5824,8 @@ static SSDataBlock* doAggOperator(void* param) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pInfo->pRuntimeEnv);
pInfo->pRuntimeEnv->pQuery->ouptputBuf->info.rows = getNumOfResult(pInfo->pRuntimeEnv);
return pInfo->pRuntimeEnv->pQuery->ouptputBuf;
pInfo->pRuntimeEnv->ouptputBuf->info.rows = getNumOfResult(pInfo->pRuntimeEnv);
return pInfo->pRuntimeEnv->ouptputBuf;
}
// todo set the attribute of query scan count
@ -5849,7 +5863,7 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (!pRuntimeEnv->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore.
if (!pQuery->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore.
return;
}
@ -6038,7 +6052,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
SArray* g = GET_TABLEGROUP(pQInfo, 0);
SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* item = taosArrayGetP(g, 0);
pQuery->current = item;
@ -6054,7 +6068,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
}
// record the total elapsed time
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
}
@ -6127,7 +6141,7 @@ void stableQueryImpl(SQInfo *pQInfo) {
}
// record the total elapsed time
pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
@ -6881,7 +6895,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
}
doUpdateExprColumnIndex(pQuery);
pQuery->ouptputBuf = createOutputBuf(pQuery);
pQInfo->runtimeEnv.ouptputBuf = createOutputBuf(pQuery);
int32_t ret = createFilterInfo(pQInfo, pQuery);
if (ret != TSDB_CODE_SUCCESS) {
@ -6926,7 +6940,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
size_t numOfGroups = 0;
if (pTableGroupInfo->pGroupList != NULL) {
numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
STableGroupInfo* pTableqinfo = &pRuntimeEnv->tableqinfoGroupInfo;
STableGroupInfo* pTableqinfo = &pQInfo->runtimeEnv.tableqinfoGroupInfo;
pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pTableqinfo->numOfTables = pTableGroupInfo->numOfTables;
@ -6950,7 +6964,8 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
pQuery->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
pQInfo->runtimeEnv.queryWindowIdentical = true;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pQuery->queryWindowIdentical = true;
bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow window = pQuery->window;
@ -6972,7 +6987,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
window.skey = info->lastKey;
if (info->lastKey != pQuery->window.skey) {
pQInfo->runtimeEnv.queryWindowIdentical = false;
pQInfo->query.queryWindowIdentical = false;
}
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
@ -6993,7 +7008,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
colIdCheck(pQuery);
// todo refactor
pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
@ -7039,6 +7054,8 @@ bool isValidQInfo(void *param) {
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable) {
int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
STSBuf *pTsBuf = NULL;
@ -7149,6 +7166,7 @@ void freeQInfo(SQInfo *pQInfo) {
qDebug("QInfo:%p start to free QInfo", pQInfo);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
@ -7326,14 +7344,14 @@ void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
if (numOfGroup == 0) {
return;
}
SArray* pa = GET_TABLEGROUP(pQInfo, 0);
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
size_t num = taosArrayGetSize(pa);
assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
@ -7358,8 +7376,8 @@ void buildTagQueryResult(SQInfo* pQInfo) {
}
}
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pQInfo->tableIndex++;
while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pRuntimeEnv->tableIndex++;
STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pQuery->sdata[0]->data + count * rsize;
@ -7397,7 +7415,7 @@ void buildTagQueryResult(SQInfo* pQInfo) {
*(int64_t*) pQuery->sdata[0]->data = num;
count = 1;
SET_STABLE_QUERY_OVER(pQInfo);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pQInfo, count);
} else { // return only the tags|table name etc.
count = 0;
@ -7408,8 +7426,8 @@ void buildTagQueryResult(SQInfo* pQInfo) {
maxNumOfTables = (int32_t)pQuery->limit.limit;
}
while(pQInfo->tableIndex < num && count < maxNumOfTables) {
int32_t i = pQInfo->tableIndex++;
while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) {
int32_t i = pRuntimeEnv->tableIndex++;
// discard current result due to offset
if (pQuery->limit.offset > 0) {

View File

@ -164,7 +164,8 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo
}
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow);
SQuery* pQuery = pRuntimeEnv->pQuery;
return (pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pQuery->interBufSize + sizeof(SResultRow);
}
SResultRowPool* initResultRowPool(size_t size) {
@ -540,11 +541,12 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pQInfo, pGroupResInfo->currentGroup);
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
int32_t ret = mergeIntoGroupResultImpl(&pQInfo->runtimeEnv, pGroupResInfo, group, pQInfo);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, pQInfo);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
@ -560,13 +562,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
}
if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) {
SET_STABLE_QUERY_OVER(pQInfo);
SET_STABLE_QUERY_OVER(pRuntimeEnv);
}
int64_t elapsedTime = taosGetTimestampUs() - st;
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo,
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
pQInfo->runtimeEnv.summary.firstStageMergeTime += elapsedTime;
pQInfo->summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS;
}

View File

@ -216,7 +216,7 @@ bool qTableQuery(qinfo_t qinfo) {
return doBuildResCheck(pQInfo);
}
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
if (pQInfo->runtimeEnv.tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
@ -236,9 +236,9 @@ bool qTableQuery(qinfo_t qinfo) {
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
buildTagQueryResult(pQInfo);
} else if (pQInfo->runtimeEnv.stableQuery) {
} else if (pQInfo->query.stableQuery) {
stableQueryImpl(pQInfo);
} else if (pQInfo->runtimeEnv.queryBlockDist){
} else if (pQInfo->query.queryBlockDist){
buildTableBlockDistResult(pQInfo);
} else {
tableQueryImpl(pQInfo);
@ -248,7 +248,7 @@ bool qTableQuery(qinfo_t qinfo) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
@ -309,7 +309,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
@ -328,10 +327,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
if (pQInfo->code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
(*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime);
} else {
(*pRsp)->offset = 0;
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
(*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime);
}
(*pRsp)->precision = htons(pQuery->precision);