[td-2895] refactor

This commit is contained in:
Haojun Liao 2021-02-23 14:01:44 +08:00
parent df667dea6f
commit faea042fcf
8 changed files with 184 additions and 203 deletions

View File

@ -86,7 +86,7 @@ typedef struct SSqlGroupbyExpr {
typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:29; // row index in buffer page
int32_t offset:29; // row index in buffer page
bool startInterp; // the time window start timestamp has done the interpolation already.
bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened
@ -247,6 +247,8 @@ typedef struct SOperatorInfo {
bool completed;
void *optInfo;
SExprInfo *pExpr;
int32_t* rowCellInfoOffset;
int32_t numOfOutput;
__operator_fn_t exec;
@ -258,10 +260,8 @@ typedef struct SQueryRuntimeEnv {
jmp_buf env;
SQuery* pQuery;
void* qinfo;
// SQLFunctionCtx* pCtx;
int32_t numOfRowsPerPage;
uint16_t* offset;
// int32_t numOfRowsPerPage;
// uint16_t* offset;
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
void* pQueryHandle;
@ -272,7 +272,7 @@ typedef struct SQueryRuntimeEnv {
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info
// int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
@ -355,11 +355,11 @@ typedef struct STableScanInfo {
SSDataBlock block;
SQLFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo;
SQLFunctionCtx *pCtx; // next operator query context
SResultRowInfo *pResultRowInfo;
int32_t numOfOutput;
int64_t elapsedTime;
int32_t *rowCellInfoOffset;
int64_t elapsedTime;
} STableScanInfo;
typedef struct STagScanInfo {
@ -373,14 +373,15 @@ typedef struct SAggOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
SSDataBlock* pRes;
int32_t *rowCellInfoOffset;
SSDataBlock *pRes;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
SSDataBlock *pOutput;
int32_t bufCapacity;
@ -402,6 +403,7 @@ typedef struct SHashIntervalOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
} SHashIntervalOperatorInfo;
@ -415,6 +417,7 @@ typedef struct SHashGroupbyOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
int32_t *rowCellInfoOffset;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
int32_t colIndex;

View File

@ -55,7 +55,6 @@ typedef struct SResultBufStatis {
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int64_t fileSize; // disk file size
@ -89,8 +88,7 @@ typedef struct SDiskbasedResultBuf {
* @param handle
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle);
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle);
/**
*
@ -101,13 +99,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
*/
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @return
*/
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf);
/**
*
* @param pResultBuf

View File

@ -44,22 +44,19 @@ void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot];
}
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult,
tFilePage* page) {
assert(pResult != NULL && pRuntimeEnv != NULL);
static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) {
assert(rowOffset >= 0 && pQuery != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery));
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pExpr1[columnIndex].bytes * realRowId;
// int32_t realRowId = (int32_t)(rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery));
// return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId;
return ((char *)page->data) + rowOffset + offset;
}
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
@ -91,6 +88,6 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
#endif // TDENGINE_QUERYUTIL_H

View File

@ -155,9 +155,10 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
//static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols);
static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
@ -201,7 +202,7 @@ static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBloc
//static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
@ -625,8 +626,8 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t t
return w;
}
static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid,
int32_t numOfRowsPerPage) {
// a new buffer page for each table. Needs to opt this design
static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) {
if (pWindowRes->pageId != -1) {
return 0;
}
@ -644,7 +645,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
pData = getResBufPage(pResultBuf, pi->pageId);
pageId = pi->pageId;
if (pData->num >= numOfRowsPerPage) {
if (pData->num + size > pResultBuf->pageSize) {
// release current page first, and prepare the next one
releaseResBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, tid, &pageId);
@ -661,8 +662,9 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
// set the number of rows in current disk page
if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pageId = pageId;
pWindowRes->rowId = (int32_t)(pData->num++);
pWindowRes->offset = pData->num;
pData->num += size;
assert(pWindowRes->pageId >= 0);
}
@ -671,7 +673,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput) {
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
@ -683,7 +685,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->resultRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
@ -692,12 +694,12 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput);
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
static bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) {
static UNUSED_FUNC bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]->closed;
}
@ -1166,8 +1168,8 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
}
}
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput,
SSDataBlock* pSDataBlock) {
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SHashIntervalOperatorInfo* pInfo,
int32_t numOfOutput, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
@ -1189,7 +1191,8 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu
bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN)? true:false;
SResultRow *pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, pCtx, numOfOutput);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
// goto _end;
}
@ -1209,27 +1212,29 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu
}
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0, pCtx, numOfOutput);
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0, pInfo->pCtx, numOfOutput,
pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
// int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1;
// doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p,
// w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows,
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows,
numOfOutput);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, pCtx, numOfOutput);
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
assert(ret == TSDB_CODE_SUCCESS);
}
// window start key interpolation
//doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
STimeWindow nextWin = win;
@ -1241,7 +1246,8 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0, pCtx, numOfOutput);
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
@ -1251,18 +1257,18 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu
// window start(end) key interpolation
// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
}
}
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SResultRowInfo *pResultRowInfo, SQLFunctionCtx *pCtx,
SSDataBlock *pSDataBlock, int32_t colIndex) {
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SHashGroupbyOperatorInfo *pInfo,
SSDataBlock *pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, colIndex);
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
@ -1275,16 +1281,16 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
}
// TODO compare with the previous value to speedup the query processing
int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, pResultRowInfo, pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
pCtx[k].size = 1; // TODO refactor: extract from here
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
pInfo->pCtx[k].size = 1; // TODO refactor: extract from here
int32_t functionId = pInfo->pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pInfo->pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pInfo->pCtx[k], offset);
}
}
}
@ -1417,7 +1423,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
#endif
static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* rowCellInfoOffset) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
@ -1450,13 +1456,13 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult
}
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize);
if (ret != 0) {
return -1;
}
}
setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfCols);
setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfCols, rowCellInfoOffset);
initCtxOutputBuf_rv(pCtx, numOfCols);
return TSDB_CODE_SUCCESS;
}
@ -1973,7 +1979,8 @@ void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inpu
// return TSDB_CODE_SUCCESS;
//}
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
@ -1981,6 +1988,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
return NULL;
}
*rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t));
for (int32_t i = 0; i < numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base;
SQLFunctionCtx* pCtx = &pFuncCtx[i];
@ -2014,12 +2023,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->inputBytes = pSqlFuncMsg->arg[0].argBytes;
pCtx->inputType = pSqlFuncMsg->arg[0].argType;
}
// } else {
// pCtx->inputBytes = pQuery->colList[index].bytes;
// pCtx->inputType = pQuery->colList[index].type;
// }
//
// assert(isValidDataType(pCtx->inputType));
pCtx->ptsOutputBuf = NULL;
pCtx->outputBytes = pExpr[i].bytes;
@ -2089,9 +2093,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
}
if (i > 0) {
pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pFuncCtx[i - 1].outputBytes;
pRuntimeEnv->rowCellInfoOffset[i] =
pRuntimeEnv->rowCellInfoOffset[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes;
// (*offset)[i] = (*offset)[i - 1] + pFuncCtx[i - 1].outputBytes;
(*rowCellInfoOffset)[i] = (*rowCellInfoOffset)[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes;
}
}
@ -2134,11 +2137,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQuery->tagLen);
pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t));
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
// pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->rowCellInfoOffset == NULL || pRuntimeEnv->sasArray == NULL ||
if (/*pRuntimeEnv->rowCellInfoOffset == NULL || */pRuntimeEnv->sasArray == NULL ||
pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || pRuntimeEnv->prevRow == NULL ||
pRuntimeEnv->tagVal == NULL) {
goto _clean;
@ -2150,7 +2152,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQuery->colList[i-1].bytes;
}
pRuntimeEnv->offset[0] = 0;
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
// if it is group by normal column, do not set output buffer, the output buffer is pResult
@ -2226,8 +2227,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
return TSDB_CODE_SUCCESS;
_clean:
tfree(pRuntimeEnv->offset);
tfree(pRuntimeEnv->rowCellInfoOffset);
// tfree(pRuntimeEnv->rowCellInfoOffset);
tfree(pRuntimeEnv->sasArray);
tfree(pRuntimeEnv->pResultRowHashTable);
tfree(pRuntimeEnv->keyBuf);
@ -2300,9 +2300,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf);
tfree(pRuntimeEnv->offset);
tfree(pRuntimeEnv->keyBuf);
tfree(pRuntimeEnv->rowCellInfoOffset);
// tfree(pRuntimeEnv->rowCellInfoOffset);
tfree(pRuntimeEnv->prevRow);
tfree(pRuntimeEnv->tagVal);
@ -2660,8 +2659,8 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
*ps = (*ps << 1u);
}
pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize);
assert(pRuntimeEnv->numOfRowsPerPage <= MAX_ROWS_PER_RESBUF_PAGE);
// pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize);
// assert(pRuntimeEnv->numOfRowsPerPage <= MAX_ROWS_PER_RESBUF_PAGE);
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
@ -2882,8 +2881,7 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte
tfree(p);
}
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRowInfo *pWindowResInfo,
void *pQueryHandle, SSDataBlock *pBlock, uint32_t *status, int32_t numOfOutput) {
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, uint32_t *status) {
*status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pBlockStatis = NULL;
@ -2911,9 +2909,9 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.skey:pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult,
groupId, pCtx, numOfOutput) != TSDB_CODE_SUCCESS) {
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult,
groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
@ -2926,7 +2924,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
// group by + first/last should not apply the first/last block filter
if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) {
(*status) |= aAggs[functionId].dataReqFunc(&pCtx[i], &pBlock->info.window, colId);
(*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
break;
}
@ -2948,10 +2946,10 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
// this function never returns error?
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis);
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
pCost->totalCheckedRows += pBlock->info.rows;
}
} else {
@ -2959,15 +2957,15 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
// load the data block statistics to perform further filter
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis);
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) {
bool load = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t functionId = pTableScanInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
load = topbot_datablock_filter(&pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max));
load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max));
if (!load) {
// current block has been discard due to filter applied
pCost->discardBlocks += 1;
@ -2981,7 +2979,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
}
// current block has been discard due to filter applied
if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pCtx, pBlockInfo->rows)) {
if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
@ -2990,7 +2988,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
pCost->totalCheckedRows += pBlockInfo->rows;
pCost->loadBlocks += 1;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
if (pBlock->pDataBlock == NULL) {
return terrno;
}
@ -3393,7 +3391,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
}
}
void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock) {
void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) {
SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo;
int32_t code = TSDB_CODE_SUCCESS;
@ -3401,7 +3399,7 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold
// all results in current group have been returned to client, try next group
if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) {
assert(pGroupResInfo->index == 0);
if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv)) != TSDB_CODE_SUCCESS) {
if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) {
return;
}
}
@ -3438,7 +3436,8 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1;
}
static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order) {
#if 0
static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order, int32_t numOfOutput) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
@ -3452,7 +3451,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultR
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pExpr1[j].base.functionId;
SResultRowCellInfo* pInfo = getResultCell(pRuntimeEnv, pRow, j);
SResultRowCellInfo* pInfo = getResultCell(pRow, numOfOutput, j);
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
@ -3464,7 +3463,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultR
}
}
#endif
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
@ -3531,13 +3530,12 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t initResultRow(SResultRow *pResultRow) {
pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow));
pResultRow->pageId = -1;
pResultRow->rowId = -1;
pResultRow->offset = -1;
return TSDB_CODE_SUCCESS;
}
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, SSDataBlock* pDataBlock) {
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
SSDataBlock* pDataBlock, int32_t* rowCellInfoOffset) {
int32_t tid = 0;
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid);
@ -3549,7 +3547,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/
SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i);
SResultRowCellInfo* pCellInfo = getResultCell(pRow, i, rowCellInfoOffset);
RESET_RESULT_INFO(pCellInfo);
pCtx[i].resultInfo = pCellInfo;
@ -3835,18 +3833,18 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
}
}
#endif
void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, SQLFunctionCtx* pCtx,
int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
// group by normal columns and interval query on normal table
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
// disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order, numOfOutput);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
for (int32_t j = 0; j < numOfOutput; ++j) { // todo refactor
int32_t functId = pCtx[j].functionId;
if (pCtx[j].resultInfo == NULL) {
continue; // resultInfo is NULL, means no data checked in previous scan
@ -3861,6 +3859,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWi
}
}
}
#endif
static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
@ -3878,7 +3877,8 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pCtx, numOfOutput);
disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput);
// disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput);
setupQueryRangeForReverseScan(pRuntimeEnv);
}
@ -4050,7 +4050,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
#endif
void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo) {
void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
@ -4064,7 +4065,7 @@ void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx,
continue;
}
setResultOutputBuf_rv(pRuntimeEnv, buf, pCtx, numOfOutput);
setResultOutputBuf_rv(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset);
for (int32_t j = 0; j < numOfOutput; ++j) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
@ -4187,22 +4188,24 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i);
}
}
#endif
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pRuntimeEnv, pResult, i);
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) {
offset += pCtx[i].outputBytes;
continue;
}
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, bufPage);
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, bufPage, pResult->offset, offset);
pCtx[i].currentStage = 0;
offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
@ -4216,8 +4219,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
}
void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t groupIndex, TSKEY nextKey) {
void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SAggOperatorInfo *pInfo, int32_t numOfOutput,
int32_t groupIndex, TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
// lastKey needs to be updated
@ -4227,7 +4230,7 @@ void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResu
}
int64_t uid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&groupIndex,
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char *)&groupIndex,
sizeof(groupIndex), true, uid);
assert (pResultRow != NULL);
@ -4236,7 +4239,7 @@ void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResu
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) !=
if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize) !=
TSDB_CODE_SUCCESS) {
return;
}
@ -4244,16 +4247,19 @@ void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResu
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfOutput);
initCtxOutputBuf_rv(pCtx, numOfOutput);
setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
initCtxOutputBuf_rv(pInfo->pCtx, numOfOutput);
}
void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols) {
void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, page);
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, page, pResult->offset, offset);
offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
@ -4264,7 +4270,7 @@ void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, S
* set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT
*/
pCtx[i].resultInfo = getResultCell(pRuntimeEnv, pResult, i);
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
}
}
@ -4526,13 +4532,17 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
pGroupResInfo->index += 1;
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId);
int16_t offset = 0;
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
int32_t bytes = pColInfoData->info.bytes;
char *out = pColInfoData->pData + numOfResult * bytes;
char *in = getPosInResultPage(pRuntimeEnv, j, pRow, page);
char *in = getPosInResultPage(pQuery, page, pRow->offset, offset);
memcpy(out, in, bytes * numOfRowsToCopy);
offset += bytes;
}
numOfResult += numOfRowsToCopy;
@ -4565,7 +4575,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
}
static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv,
SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo) {
SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// update the number of result for each, only update the number of rows for the corresponding window result.
@ -4582,7 +4592,7 @@ static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv,
continue;
}
SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j);
SResultRowCellInfo* pCell = getResultCell(pResult, j, rowCellInfoOffset);
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes));
}
}
@ -5218,43 +5228,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
int32_t TENMB = 1024*1024*10;
if (isSTableQuery && !onlyQueryTags(pQuery)) {
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
// int16_t type = TSDB_DATA_TYPE_NULL;
// if (pQuery->groupbyColumn) { // group by columns not tags;
// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
// } else {
// type = TSDB_DATA_TYPE_INT; // group id
// }
// code = initResultRowInfo(&pRuntimeEnv->resultRowInfo, 8, type);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
}
} else if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
// int32_t numOfResultRows = getInitialPageNum(pQInfo);
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// int16_t type = TSDB_DATA_TYPE_NULL;
// if (pQuery->groupbyColumn) {
// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
// } else {
// type = TSDB_DATA_TYPE_TIMESTAMP;
// }
// code = initResultRowInfo(&pRuntimeEnv->resultRowInfo, numOfResultRows, type);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
}
// create runtime environment
@ -6105,9 +6088,7 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
// this function never returns error?
uint32_t status;
int32_t code =
loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, pTableScanInfo->pCtx, pTableScanInfo->pResultRowInfo,
pTableScanInfo->pQueryHandle, pBlock, &status, pTableScanInfo->numOfOutput);
int32_t code = loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTableScanInfo->pRuntimeEnv->env, code);
}
@ -6215,26 +6196,35 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pCtx = pAggInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashGroupbyAggOp") == 0) {
SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pGroupbyInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "ArithmeticOp") == 0) {
SArithOperatorInfo *pInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
} else {
assert(0);
}
@ -6309,7 +6299,7 @@ static SSDataBlock* doAggregation(void* param) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
if (!pQuery->stableQuery) {
finalizeQueryResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo);
finalizeQueryResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset);
}
pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput);
@ -6365,13 +6355,12 @@ static SSDataBlock* doSTableAggregation(void* param) {
setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext_rv(pRuntimeEnv, &pAggInfo->resultRowInfo, pAggInfo->pCtx, pOperator->numOfOutput,
pQuery->current->groupIndex, k);
setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k);
aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock);
}
closeAllResultRows(&pAggInfo->resultRowInfo);
updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo);
updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pAggInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes);
@ -6389,7 +6378,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SArithOperatorInfo* pArithInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv;
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput);
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput, pArithInfo->rowCellInfoOffset);
pRuntimeEnv->pQuery->pos = 0;
pArithInfo->pOutput->info.rows = 0;
@ -6528,7 +6517,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
hashIntervalAgg(pRuntimeEnv, &pIntervalInfo->resultRowInfo, pIntervalInfo->pCtx, pOperator->numOfOutput, pBlock);
hashIntervalAgg(pRuntimeEnv, &pIntervalInfo->resultRowInfo, pIntervalInfo, pOperator->numOfOutput, pBlock);
}
// restore the value
@ -6537,7 +6526,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo);
finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
@ -6585,14 +6574,15 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo->pCtx, pOperator->numOfOutput, pBlock);
hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo, pOperator->numOfOutput,
pBlock);
}
pQuery->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes);
copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
@ -6632,17 +6622,17 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock);
}
hashGroupbyAgg(pRuntimeEnv, pOperator, &pInfo->resultRowInfo, pInfo->pCtx, pBlock, pInfo->colIndex);
hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock);
}
closeAllResultRows(&pInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
if (!pRuntimeEnv->pQuery->stableQuery) {
finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo);
finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
}
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo);
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
@ -6728,11 +6718,11 @@ static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQ
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes);
setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset);
return pOperator;
}
@ -6754,7 +6744,7 @@ static SOperatorInfo* createStableAggOperatorInfo(STableQueryInfo* pTableQueryIn
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
@ -6777,7 +6767,7 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2;
pInfo->pOutput = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
@ -6842,7 +6832,7 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
@ -6867,7 +6857,7 @@ static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQu
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
@ -6893,7 +6883,7 @@ SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);

View File

@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL);
if (ret != TSDB_CODE_SUCCESS) {
tMemBucketDestroy(pBucket);
return NULL;

View File

@ -9,8 +9,7 @@
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle) {
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf;
@ -31,7 +30,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
@ -387,8 +385,6 @@ void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) {
pResultBuf->statis.releasePages += 1;
}
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); }
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; }

View File

@ -135,20 +135,22 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
if (pResultRow->pageId >= 0) {
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pResultRow, page);
size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes;
char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset);
memset(s, 0, size);
offset += size;
RESET_RESULT_INFO(pResultInfo);
}
}
pResultRow->numOfRows = 0;
pResultRow->pageId = -1;
pResultRow->rowId = -1;
pResultRow->offset = -1;
pResultRow->closed = false;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
@ -158,9 +160,10 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
}
}
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) {
assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]);
// TODO refactor: use macro
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) {
assert(index >= 0 && offset != NULL);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + offset[index]);
}
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
@ -373,7 +376,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return taosArrayGetSize(pGroupResInfo->pRows);
}
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) {
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
@ -387,7 +390,7 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow
continue;
}
SResultRowCellInfo *pResultInfo = getResultCell(pRuntimeEnv, pResultRow, j);
SResultRowCellInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset);
assert(pResultInfo != NULL);
if (pResultInfo->numOfRes > 0) {
@ -438,7 +441,8 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
}
}
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList) {
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
int32_t code = TSDB_CODE_SUCCESS;
@ -492,7 +496,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if (num <= 0) {
cs.rowIndex[tableIndex] += 1;
@ -539,13 +543,13 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
return code;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv) {
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}

View File

@ -10,7 +10,7 @@ namespace {
// simple test
void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4096, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, NULL);
int32_t pageId = 0;
int32_t groupId = 0;
@ -52,7 +52,7 @@ void simpleTest() {
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4*1024, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;
@ -99,7 +99,7 @@ void writeDownTest() {
void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 64, 1024, 4*1024, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;