From 8a140660a455ef3ee14bea68ff74d079ce3030be Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH] [TD-5623]: changed compression threshhold check, if column size exceeds certain amount will compress the whole block. --- src/query/inc/qExecutor.h | 3 ++- src/query/src/qExecutor.c | 26 +++++++++++++++++++++++--- src/query/src/queryMain.c | 2 +- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 9fb7644cda..f6e07ae6b1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -43,7 +43,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) -#define QUERY_COMP_THRESHOLD 60 +#define QUERY_COMP_THRESHOLD 16 #define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) enum { @@ -642,6 +642,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); bool isQueryKilled(SQInfo *pQInfo); int32_t checkForQueryBuf(size_t numOfTables); +bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool doBuildResCheck(SQInfo* pQInfo); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 828225b180..7c3e74ec20 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4194,9 +4194,9 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { - int32_t colLen = pColRes->info.bytes * numOfRows; - return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, - colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); + int32_t colSize = pColRes->info.bytes * numOfRows; + return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data, + colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { @@ -8858,6 +8858,26 @@ int32_t checkForQueryBuf(size_t numOfTables) { return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; } +bool checkNeedToCompressQueryCol(SQInfo *pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + + SSDataBlock* pRes = pRuntimeEnv->outputBuf; + + int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + for (int32_t col = 0; col < numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + int32_t colSize = pColRes->info.bytes * numOfRows; + if (NEEDTO_COMPRESS_QUERY(colSize)) { + return true; + } + } + + return false; +} + void releaseQueryBuf(size_t numOfTables) { if (tsQueryBufferSizeBytes < 0) { return; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 84aa4973a0..b1a029f46c 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -357,7 +357,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); - (*pRsp)->compressed = NEEDTO_COMPRESS_QUERY(pQueryAttr->resultRowSize * s); + (*pRsp)->compressed = checkNeedToCompressQueryCol(pQInfo); if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen);