[TD-5623]<feature>: changed compression threshhold check, if column size

exceeds certain amount will compress the whole block.
This commit is contained in:
Ganlin Zhao 2021-08-12 09:49:31 +08:00
parent 65fefbd16d
commit 8a140660a4
3 changed files with 26 additions and 5 deletions

View File

@ -43,7 +43,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)
#define QUERY_COMP_THRESHOLD 60 #define QUERY_COMP_THRESHOLD 16
#define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0)
enum { enum {
@ -642,6 +642,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool isQueryKilled(SQInfo *pQInfo); bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
bool doBuildResCheck(SQInfo* pQInfo); bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);

View File

@ -4194,9 +4194,9 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti
} }
static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colLen = pColRes->info.bytes * numOfRows; int32_t colSize = pColRes->info.bytes * numOfRows;
return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) {
@ -8858,6 +8858,26 @@ int32_t checkForQueryBuf(size_t numOfTables) {
return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
} }
bool checkNeedToCompressQueryCol(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows;
int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * numOfRows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
void releaseQueryBuf(size_t numOfTables) { void releaseQueryBuf(size_t numOfTables) {
if (tsQueryBufferSizeBytes < 0) { if (tsQueryBufferSizeBytes < 0) {
return; return;

View File

@ -357,7 +357,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} }
(*pRsp)->precision = htons(pQueryAttr->precision); (*pRsp)->precision = htons(pQueryAttr->precision);
(*pRsp)->compressed = NEEDTO_COMPRESS_QUERY(pQueryAttr->resultRowSize * s); (*pRsp)->compressed = checkNeedToCompressQueryCol(pQInfo);
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen);