Merge pull request #4030 from taosdata/bugfix/td-1807
[TD-1807]<fix>: interval window can have more rows
This commit is contained in:
commit
5430b88187
|
@ -33,15 +33,11 @@ struct SColumnFilterElem;
|
|||
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
|
||||
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
||||
|
||||
typedef struct SPosInfo {
|
||||
int32_t pageId:20;
|
||||
int32_t rowId:12;
|
||||
} SPosInfo;
|
||||
|
||||
typedef struct SGroupResInfo {
|
||||
int32_t groupId;
|
||||
int32_t numOfDataPages;
|
||||
SPosInfo pos;
|
||||
int32_t pageId;
|
||||
int32_t rowId;
|
||||
} SGroupResInfo;
|
||||
|
||||
typedef struct SSqlGroupbyExpr {
|
||||
|
@ -53,9 +49,10 @@ typedef struct SSqlGroupbyExpr {
|
|||
} SSqlGroupbyExpr;
|
||||
|
||||
typedef struct SWindowResult {
|
||||
SPosInfo pos; // Position of current result in disk-based output buffer
|
||||
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
|
||||
int32_t rowId:15;
|
||||
bool closed:1; // this result status: closed or opened
|
||||
uint16_t numOfRows; // number of rows of current time window
|
||||
bool closed; // this result status: closed or opened
|
||||
SResultInfo* resultInfo; // For each result column, there is a resultInfo
|
||||
union {STimeWindow win; char* key;}; // start key of current time window
|
||||
} SWindowResult;
|
||||
|
|
|
@ -51,7 +51,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
|
|||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
int32_t realRowId = (int32_t)(pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
|
||||
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
|
||||
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
|
||||
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
|
||||
}
|
||||
|
|
|
@ -557,7 +557,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
|||
|
||||
static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t sid,
|
||||
int32_t numOfRowsPerPage) {
|
||||
if (pWindowRes->pos.pageId != -1) {
|
||||
if (pWindowRes->pageId != -1) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -590,11 +590,11 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
|
|||
}
|
||||
|
||||
// set the number of rows in current disk page
|
||||
if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer
|
||||
pWindowRes->pos.pageId = pageId;
|
||||
pWindowRes->pos.rowId = (int32_t)(pData->num++);
|
||||
if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer
|
||||
pWindowRes->pageId = pageId;
|
||||
pWindowRes->rowId = (int32_t)(pData->num++);
|
||||
|
||||
assert(pWindowRes->pos.pageId >= 0);
|
||||
assert(pWindowRes->pageId >= 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -616,7 +616,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
|
|||
*newWind = true;
|
||||
|
||||
// not assign result buffer yet, add new result buffer
|
||||
if (pWindowRes->pos.pageId == -1) {
|
||||
if (pWindowRes->pageId == -1) {
|
||||
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
|
@ -1143,7 +1143,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
|
|||
|
||||
assert(pRuntimeEnv->windowResInfo.interval == 0);
|
||||
|
||||
if (pWindowRes->pos.pageId == -1) {
|
||||
if (pWindowRes->pageId == -1) {
|
||||
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
|
||||
if (ret != 0) {
|
||||
return -1;
|
||||
|
@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
|
|||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||
|
@ -2823,14 +2823,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
|
|||
|
||||
SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
|
||||
SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos);
|
||||
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pos.pageId);
|
||||
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
|
||||
|
||||
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
|
||||
TSKEY leftTimestamp = GET_INT64_VAL(b1);
|
||||
|
||||
SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
|
||||
SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos);
|
||||
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pos.pageId);
|
||||
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
|
||||
|
||||
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2);
|
||||
TSKEY rightTimestamp = GET_INT64_VAL(b2);
|
||||
|
@ -2867,7 +2867,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
SGroupResInfo* info = &pQInfo->groupResInfo;
|
||||
if (pQInfo->groupIndex == numOfGroups && info->pos.pageId == info->numOfDataPages) {
|
||||
if (pQInfo->groupIndex == numOfGroups && info->pageId == info->numOfDataPages) {
|
||||
SET_STABLE_QUERY_OVER(pQInfo);
|
||||
}
|
||||
|
||||
|
@ -2883,10 +2883,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|||
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
|
||||
|
||||
// all results have been return to client, try next group
|
||||
if (pGroupResInfo->pos.pageId == pGroupResInfo->numOfDataPages) {
|
||||
if (pGroupResInfo->pageId == pGroupResInfo->numOfDataPages) {
|
||||
pGroupResInfo->numOfDataPages = 0;
|
||||
pGroupResInfo->pos.pageId = 0;
|
||||
pGroupResInfo->pos.rowId = 0;
|
||||
pGroupResInfo->pageId = 0;
|
||||
pGroupResInfo->rowId = 0;
|
||||
|
||||
// current results of group has been sent to client, try next group
|
||||
if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2914,22 +2914,22 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|||
assert(size == pGroupResInfo->numOfDataPages);
|
||||
|
||||
bool done = false;
|
||||
for (int32_t j = pGroupResInfo->pos.pageId; j < size; ++j) {
|
||||
for (int32_t j = pGroupResInfo->pageId; j < size; ++j) {
|
||||
SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j);
|
||||
tFilePage* pData = getResBufPage(pResultBuf, pi->pageId);
|
||||
|
||||
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->pos.rowId < pData->num);
|
||||
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->pos.rowId);
|
||||
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num);
|
||||
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId);
|
||||
|
||||
if (numOfRes > pQuery->rec.capacity - offset) {
|
||||
numOfCopiedRows = (int32_t)(pQuery->rec.capacity - offset);
|
||||
pGroupResInfo->pos.rowId += numOfCopiedRows;
|
||||
pGroupResInfo->rowId += numOfCopiedRows;
|
||||
done = true;
|
||||
} else {
|
||||
numOfCopiedRows = (int32_t)pData->num;
|
||||
|
||||
pGroupResInfo->pos.pageId += 1;
|
||||
pGroupResInfo->pos.rowId = 0;
|
||||
pGroupResInfo->pageId += 1;
|
||||
pGroupResInfo->rowId = 0;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
|
@ -3020,8 +3020,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
|
||||
pGroupResInfo->numOfDataPages = (int32_t)taosArrayGetSize(pageList);
|
||||
pGroupResInfo->groupId = tid;
|
||||
pGroupResInfo->pos.pageId = 0;
|
||||
pGroupResInfo->pos.rowId = 0;
|
||||
pGroupResInfo->pageId = 0;
|
||||
pGroupResInfo->rowId = 0;
|
||||
|
||||
return pGroupResInfo->numOfDataPages;
|
||||
}
|
||||
|
@ -3067,7 +3067,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
|
||||
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
|
||||
SWindowResult *pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
|
||||
|
||||
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
|
||||
TSKEY ts = GET_INT64_VAL(b);
|
||||
|
@ -3104,7 +3104,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
lastTimestamp = ts;
|
||||
|
||||
// move to the next element of current entry
|
||||
int32_t currentPageId = pWindowRes->pos.pageId;
|
||||
int32_t currentPageId = pWindowRes->pageId;
|
||||
|
||||
cs.position[pos] += 1;
|
||||
if (cs.position[pos] >= pWindowResInfo->size) {
|
||||
|
@ -3117,7 +3117,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
} else {
|
||||
// current page is not needed anymore
|
||||
SWindowResult *pNextWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
|
||||
if (pNextWindowRes->pos.pageId != currentPageId) {
|
||||
if (pNextWindowRes->pageId != currentPageId) {
|
||||
releaseResBufPage(pRuntimeEnv->pResultBuf, page);
|
||||
}
|
||||
}
|
||||
|
@ -3329,7 +3329,8 @@ int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool is
|
|||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pResultRow->pos = (SPosInfo) {-1, -1};
|
||||
pResultRow->pageId = -1;
|
||||
pResultRow->rowId = -1;
|
||||
|
||||
char* buf = (char*) pResultRow->resultInfo + numOfCols * sizeof(SResultInfo);
|
||||
|
||||
|
@ -3796,7 +3797,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
|
|||
* not assign result buffer yet, add new result buffer
|
||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||
*/
|
||||
if (pWindowRes->pos.pageId == -1) {
|
||||
if (pWindowRes->pageId == -1) {
|
||||
if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
|
@ -3813,7 +3814,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
@ -3840,7 +3841,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
|
||||
tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
|
||||
tFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
@ -4019,12 +4020,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
|
|||
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
|
||||
if (result[i].numOfRows == 0) {
|
||||
pQInfo->groupIndex += 1;
|
||||
pGroupResInfo->pos.rowId = 0;
|
||||
pGroupResInfo->rowId = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->pos.rowId;
|
||||
int32_t oldOffset = pGroupResInfo->pos.rowId;
|
||||
int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->rowId;
|
||||
int32_t oldOffset = pGroupResInfo->rowId;
|
||||
|
||||
/*
|
||||
* current output space is not enough to accommodate all data of this page, only partial results
|
||||
|
@ -4032,13 +4033,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
|
|||
*/
|
||||
if (numOfRowsToCopy > pQuery->rec.capacity - numOfResult) {
|
||||
numOfRowsToCopy = (int32_t) pQuery->rec.capacity - numOfResult;
|
||||
pGroupResInfo->pos.rowId += numOfRowsToCopy;
|
||||
pGroupResInfo->rowId += numOfRowsToCopy;
|
||||
} else {
|
||||
pGroupResInfo->pos.rowId = 0;
|
||||
pGroupResInfo->rowId = 0;
|
||||
pQInfo->groupIndex += 1;
|
||||
}
|
||||
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pos.pageId);
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pageId);
|
||||
|
||||
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
|
||||
|
|
|
@ -266,7 +266,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
|
|||
return;
|
||||
}
|
||||
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
|
||||
|
||||
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
|
||||
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i];
|
||||
|
@ -279,7 +279,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
|
|||
}
|
||||
|
||||
pWindowRes->numOfRows = 0;
|
||||
pWindowRes->pos = (SPosInfo){-1, -1};
|
||||
pWindowRes->pageId = -1;
|
||||
pWindowRes->rowId = -1;
|
||||
pWindowRes->closed = false;
|
||||
pWindowRes->win = TSWINDOW_INITIALIZER;
|
||||
}
|
||||
|
@ -308,10 +309,10 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
|
|||
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen);
|
||||
|
||||
// copy the output buffer data from src to dst, the position info keep unchanged
|
||||
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pos.pageId);
|
||||
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId);
|
||||
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage);
|
||||
|
||||
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pos.pageId);
|
||||
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId);
|
||||
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage);
|
||||
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
|
||||
|
||||
|
|
Loading…
Reference in New Issue