[TD-2060]

This commit is contained in:
Haojun Liao 2020-11-16 14:45:10 +08:00
parent 875149dde2
commit d5aa32bf35
4 changed files with 98 additions and 94 deletions

View File

@ -152,7 +152,7 @@ typedef struct SQuery {
SLimitVal limit;
int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pSelectExpr;
SExprInfo* pExpr1;
SExprInfo* pExpr2;
int32_t numOfExpr2;

View File

@ -48,7 +48,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int
}
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
@ -62,7 +62,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
pQuery->pExpr1[columnIndex].bytes * realRowId;
}
bool isNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval);

View File

@ -242,7 +242,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
int32_t functionId = pQuery->pExpr1[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
@ -337,7 +337,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
int32_t numOfSelectivity = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pSelectExpr[i].base.functionId;
int32_t functId = pQuery->pExpr1[i].base.functionId;
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
hasTags = true;
continue;
@ -357,7 +357,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
bool isProjQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pSelectExpr[i].base.functionId;
int32_t functId = pQuery->pExpr1[i].base.functionId;
if (functId != TSDB_FUNC_PRJ && functId != TSDB_FUNC_TAGPRJ) {
return false;
}
@ -366,7 +366,7 @@ bool isProjQuery(SQuery *pQuery) {
return true;
}
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
@ -387,7 +387,7 @@ static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) {
static bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
}
@ -401,12 +401,12 @@ static bool isTopBottomQuery(SQuery *pQuery) {
}
static bool hasTagValOutput(SQuery* pQuery) {
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
return true;
} else { // set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SExprInfo *pLocalExprInfo = &pQuery->pSelectExpr[idx];
SExprInfo *pLocalExprInfo = &pQuery->pExpr1[idx];
// ts_comp column required the tag value for join filter
if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
@ -784,7 +784,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
}
@ -813,7 +813,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed,
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
@ -922,9 +922,9 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
int32_t functionId = pQuery->pExpr1[col].base.functionId;
if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pSelectExpr[col];
sas->pArithExpr = &pQuery->pExpr1[col];
sas->offset = 0;
sas->colList = pQuery->colList;
@ -954,9 +954,9 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
}
} else { // other type of query function
SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo;
SColIndex *pCol = &pQuery->pExpr1[col].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
SColIndex* pColIndex = &pQuery->pSelectExpr[col].base.colInfo;
SColIndex* pColIndex = &pQuery->pExpr1[col].base.colInfo;
SColumnInfoData *p = taosArrayGet(pDataBlock, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
@ -1067,7 +1067,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
@ -1075,7 +1075,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
@ -1375,7 +1375,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
@ -1404,7 +1404,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// todo refactor: extract method
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
@ -1464,11 +1464,11 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId) {
int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId;
int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId;
int32_t functionId = pQuery->pExpr1[colIndex].base.functionId;
int32_t colId = pQuery->pExpr1[colIndex].base.colInfo.colId;
SDataStatis *tpField = NULL;
pCtx->hasNull = hasNullValue(&pQuery->pSelectExpr[colIndex].base.colInfo, pStatis, &tpField);
pCtx->hasNull = hasNullValue(&pQuery->pExpr1[colIndex].base.colInfo, pStatis, &tpField);
pCtx->aInputElemBuf = inputData;
if (tpField != NULL) {
@ -1501,7 +1501,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
/*
* least squares function needs two columns of input, currently, the x value of linear equation is set to
* timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer
* timestamp column, and the y-value is the column specified in pQuery->pExpr1[i].colIdxInBuffer
*
* top/bottom function needs timestamp to indicate when the
* top/bottom values emerge, so does diff function
@ -1574,7 +1574,7 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pExpr1[i].base;
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pCtx[i].outputBytes;
@ -1615,7 +1615,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pExpr1[i].base;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
SColIndex* pIndex = &pSqlFuncMsg->colInfo;
@ -1649,13 +1649,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
assert(isValidDataType(pCtx->inputType));
pCtx->ptsOutputBuf = NULL;
pCtx->outputBytes = pQuery->pSelectExpr[i].bytes;
pCtx->outputType = pQuery->pSelectExpr[i].type;
pCtx->outputBytes = pQuery->pExpr1[i].bytes;
pCtx->outputType = pQuery->pExpr1[i].type;
pCtx->order = pQuery->order.order;
pCtx->functionId = pSqlFuncMsg->functionId;
pCtx->stableQuery = pRuntimeEnv->stableQuery;
pCtx->interBufBytes = pQuery->pSelectExpr[i].interBytes;
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
pCtx->numOfParams = pSqlFuncMsg->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
@ -1672,7 +1672,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t f = pQuery->pSelectExpr[0].base.functionId;
int32_t f = pQuery->pExpr1[0].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64Key = order;
@ -1685,7 +1685,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
if (i > 0) {
pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes;
pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pSelectExpr[i - 1].interBytes;
pRuntimeEnv->rowCellInfoOffset[i] = pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pQuery->pExpr1[i - 1].interBytes;
}
}
@ -1779,7 +1779,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base;
SSqlFuncMsg *pExprMsg = &pQuery->pExpr1[i].base;
// ignore the ts_comp function
if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 &&
@ -1802,7 +1802,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
// todo refactor with isLastRowQuery
static bool isPointInterpoQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
int32_t functionID = pQuery->pExpr1[i].base.functionId;
if (functionID == TSDB_FUNC_INTERP) {
return true;
}
@ -1814,7 +1814,7 @@ static bool isPointInterpoQuery(SQuery *pQuery) {
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
static bool isSumAvgRateQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
}
@ -1830,7 +1830,7 @@ static bool isSumAvgRateQuery(SQuery *pQuery) {
static bool isFirstLastRowQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
int32_t functionID = pQuery->pExpr1[i].base.functionId;
if (functionID == TSDB_FUNC_LAST_ROW) {
return true;
}
@ -1841,7 +1841,7 @@ static bool isFirstLastRowQuery(SQuery *pQuery) {
static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
continue;
}
@ -1852,7 +1852,7 @@ static bool needReverseScan(SQuery *pQuery) {
if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) {
// the scan order to acquire the last result of the specified column
int32_t order = (int32_t)pQuery->pSelectExpr[i].base.arg->argValue.i64;
int32_t order = (int32_t)pQuery->pExpr1[i].base.arg->argValue.i64;
if (order != pQuery->order.order) {
return true;
}
@ -1868,7 +1868,7 @@ static bool needReverseScan(SQuery *pQuery) {
*/
static bool onlyQueryTags(SQuery* pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i];
SExprInfo* pExprInfo = &pQuery->pExpr1[i];
int32_t functionId = pExprInfo->base.functionId;
@ -1911,7 +1911,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
} else {
bool hasMultioutput = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base;
SSqlFuncMsg *pExprMsg = &pQuery->pExpr1[i].base;
if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
@ -1945,7 +1945,7 @@ bool colIdCheck(SQuery *pQuery) {
// the scan order is not matter
static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG ||
functionId == TSDB_FUNC_TAG_DUMMY) {
@ -2175,7 +2175,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
if (pRuntimeEnv->topBotQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
return topbot_datablock_filter(&pCtx[i], functionId, (char *)&pDataStatis[i].min, (char *)&pDataStatis[i].max);
}
@ -2266,7 +2266,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base;
int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId;
@ -2390,7 +2390,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].bytes;
int32_t bytes = pQuery->pExpr1[i].bytes;
assert(bytes > 0 && capacity > 0);
char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage));
@ -2421,7 +2421,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
int32_t newSize = (int32_t)(pRec->capacity + (pBlockInfo->rows - remain));
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].bytes;
int32_t bytes = pQuery->pExpr1[i].bytes;
assert(bytes > 0 && newSize > 0);
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
@ -2435,7 +2435,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
// set the pCtx output buffer position
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
@ -2599,7 +2599,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
assert(pExprInfo->base.numOfParams == 1);
@ -2610,7 +2610,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
} else {
// set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SExprInfo* pLocalExprInfo = &pQuery->pSelectExpr[idx];
SExprInfo* pLocalExprInfo = &pQuery->pExpr1[idx];
// ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (!mergeFlag) {
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
pCtx[i].currentStage = FIRST_STAGE_MERGE;
@ -2680,7 +2680,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
}
@ -2766,25 +2766,25 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
for (int32_t j = 0; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) {
switch (pQuery->pSelectExpr[i].type) {
switch (pQuery->pExpr1[i].type) {
case TSDB_DATA_TYPE_BINARY: {
int32_t type = pQuery->pSelectExpr[i].type;
printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
int32_t type = pQuery->pExpr1[i].type;
printBinaryData(pQuery->pExpr1[i].base.functionId, pdata[i]->data + pQuery->pExpr1[i].bytes * j,
type);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j));
break;
case TSDB_DATA_TYPE_INT:
printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j));
break;
case TSDB_DATA_TYPE_FLOAT:
printf("%f\t", *(float *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
printf("%f\t", *(float *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j));
break;
case TSDB_DATA_TYPE_DOUBLE:
printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pExpr1[i].bytes * j));
break;
}
}
@ -2951,7 +2951,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResu
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
int32_t functionId = pQuery->pExpr1[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
@ -3236,7 +3236,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowR
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].base.functionId;
int32_t functId = pQuery->pExpr1[j].base.functionId;
SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, j);
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
@ -3260,7 +3260,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
int32_t functId = pQuery->pSelectExpr[j].base.functionId;
int32_t functId = pQuery->pExpr1[j].base.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
if (pCtx->resultInfo == NULL) {
@ -3331,12 +3331,12 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->resultInfo = pCellInfo;
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pSelectExpr[i].bytes * pQuery->rec.capacity));
memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pQuery->rec.capacity));
}
initCtxOutputBuf(pRuntimeEnv);
@ -3347,7 +3347,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
// reset the execution contexts
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
int32_t functionId = pQuery->pExpr1[j].base.functionId;
assert(functionId != TSDB_FUNC_DIFF);
// set next output position
@ -3374,7 +3374,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
int32_t functionId = pQuery->pExpr1[j].base.functionId;
pRuntimeEnv->pCtx[j].currentStage = 0;
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
@ -3412,7 +3412,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
0, pQuery->rec.rows);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pQuery->rec.rows * bytes));
@ -3454,7 +3454,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
setResultOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].base.functionId;
int16_t functId = pQuery->pExpr1[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
}
@ -3467,7 +3467,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
}
} else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].base.functionId;
int16_t functId = pQuery->pExpr1[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
}
@ -3680,7 +3680,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
setResultOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
aAggs[pQuery->pExpr1[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
/*
@ -3692,14 +3692,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
} else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
aAggs[pQuery->pExpr1[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
}
}
static bool hasMainOutput(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) {
return true;
@ -3798,7 +3798,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult, page);
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
@ -3941,7 +3941,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
bool requireTimestamp(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; i++) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) {
return true;
}
@ -4130,12 +4130,16 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
return false;
}
static int16_t getNumOfFinalResCol(SQuery* pQuery) {
return pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2;
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (pQuery->pExpr2 == NULL) {
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].bytes;
int32_t bytes = pQuery->pExpr1[col].bytes;
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
data += bytes * numOfRows;
@ -4196,10 +4200,9 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
ret -= (int32_t)pQuery->limit.offset;
// todo !!!!there exactly number of interpo is not valid.
// todo refactor move to the beginning of buffer
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].bytes * pQuery->limit.offset,
ret * pQuery->pSelectExpr[i].bytes);
memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset,
ret * pQuery->pExpr1[i].bytes);
}
pQuery->limit.offset = 0;
@ -4559,7 +4562,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
static SFillColInfo* createFillColInfo(SQuery* pQuery) {
int32_t numOfCols = pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2;
int32_t numOfCols = getNumOfFinalResCol(pQuery);
int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
@ -4569,7 +4572,7 @@ static SFillColInfo* createFillColInfo(SQuery* pQuery) {
// TODO refactor
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExprInfo = (pQuery->pExpr2 == NULL)? &pQuery->pSelectExpr[i]:&pQuery->pExpr2[i];
SExprInfo* pExprInfo = (pQuery->pExpr2 == NULL)? &pQuery->pExpr1[i]:&pQuery->pExpr2[i];
pFillCol[i].col.bytes = pExprInfo->bytes;
pFillCol[i].col.type = (int8_t)pExprInfo->type;
@ -4681,7 +4684,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey);
getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w);
int32_t numOfCols = pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2;
int32_t numOfCols = getNumOfFinalResCol(pQuery);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, numOfCols,
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
pQuery->fillType, pColInfo, pQInfo);
@ -5363,7 +5366,7 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
pArithSup->offset = 0;
pArithSup->numOfCols = (int32_t)pQuery->numOfOutput;
pArithSup->exprList = pQuery->pSelectExpr;
pArithSup->exprList = pQuery->pExpr1;
pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES);
for (int32_t k = 0; k < pArithSup->numOfCols; ++k) {
@ -5378,9 +5381,9 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (pSqlFunc->functionId == pQuery->pSelectExpr[j].base.functionId &&
pSqlFunc->colInfo.colId == pQuery->pSelectExpr[j].base.colInfo.colId) {
memcpy(data[i]->data, pQuery->sdata[j]->data, pQuery->pSelectExpr[j].bytes * pQuery->rec.rows);
if (pSqlFunc->functionId == pQuery->pExpr1[j].base.functionId &&
pSqlFunc->colInfo.colId == pQuery->pExpr1[j].base.colInfo.colId) {
memcpy(data[i]->data, pQuery->sdata[j]->data, pQuery->pExpr1[j].bytes * pQuery->rec.rows);
break;
}
}
@ -6299,10 +6302,10 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
}
static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
assert(pQuery->pExpr1 != NULL && pQuery != NULL);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].base;
SSqlFuncMsg *pSqlExprMsg = &pQuery->pExpr1[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM) {
continue;
}
@ -6383,7 +6386,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->limit.offset = pQueryMsg->offset;
pQuery->order.order = pQueryMsg->order;
pQuery->order.orderColId = pQueryMsg->orderColId;
pQuery->pSelectExpr = pExprs;
pQuery->pExpr1 = pExprs;
pQuery->pExpr2 = pSecExprs;
pQuery->numOfExpr2 = pQueryMsg->secondStageOutput;
pQuery->pGroupbyExpr = pGroupbyExpr;
@ -6425,6 +6428,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
// allocate additional memory for interResults that are usually larger then final results
// TODO refactor
int16_t bytes = 0;
if (pQuery->pExpr2 == NULL || col > pQuery->numOfExpr2) {
bytes = pExprs[col].bytes;
@ -6676,16 +6680,16 @@ static void freeQInfo(SQInfo *pQInfo) {
}
}
if (pQuery->pSelectExpr != NULL) {
if (pQuery->pExpr1 != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo *pExprInfo = &pQuery->pSelectExpr[i];
SExprInfo *pExprInfo = &pQuery->pExpr1[i];
if (pExprInfo->pExpr != NULL) {
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
}
}
tfree(pQuery->pSelectExpr);
tfree(pQuery->pExpr1);
}
if (pQuery->pGroupbyExpr != NULL) {
@ -7188,11 +7192,11 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
assert(num == pQInfo->tableqinfoGroupInfo.numOfTables);
int32_t count = 0;
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
int32_t functionId = pQuery->pExpr1[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
SExprInfo* pExprInfo = &pQuery->pExpr1[0];
int32_t rsize = pExprInfo->bytes;
count = 0;
@ -7266,7 +7270,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
continue;
}
SExprInfo* pExprInfo = pQuery->pSelectExpr;
SExprInfo* pExprInfo = pQuery->pExpr1;
STableQueryInfo* item = taosArrayGetP(pa, i);
char *data = NULL, *dst = NULL;

View File

@ -24,7 +24,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
size += pQuery->pSelectExpr[i].interBytes;
size += pQuery->pExpr1[i].interBytes;
}
assert(size >= 0);
@ -237,7 +237,7 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes) {
SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes;
memset(s, 0, size);
RESET_RESULT_INFO(pResultInfo);
@ -280,7 +280,7 @@ void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResult
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
size_t s = pRuntimeEnv->pQuery->pExpr1[i].bytes;
memcpy(dstBuf, srcBuf, s);
}