[td-225]refactor
This commit is contained in:
parent
d1d6b192b8
commit
61cd3ff929
|
@ -240,6 +240,7 @@ typedef struct SQuery {
|
||||||
typedef struct SQueryRuntimeEnv {
|
typedef struct SQueryRuntimeEnv {
|
||||||
jmp_buf env;
|
jmp_buf env;
|
||||||
SQuery* pQuery;
|
SQuery* pQuery;
|
||||||
|
void* qinfo;
|
||||||
|
|
||||||
SQLFunctionCtx* pCtx;
|
SQLFunctionCtx* pCtx;
|
||||||
int32_t numOfRowsPerPage;
|
int32_t numOfRowsPerPage;
|
||||||
|
@ -267,14 +268,18 @@ typedef struct SQueryRuntimeEnv {
|
||||||
SArithmeticSupport *sasArray;
|
SArithmeticSupport *sasArray;
|
||||||
|
|
||||||
struct STableScanInfo* pi;
|
struct STableScanInfo* pi;
|
||||||
SSDataBlock *ouptputBuf;
|
SSDataBlock *outputBuf;
|
||||||
|
|
||||||
int32_t groupIndex;
|
int32_t groupIndex;
|
||||||
int32_t tableIndex;
|
int32_t tableIndex;
|
||||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||||
|
|
||||||
} SQueryRuntimeEnv;
|
} SQueryRuntimeEnv;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char* name;
|
||||||
|
void* info;
|
||||||
|
} SQEStage;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
QUERY_RESULT_NOT_READY = 1,
|
QUERY_RESULT_NOT_READY = 1,
|
||||||
QUERY_RESULT_READY = 2,
|
QUERY_RESULT_READY = 2,
|
||||||
|
@ -325,7 +330,7 @@ typedef struct SQueryParam {
|
||||||
} SQueryParam;
|
} SQueryParam;
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
SQInfo* pQInfo;
|
SQueryRuntimeEnv* pRuntimeEnv;
|
||||||
void *pQueryHandle;
|
void *pQueryHandle;
|
||||||
int32_t numOfBlocks;
|
int32_t numOfBlocks;
|
||||||
int32_t numOfSkipped;
|
int32_t numOfSkipped;
|
||||||
|
@ -341,7 +346,7 @@ typedef struct STableScanInfo {
|
||||||
|
|
||||||
SSDataBlock block;
|
SSDataBlock block;
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
SSDataBlock* (*apply)(void* param);
|
SSDataBlock* (*exec)(void* param);
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct SAggOperatorInfo {
|
typedef struct SAggOperatorInfo {
|
||||||
|
|
|
@ -45,7 +45,7 @@ typedef struct SLimitVal {
|
||||||
|
|
||||||
typedef struct SOrderVal {
|
typedef struct SOrderVal {
|
||||||
uint32_t order;
|
uint32_t order;
|
||||||
int32_t orderColId;
|
int32_t orderColId;
|
||||||
} SOrderVal;
|
} SOrderVal;
|
||||||
|
|
||||||
typedef struct tVariantListItem {
|
typedef struct tVariantListItem {
|
||||||
|
|
|
@ -159,8 +159,9 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
||||||
|
|
||||||
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
|
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
|
||||||
SDataStatis *pStatis, SExprInfo* pExprInfo);
|
SDataStatis *pStatis, SExprInfo* pExprInfo);
|
||||||
|
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *tsCol, SExprInfo* pExprInfo);
|
||||||
|
|
||||||
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
||||||
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
|
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
|
||||||
static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
||||||
static bool hasMainOutput(SQuery *pQuery);
|
static bool hasMainOutput(SQuery *pQuery);
|
||||||
|
@ -172,8 +173,8 @@ static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArr
|
||||||
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
|
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
|
||||||
static STableIdInfo createTableIdInfo(SQuery* pQuery);
|
static STableIdInfo createTableIdInfo(SQuery* pQuery);
|
||||||
|
|
||||||
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime);
|
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
|
||||||
static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime);
|
static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
|
||||||
static int32_t getNumOfScanTimes(SQuery* pQuery);
|
static int32_t getNumOfScanTimes(SQuery* pQuery);
|
||||||
|
|
||||||
static SSDataBlock* createOutputBuf(SQuery* pQuery) {
|
static SSDataBlock* createOutputBuf(SQuery* pQuery) {
|
||||||
|
@ -975,6 +976,7 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) {
|
static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) {
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
|
@ -1164,24 +1166,40 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
|
||||||
SArray *pDataBlock) {
|
static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
if (pRuntimeEnv->pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) {
|
||||||
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
|
SColIndex *pCol = &pQuery->pExpr1[i].base.colInfo;
|
||||||
|
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
|
||||||
|
SColIndex* pColIndex = &pQuery->pExpr1[i].base.colInfo;
|
||||||
|
SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex);
|
||||||
|
assert(p->info.colId == pColIndex->colId);
|
||||||
|
|
||||||
|
pRuntimeEnv->pCtx[i].pInput = p->pData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
TSKEY *tsCols = NULL;
|
TSKEY *tsCols = NULL;
|
||||||
if (pDataBlock != NULL) {
|
if (pSDataBlock->pDataBlock != NULL) {
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
|
SColumnInfoData *pColInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||||
tsCols = (TSKEY *)(pColInfo->pData);
|
tsCols = (TSKEY *)(pColInfo->pData);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||||
char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
setBlockStatisInfo(&pCtx[k], pSDataBlock, tsCols, &pQuery->pExpr1[k]);
|
||||||
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||||
int32_t functionId = pQuery->pExpr1[k].base.functionId;
|
int32_t functionId = pCtx[k].functionId;
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
pCtx[k].startTs = pQuery->window.skey;
|
pCtx[k].startTs = pQuery->window.skey;
|
||||||
aAggs[functionId].xFunction(&pCtx[k]);
|
aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
|
@ -1817,6 +1835,37 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
||||||
return numOfRes;
|
return numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *tsCol, SExprInfo* pExprInfo) {
|
||||||
|
SDataStatis *tpField = NULL;
|
||||||
|
pCtx->hasNull = hasNullValue(&pExprInfo->base.colInfo, pSDataBlock->pBlockStatis, &tpField);
|
||||||
|
|
||||||
|
if (tpField != NULL) {
|
||||||
|
pCtx->preAggVals.isSet = true;
|
||||||
|
pCtx->preAggVals.statis = *tpField;
|
||||||
|
assert(pCtx->preAggVals.statis.numOfNull <= pSDataBlock->info.rows);
|
||||||
|
} else {
|
||||||
|
pCtx->preAggVals.isSet = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCtx->preAggVals.dataBlockLoaded = (pSDataBlock->pDataBlock != NULL);
|
||||||
|
|
||||||
|
// limit/offset query will affect this value
|
||||||
|
pCtx->size = pSDataBlock->info.rows;
|
||||||
|
|
||||||
|
uint32_t status = aAggs[pCtx->functionId].status;
|
||||||
|
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
|
||||||
|
pCtx->ptsList = tsCol;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the statistics data for primary time stamp column
|
||||||
|
// if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||||
|
// pCtx->preAggVals.isSet = true;
|
||||||
|
// pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
|
||||||
|
// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
|
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
|
||||||
SDataStatis *pStatis, SExprInfo* pExprInfo) {
|
SDataStatis *pStatis, SExprInfo* pExprInfo) {
|
||||||
|
|
||||||
|
@ -3248,9 +3297,8 @@ void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setupQueryRangeForReverseScan(SQInfo* pQInfo) {
|
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
|
||||||
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
|
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
|
@ -3317,6 +3365,37 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
initCtxOutputBuf(pRuntimeEnv);
|
initCtxOutputBuf(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
int32_t tid = 0;
|
||||||
|
int64_t uid = 0;
|
||||||
|
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
|
SColumnInfoData* pData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, i);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* set the output buffer information and intermediate buffer
|
||||||
|
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
|
||||||
|
*/
|
||||||
|
SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i);
|
||||||
|
RESET_RESULT_INFO(pCellInfo);
|
||||||
|
|
||||||
|
pCtx->resultInfo = pCellInfo;
|
||||||
|
pCtx->pOutput = pData->pData;
|
||||||
|
|
||||||
|
// set the timestamp output buffer for top/bottom/diff query
|
||||||
|
int32_t functionId = pCtx->functionId;
|
||||||
|
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||||
|
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
initCtxOutputBuf(pRuntimeEnv);
|
||||||
|
}
|
||||||
|
|
||||||
void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
|
void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
@ -3349,7 +3428,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
int32_t functionId = pQuery->pExpr1[j].base.functionId;
|
int32_t functionId = pRuntimeEnv->pCtx[j].functionId;
|
||||||
pRuntimeEnv->pCtx[j].currentStage = 0;
|
pRuntimeEnv->pCtx[j].currentStage = 0;
|
||||||
|
|
||||||
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
|
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
|
||||||
|
@ -3412,6 +3491,38 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
|
// for each group result, call the finalize function for each column
|
||||||
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||||
|
SResultRow *pResult = getResultRow(pWindowResInfo, i);
|
||||||
|
|
||||||
|
setResultOutputBuf(pRuntimeEnv, pResult);
|
||||||
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
|
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j];
|
||||||
|
if (pCtx->functionId == TSDB_FUNC_TS) { // ignore more table
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
aAggs[pCtx->functionId].xNextStep(pCtx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
|
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j];
|
||||||
|
if (pCtx->functionId == TSDB_FUNC_TS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
aAggs[pCtx->functionId].xNextStep(pCtx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) {
|
bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
@ -3497,7 +3608,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
switchCtxOrder(pRuntimeEnv);
|
switchCtxOrder(pRuntimeEnv);
|
||||||
disableFuncInReverseScan(pRuntimeEnv);
|
disableFuncInReverseScan(pRuntimeEnv);
|
||||||
setupQueryRangeForReverseScan(pQInfo);
|
setupQueryRangeForReverseScan(pRuntimeEnv);
|
||||||
|
|
||||||
// clean unused handle
|
// clean unused handle
|
||||||
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
||||||
|
@ -3511,7 +3622,6 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) {
|
static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
if (pRuntimeEnv->pTsBuf) {
|
if (pRuntimeEnv->pTsBuf) {
|
||||||
|
@ -3524,17 +3634,11 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||||
SWITCH_ORDER(pQuery->order.order);
|
SWITCH_ORDER(pQuery->order.order);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
|
||||||
assert(pQuery->window.skey <= pQuery->window.ekey);
|
|
||||||
} else {
|
|
||||||
assert(pQuery->window.skey >= pQuery->window.ekey);
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
|
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
switchCtxOrder(pRuntimeEnv);
|
switchCtxOrder(pRuntimeEnv);
|
||||||
disableFuncInReverseScan(pRuntimeEnv);
|
disableFuncInReverseScan(pRuntimeEnv);
|
||||||
setupQueryRangeForReverseScan(pQInfo);
|
setupQueryRangeForReverseScan(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
|
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
|
||||||
|
@ -4188,14 +4292,16 @@ static int16_t getNumOfFinalResCol(SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
|
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
if (pQuery->pExpr2 == NULL) {
|
if (pQuery->pExpr2 == NULL) {
|
||||||
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
|
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
|
||||||
int32_t bytes = pQuery->pExpr1[col].bytes;
|
|
||||||
|
|
||||||
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
|
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
|
||||||
data += bytes * numOfRows;
|
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
|
||||||
|
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
|
||||||
|
data += pColRes->info.bytes * pRes->info.rows;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) {
|
for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) {
|
||||||
|
@ -4720,9 +4826,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
|
||||||
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
|
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
|
||||||
|
|
||||||
if (needReverseScan(pQuery)) {
|
if (needReverseScan(pQuery)) {
|
||||||
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery), 1);
|
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
|
||||||
} else {
|
} else {
|
||||||
pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery));
|
pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTsBuf != NULL) {
|
if (pTsBuf != NULL) {
|
||||||
|
@ -4735,7 +4841,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
|
||||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
||||||
int32_t TENMB = 1024*1024*10;
|
int32_t TENMB = 1024*1024*10;
|
||||||
|
|
||||||
if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) {
|
if (isSTableQuery && !onlyQueryTags(pQuery)) {
|
||||||
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
|
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -5495,7 +5601,7 @@ static int32_t doSaveContext(SQInfo *pQInfo) {
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
switchCtxOrder(pRuntimeEnv);
|
switchCtxOrder(pRuntimeEnv);
|
||||||
disableFuncInReverseScan(pRuntimeEnv);
|
disableFuncInReverseScan(pRuntimeEnv);
|
||||||
setupQueryRangeForReverseScan(pQInfo);
|
setupQueryRangeForReverseScan(pRuntimeEnv);
|
||||||
|
|
||||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef);
|
||||||
|
@ -5677,20 +5783,26 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
|
||||||
pTableScanInfo->numOfBlocks += 1;
|
pTableScanInfo->numOfBlocks += 1;
|
||||||
|
|
||||||
// todo check for query cancel
|
// todo check for query cancel
|
||||||
|
|
||||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
|
||||||
|
|
||||||
SDataStatis *pStatis = pBlock->pBlockStatis;
|
|
||||||
|
|
||||||
// this function never returns error?
|
// this function never returns error?
|
||||||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
|
uint32_t status;
|
||||||
pTableScanInfo->numOfBlockStatis += 1;
|
int32_t code = loadDataBlockOnDemand(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis,
|
||||||
|
&pBlock->pDataBlock, &status);
|
||||||
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
|
longjmp(pTableScanInfo->pRuntimeEnv->env, code);
|
||||||
pTableScanInfo->numOfRows += pBlock->info.rows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// current block is ignored according to filter result by block statistics data, continue load the next block
|
||||||
|
if (status == BLK_DATA_DISCARD) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
|
||||||
|
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
|
||||||
|
// pTableScanInfo->numOfRows += pBlock->info.rows;
|
||||||
|
// }
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5699,7 +5811,7 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
|
||||||
|
|
||||||
static SSDataBlock* doTableScan(void* param) {
|
static SSDataBlock* doTableScan(void* param) {
|
||||||
STableScanInfo * pTableScanInfo = (STableScanInfo *)param;
|
STableScanInfo * pTableScanInfo = (STableScanInfo *)param;
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv;
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
while (pTableScanInfo->current < pTableScanInfo->times) {
|
while (pTableScanInfo->current < pTableScanInfo->times) {
|
||||||
|
@ -5716,8 +5828,7 @@ static SSDataBlock* doTableScan(void* param) {
|
||||||
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
|
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
|
||||||
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
|
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
|
||||||
pTableScanInfo->pQueryHandle =
|
pTableScanInfo->pQueryHandle =
|
||||||
tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo,
|
tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pRuntimeEnv->qinfo, &pQuery->memRef);
|
||||||
pTableScanInfo->pQInfo, &pQuery->memRef);
|
|
||||||
if (pTableScanInfo->pQueryHandle == NULL) {
|
if (pTableScanInfo->pQueryHandle == NULL) {
|
||||||
longjmp(pRuntimeEnv->env, terrno);
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
}
|
}
|
||||||
|
@ -5732,7 +5843,7 @@ static SSDataBlock* doTableScan(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
|
pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableScanInfo->reverseTimes > 0) {
|
if (pTableScanInfo->reverseTimes > 0) {
|
||||||
|
@ -5742,11 +5853,10 @@ static SSDataBlock* doTableScan(void* param) {
|
||||||
|
|
||||||
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
|
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
|
||||||
pTableScanInfo->pQueryHandle =
|
pTableScanInfo->pQueryHandle =
|
||||||
tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo,
|
tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pRuntimeEnv->qinfo, &pQuery->memRef);
|
||||||
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef);
|
|
||||||
|
|
||||||
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
|
pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey);
|
||||||
|
|
||||||
pTableScanInfo->times = 1;
|
pTableScanInfo->times = 1;
|
||||||
pTableScanInfo->current = 0;
|
pTableScanInfo->current = 0;
|
||||||
|
@ -5760,39 +5870,39 @@ static SSDataBlock* doTableScan(void* param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime) {
|
static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
|
||||||
assert(repeatTime > 0);
|
assert(repeatTime > 0);
|
||||||
|
|
||||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||||
pInfo->pQueryHandle = pTsdbQueryHandle;
|
pInfo->pQueryHandle = pTsdbQueryHandle;
|
||||||
pInfo->apply = doTableScan;
|
pInfo->exec = doTableScan;
|
||||||
pInfo->times = repeatTime;
|
pInfo->times = repeatTime;
|
||||||
pInfo->reverseTimes = 0;
|
pInfo->reverseTimes = 0;
|
||||||
|
|
||||||
pInfo->current = 0;
|
pInfo->current = 0;
|
||||||
pInfo->pQInfo = pQInfo;
|
pInfo->pRuntimeEnv = pRuntimeEnv;
|
||||||
return pInfo;
|
return pInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime) {
|
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
|
||||||
assert(repeatTime > 0);
|
assert(repeatTime > 0);
|
||||||
|
|
||||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||||
pInfo->pQueryHandle = pTsdbQueryHandle;
|
pInfo->pQueryHandle = pTsdbQueryHandle;
|
||||||
pInfo->apply = doTableScan;
|
pInfo->exec = doTableScan;
|
||||||
pInfo->times = repeatTime;
|
pInfo->times = repeatTime;
|
||||||
pInfo->reverseTimes = reverseTime;
|
pInfo->reverseTimes = reverseTime;
|
||||||
|
|
||||||
pInfo->current = 0;
|
pInfo->current = 0;
|
||||||
pInfo->pQInfo = pQInfo;
|
pInfo->pRuntimeEnv = pRuntimeEnv;
|
||||||
return pInfo;
|
return pInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) {
|
static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) {
|
||||||
return pTableScanInfo->current;
|
return pTableScanInfo->current;
|
||||||
}
|
}
|
||||||
|
|
||||||
static UNUSED_FUNC int32_t getScanOrder(STableScanInfo* pTableScanInfo) {
|
static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
|
||||||
return pTableScanInfo->order;
|
return pTableScanInfo->order;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5800,34 +5910,40 @@ static UNUSED_FUNC int32_t getScanOrder(STableScanInfo* pTableScanInfo) {
|
||||||
static SSDataBlock* doAggOperator(void* param) {
|
static SSDataBlock* doAggOperator(void* param) {
|
||||||
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
|
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
|
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
|
||||||
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanInfo;
|
||||||
|
|
||||||
int32_t countId = 0;
|
int32_t countId = 0;
|
||||||
int32_t order = getScanOrder(pInfo->pTableScanInfo);
|
int32_t order = getTableScanOrder(pInfo->pTableScanInfo);
|
||||||
|
|
||||||
|
resetDefaultResInfoOutputBuf_rv(pInfo->pRuntimeEnv);
|
||||||
|
pRuntimeEnv->pQuery->pos = 0;
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo);
|
SSDataBlock* pBlock = pTableScanInfo->exec(pTableScanInfo);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (countId != getTableScanTime(pInfo->pTableScanInfo)) {
|
if (countId != getTableScanId(pTableScanInfo)) {
|
||||||
needRepeatScan(pRuntimeEnv);
|
prepareRepeatTableScan(pRuntimeEnv);
|
||||||
countId = getTableScanTime(pInfo->pTableScanInfo);
|
countId = getTableScanId(pTableScanInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (order != getScanOrder(pInfo->pTableScanInfo)) {
|
// the order has changed
|
||||||
setEnvBeforeReverseScan_rv(pRuntimeEnv);
|
if (order != getTableScanOrder(pTableScanInfo)) {
|
||||||
order = getScanOrder(pInfo->pTableScanInfo);
|
order = getTableScanOrder(pTableScanInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
aggApplyFunctions(pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock);
|
// the pDataBlock are alway the same one, no need to call this again
|
||||||
|
setInputSDataBlock(pRuntimeEnv, pBlock);
|
||||||
|
aggApplyFunctions(pRuntimeEnv, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
|
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
|
||||||
finalizeQueryResult(pRuntimeEnv);
|
finalizeQueryResult(pRuntimeEnv);
|
||||||
|
|
||||||
pRuntimeEnv->ouptputBuf->info.rows = getNumOfResult(pRuntimeEnv);
|
pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv);
|
||||||
return pRuntimeEnv->ouptputBuf;
|
return pRuntimeEnv->outputBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo set the attribute of query scan count
|
// todo set the attribute of query scan count
|
||||||
|
@ -5877,8 +5993,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
||||||
// doSecondaryArithmeticProcess(pQuery);
|
// doSecondaryArithmeticProcess(pQuery);
|
||||||
|
|
||||||
// TODO limit/offset refactor to be one operator
|
// TODO limit/offset refactor to be one operator
|
||||||
skipResults(pRuntimeEnv);
|
// skipResults(pRuntimeEnv);
|
||||||
limitOperator(pQuery, pQInfo);
|
// limitOperator(pQuery, pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
|
@ -6891,7 +7007,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
|
||||||
}
|
}
|
||||||
|
|
||||||
doUpdateExprColumnIndex(pQuery);
|
doUpdateExprColumnIndex(pQuery);
|
||||||
pQInfo->runtimeEnv.ouptputBuf = createOutputBuf(pQuery);
|
pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery);
|
||||||
|
|
||||||
int32_t ret = createFilterInfo(pQInfo, pQuery);
|
int32_t ret = createFilterInfo(pQInfo, pQuery);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue