Merge pull request #1267 from taosdata/feature/liaohj

Feature/liaohj
This commit is contained in:
slguan 2020-02-27 21:37:03 +08:00 committed by GitHub
commit 0553473456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 54 deletions

View File

@ -171,7 +171,7 @@ typedef struct SExtTagsInfo {
typedef struct SQLFunctionCtx {
int32_t startOffset;
int32_t size; // number of rows
int32_t order; // asc|desc
uint32_t order; // asc|desc
uint32_t scanFlag; // TODO merge with currentStage
int16_t inputType;

View File

@ -146,7 +146,7 @@ typedef struct SQueryRuntimeEnv {
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
SPositionInfo nextPos; /* start position of the next scan */
SData* colDataBuffer[TSDB_MAX_COLUMNS];
SResultInfo* resultInfo;
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
uint8_t blockStatus; // Indicate if data block is loaded, the block is first/last/internal block
int32_t unzipBufSize;
SData* primaryColBuffer;

View File

@ -1861,7 +1861,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery) && pQuery->slidingTime > 0) {
if (isIntervalQuery(pQuery)) {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = primaryKeyCol[offset];
@ -2423,7 +2423,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
* because the results of group by normal column is put into intermediate buffer.
*/
int32_t num = 0;
if (!groupbyStateValue && !(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
if (!groupbyStateValue && !isIntervalQuery(pQuery)) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
}
@ -3945,9 +3945,9 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
// in case of point-interpolation query, use asc order scan
char msg[] =
"QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64
", "
"new qrange:%" PRId64 "-%" PRId64;
", new qrange:%" PRId64 "-%" PRId64;
// todo handle the case the the order irrelevant query type mixed up with order critical query type
// descending order query for last_row query
if (isFirstLastRowQuery(pQuery)) {
dTrace("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery),
@ -4588,7 +4588,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
vnodeRecordAllFiles(pQInfo, pMeterObj->vnode);
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
int32_t rows = getInitialPageNum(pSupporter);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
@ -4731,9 +4731,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
}
if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) ||
(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
isIntervalQuery(pQuery)) {
int32_t size = 0;
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || isIntervalQuery(pQuery)) {
size = 10000;
} else if (pSupporter->pSidSet != NULL) {
size = pSupporter->pSidSet->numOfSubSet;
@ -5873,7 +5873,6 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(!pRuntimeEnv->stableQuery);
// group by normal columns and interval query on normal table
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
@ -5881,7 +5880,21 @@ void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order
}
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j];
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) {
pCtx->resultInfo->complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
pCtx->resultInfo->complete = true;
}
}
}
pQuery->order.order = pQuery->order.order ^ 1u;
}
@ -5890,6 +5903,10 @@ void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order)
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
}
if (isIntervalQuery(pQuery)) {
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
@ -5898,10 +5915,6 @@ void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order)
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
}
} else {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
}
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
}
@ -6193,7 +6206,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
@ -6530,7 +6543,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQ
SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY);
pMeterQueryInfo->lastKey = pMeterQueryInfo->skey;
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1;
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1u;
pMeterQueryInfo->cur.vnodeIndex = -1;
}
@ -7115,7 +7128,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus);
}
if (pRuntimeEnv->pTSBuf > 0 || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) {
req |= BLK_DATA_ALL_NEEDED;
}
}

View File

@ -872,7 +872,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
disableFunctForSuppleScan(pSupporter, pQuery->order.order);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u;
}
SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY);
@ -945,7 +945,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
doOrderedScan(pQInfo);
int64_t et = taosGetTimestampMs();
dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st,
pQuery->order.order ^ 1);
pQuery->order.order ^ 1u);
if (pQuery->intervalTime > 0) {
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {

View File

@ -91,7 +91,7 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
return;
}
pInterpoInfo->rowIdx = 0; // INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1;
pInterpoInfo->rowIdx = 0;
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
}
@ -295,6 +295,20 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
(*num) += 1;
}
static void initBeforeAfterDataBuf(SColumnModel* pModel, char** nextValues) {
if (*nextValues != NULL) {
return;
}
*nextValues = calloc(1, pModel->rowSize);
for (int i = 1; i < pModel->numOfCols; i++) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
}
}
int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal,
@ -329,15 +343,7 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
if ((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
/* set the next value for interpolation */
if (*nextValues == NULL) {
*nextValues = calloc(1, pModel->rowSize);
for (int i = 1; i < pModel->numOfCols; i++) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
}
}
initBeforeAfterDataBuf(pModel, nextValues);
int32_t offset = pInterpoInfo->rowIdx;
for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) {
@ -365,16 +371,9 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
return outputRows;
}
} else {
// if (pInterpoInfo->startTimestamp == currentTimestamp) {
if (*prevValues == NULL) {
*prevValues = calloc(1, pModel->rowSize);
for (int i = 1; i < pModel->numOfCols; i++) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
assert(pInterpoInfo->startTimestamp == currentTimestamp);
setNull(*prevValues + offset, pSchema->type, pSchema->bytes);
}
}
initBeforeAfterDataBuf(pModel, prevValues);
// assign rows to dst buffer
int32_t i = 0;
@ -383,19 +382,19 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
char* src = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes;
if (i == 0 ||
(functionIDs[i] != TSDB_FUNC_COUNT &&
!isNull(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->type)) ||
(functionIDs[i] == TSDB_FUNC_COUNT &&
*(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes) != 0)) {
assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes);
} else { // i > 0 and isNULL, do interpolation
(functionIDs[i] != TSDB_FUNC_COUNT && !isNull(src, pSchema->type)) ||
(functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(src) != 0)) {
assignVal(val1, src, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, src, pSchema->bytes);
} else { // i > 0 and data is null , do interpolation
if (interpoType == TSDB_INTERPO_PREV) {
assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type);
} else if (interpoType == TSDB_INTERPO_LINEAR) {
// TODO:
assignVal(val1, src, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, src, pSchema->bytes);
} else {
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
}