fix bugs founded in regression test(reverse scan failed).
This commit is contained in:
parent
3932d0accd
commit
a7fc925f1b
|
@ -169,9 +169,9 @@ typedef struct SExtTagsInfo {
|
|||
|
||||
// sql function runtime context
|
||||
typedef struct SQLFunctionCtx {
|
||||
int32_t startOffset;
|
||||
int32_t size; // number of rows
|
||||
int32_t order; // asc|desc
|
||||
int32_t startOffset;
|
||||
int32_t size; // number of rows
|
||||
uint32_t order; // asc|desc
|
||||
uint32_t scanFlag; // TODO merge with currentStage
|
||||
|
||||
int16_t inputType;
|
||||
|
|
|
@ -1861,7 +1861,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
|||
}
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (isIntervalQuery(pQuery) && pQuery->slidingTime > 0) {
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
|
||||
TSKEY ts = primaryKeyCol[offset];
|
||||
|
||||
|
@ -2423,7 +2423,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
|
|||
* because the results of group by normal column is put into intermediate buffer.
|
||||
*/
|
||||
int32_t num = 0;
|
||||
if (!groupbyStateValue && !(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
|
||||
if (!groupbyStateValue && !isIntervalQuery(pQuery)) {
|
||||
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
|
||||
}
|
||||
|
||||
|
@ -4588,7 +4588,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
|
|||
vnodeRecordAllFiles(pQInfo, pMeterObj->vnode);
|
||||
|
||||
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
||||
int32_t rows = getInitialPageNum(pSupporter);
|
||||
|
||||
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
|
||||
|
@ -4731,9 +4731,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) ||
|
||||
(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
|
||||
isIntervalQuery(pQuery)) {
|
||||
int32_t size = 0;
|
||||
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
|
||||
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
||||
size = 10000;
|
||||
} else if (pSupporter->pSidSet != NULL) {
|
||||
size = pSupporter->pSidSet->numOfSubSet;
|
||||
|
@ -5889,7 +5889,11 @@ void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order
|
|||
void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
|
||||
}
|
||||
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
|
||||
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
|
||||
|
@ -5898,10 +5902,6 @@ void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order)
|
|||
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
|
||||
}
|
||||
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
||||
}
|
||||
|
@ -6193,7 +6193,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
bool toContinue = false;
|
||||
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
||||
// for each group result, call the finalize function for each column
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
|
||||
|
@ -6530,7 +6530,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQ
|
|||
SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY);
|
||||
pMeterQueryInfo->lastKey = pMeterQueryInfo->skey;
|
||||
|
||||
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1;
|
||||
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1u;
|
||||
pMeterQueryInfo->cur.vnodeIndex = -1;
|
||||
}
|
||||
|
||||
|
@ -7115,7 +7115,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
|
|||
pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus);
|
||||
}
|
||||
|
||||
if (pRuntimeEnv->pTSBuf > 0 || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
|
||||
if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) {
|
||||
req |= BLK_DATA_ALL_NEEDED;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -872,7 +872,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
|
|||
disableFunctForSuppleScan(pSupporter, pQuery->order.order);
|
||||
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
|
||||
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u;
|
||||
}
|
||||
|
||||
SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY);
|
||||
|
@ -945,7 +945,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
|||
doOrderedScan(pQInfo);
|
||||
int64_t et = taosGetTimestampMs();
|
||||
dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st,
|
||||
pQuery->order.order ^ 1);
|
||||
pQuery->order.order ^ 1u);
|
||||
|
||||
if (pQuery->intervalTime > 0) {
|
||||
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
|
||||
|
|
|
@ -91,7 +91,7 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
|
|||
return;
|
||||
}
|
||||
|
||||
pInterpoInfo->rowIdx = 0; // INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1;
|
||||
pInterpoInfo->rowIdx = 0;
|
||||
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
|
||||
}
|
||||
|
||||
|
@ -295,6 +295,20 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
|
|||
(*num) += 1;
|
||||
}
|
||||
|
||||
static void initBeforeAfterDataBuf(SColumnModel* pModel, char** nextValues) {
|
||||
if (*nextValues != NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
*nextValues = calloc(1, pModel->rowSize);
|
||||
for (int i = 1; i < pModel->numOfCols; i++) {
|
||||
int16_t offset = getColumnModelOffset(pModel, i);
|
||||
SSchema* pSchema = getColumnModelSchema(pModel, i);
|
||||
|
||||
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
|
||||
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
|
||||
const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal,
|
||||
|
@ -329,16 +343,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
|
|||
if ((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
|
||||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
|
||||
/* set the next value for interpolation */
|
||||
if (*nextValues == NULL) {
|
||||
*nextValues = calloc(1, pModel->rowSize);
|
||||
for (int i = 1; i < pModel->numOfCols; i++) {
|
||||
int16_t offset = getColumnModelOffset(pModel, i);
|
||||
SSchema* pSchema = getColumnModelSchema(pModel, i);
|
||||
|
||||
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
initBeforeAfterDataBuf(pModel, nextValues);
|
||||
|
||||
int32_t offset = pInterpoInfo->rowIdx;
|
||||
for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) {
|
||||
SSchema* pSchema = getColumnModelSchema(pModel, i);
|
||||
|
@ -365,17 +371,10 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
|
|||
return outputRows;
|
||||
}
|
||||
} else {
|
||||
// if (pInterpoInfo->startTimestamp == currentTimestamp) {
|
||||
if (*prevValues == NULL) {
|
||||
*prevValues = calloc(1, pModel->rowSize);
|
||||
for (int i = 1; i < pModel->numOfCols; i++) {
|
||||
int16_t offset = getColumnModelOffset(pModel, i);
|
||||
SSchema* pSchema = getColumnModelSchema(pModel, i);
|
||||
|
||||
setNull(*prevValues + offset, pSchema->type, pSchema->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
assert(pInterpoInfo->startTimestamp == currentTimestamp);
|
||||
|
||||
initBeforeAfterDataBuf(pModel, prevValues);
|
||||
|
||||
// assign rows to dst buffer
|
||||
int32_t i = 0;
|
||||
for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) {
|
||||
|
@ -383,19 +382,19 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
|
|||
SSchema* pSchema = getColumnModelSchema(pModel, i);
|
||||
|
||||
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
|
||||
|
||||
char* src = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes;
|
||||
|
||||
if (i == 0 ||
|
||||
(functionIDs[i] != TSDB_FUNC_COUNT &&
|
||||
!isNull(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->type)) ||
|
||||
(functionIDs[i] == TSDB_FUNC_COUNT &&
|
||||
*(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes) != 0)) {
|
||||
assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes, pSchema->type);
|
||||
memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes);
|
||||
} else { // i > 0 and isNULL, do interpolation
|
||||
(functionIDs[i] != TSDB_FUNC_COUNT && !isNull(src, pSchema->type)) ||
|
||||
(functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(src) != 0)) {
|
||||
assignVal(val1, src, pSchema->bytes, pSchema->type);
|
||||
memcpy(*prevValues + tlen, src, pSchema->bytes);
|
||||
} else { // i > 0 and data is null , do interpolation
|
||||
if (interpoType == TSDB_INTERPO_PREV) {
|
||||
assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type);
|
||||
} else if (interpoType == TSDB_INTERPO_LINEAR) {
|
||||
// TODO:
|
||||
assignVal(val1, src, pSchema->bytes, pSchema->type);
|
||||
memcpy(*prevValues + tlen, src, pSchema->bytes);
|
||||
} else {
|
||||
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue