fix bugs found in regression test.
This commit is contained in:
parent
acc083b2b2
commit
960d40d4b4
|
@ -228,7 +228,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo* pMeterQueryInfo, int32_t numOfCols);
|
|||
* @param skey
|
||||
* @param ekey
|
||||
*/
|
||||
void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf* pResultBuf, SMeterQueryInfo* pMeterQueryInfo,
|
||||
void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
|
||||
TSKEY skey, TSKEY ekey);
|
||||
|
||||
/**
|
||||
|
|
|
@ -1445,9 +1445,15 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa
|
|||
}
|
||||
|
||||
static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
||||
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
|
||||
return (pWindowResInfo->pResult[slot].status.closed == true);
|
||||
}
|
||||
|
||||
static SWindowResult* getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
||||
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
|
||||
return &pWindowResInfo->pResult[slot];
|
||||
}
|
||||
|
||||
static int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
|
||||
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
|
||||
return pWindowResInfo->curIndex;
|
||||
|
@ -1516,8 +1522,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
|||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
}
|
||||
|
||||
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
|
||||
|
||||
// query border check
|
||||
if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
w.ekey = pQuery->ekey;
|
||||
|
@ -1526,6 +1530,11 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
|||
w.skey = pQuery->ekey;
|
||||
}
|
||||
|
||||
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
|
||||
if (w.skey == 1542597000000) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
|
||||
return w;
|
||||
}
|
||||
|
||||
|
@ -1559,6 +1568,9 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pageId == 153 && pData->numOfElems >= 138) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
// set the number of rows in current disk page
|
||||
if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer
|
||||
pWindowRes->pos.pageId = pageId;
|
||||
|
@ -1686,7 +1698,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo,
|
|||
assert(ekey < pPrimaryColumn[startPos]);
|
||||
} else {
|
||||
if (updateLastKey) {
|
||||
pQuery->lastKey = MAX(ekey, pPrimaryColumn[startPos + (num - 1)]) + step;
|
||||
pQuery->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -1702,7 +1714,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo,
|
|||
assert(ekey > pPrimaryColumn[startPos]);
|
||||
} else {
|
||||
if (updateLastKey) {
|
||||
pQuery->lastKey = MIN(ekey, pPrimaryColumn[startPos - (num - 1)]) + step;
|
||||
pQuery->lastKey = pPrimaryColumn[startPos - (num - 1)] + step;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -1713,6 +1725,10 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo,
|
|||
}
|
||||
}
|
||||
|
||||
if (pQuery->lastKey == 1542597000001) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
|
||||
assert(num >= 0);
|
||||
return num;
|
||||
}
|
||||
|
@ -1849,12 +1865,20 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
|||
TSKEY ts = primaryKeyCol[offset];
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
|
||||
if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && !IS_MASTER_SCAN(pRuntimeEnv)) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey;
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false);
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true);
|
||||
|
||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep);
|
||||
|
@ -1869,6 +1893,10 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
|||
break;
|
||||
}
|
||||
|
||||
if (nextWin.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
|
||||
// null data, failed to allocate more memory buffer
|
||||
int32_t sid = pRuntimeEnv->pMeterObj->sid;
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1876,7 +1904,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
|||
}
|
||||
|
||||
ekey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.ekey : nextWin.skey;
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, false);
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true);
|
||||
|
||||
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep);
|
||||
|
@ -5486,7 +5514,7 @@ static void printBinaryData(int32_t functionId, char *data, int32_t srcDataType)
|
|||
|
||||
void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOfRows) {
|
||||
int32_t numOfCols = pQuery->numOfOutputCols;
|
||||
printf("metric query intern-result, total:%d\n", numOfRows);
|
||||
printf("super table query intermediate result, total:%d\n", numOfRows);
|
||||
|
||||
SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery));
|
||||
SMeterObj *pMeterObj = pQInfo->pObj;
|
||||
|
@ -5720,6 +5748,8 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
|
|||
tFilePage * pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
|
||||
|
||||
int64_t ts = getCurrentTimestamp(&cs, pos);
|
||||
assert(ts > 0);
|
||||
|
||||
if (ts == lastTimestamp) { // merge with the last one
|
||||
doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true);
|
||||
} else {
|
||||
|
@ -6559,12 +6589,22 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols)
|
|||
free(pMeterQueryInfo);
|
||||
}
|
||||
|
||||
void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo,
|
||||
void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo,
|
||||
TSKEY skey, TSKEY ekey) {
|
||||
if (pMeterQueryInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
//order has change already!
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
assert(pMeterQueryInfo->ekey >= pMeterQueryInfo->lastKey + step);
|
||||
} else {
|
||||
assert(pMeterQueryInfo->ekey <= pMeterQueryInfo->lastKey + step);
|
||||
}
|
||||
|
||||
pMeterQueryInfo->ekey = pMeterQueryInfo->lastKey + step;
|
||||
|
||||
SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY);
|
||||
pMeterQueryInfo->lastKey = pMeterQueryInfo->skey;
|
||||
|
||||
|
@ -7524,15 +7564,20 @@ void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo
|
|||
if (pQuery->intervalTime == 0) { // todo refactor
|
||||
SWindowResInfo *p1 = &pRuntimeEnv->windowResInfo;
|
||||
for (int32_t i = 0; i < p1->size; ++i) {
|
||||
p1->pResult[i].numOfRows = p1->pResult->resultInfo->numOfRes;
|
||||
SWindowResult* pResult = getWindowResult(p1, i);
|
||||
if (isWindowResClosed(p1, i) && pResult->numOfRows == 0) {
|
||||
pResult->numOfRows = getNumOfResult(pRuntimeEnv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if ((pQuery->lastKey > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(pQuery->lastKey < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
pMeterQueryInfo->ekey = pQuery->lastKey - step;
|
||||
}
|
||||
// get the true maximum timestamp within the query range to set the correct time window
|
||||
// in the supplementary query
|
||||
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
// if ((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
// pMeterQueryInfo->ekey = pQuery->lastKey - step;
|
||||
// }
|
||||
|
||||
updatelastkey(pQuery, pMeterQueryInfo);
|
||||
}
|
||||
|
|
|
@ -849,10 +849,11 @@ static void doOrderedScan(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
static void setupMeterQueryInfoForSupplementQuery(STableQuerySupportObj *pSupporter) {
|
||||
SQuery* pQuery = pSupporter->runtimeEnv.pQuery;
|
||||
|
||||
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
|
||||
SMeterQueryInfo * pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
|
||||
SQueryDiskbasedResultBuf *pResultBuf = pSupporter->runtimeEnv.pResultBuf;
|
||||
changeMeterQueryInfoForSuppleQuery(pResultBuf, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
|
||||
changeMeterQueryInfoForSuppleQuery(pQuery, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue