From 1a3cf2e5059900088c7268d6273344b412e775d9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 6 Aug 2021 09:32:07 +0800 Subject: [PATCH 01/19] [TD-5623]: add compression code on server side --- src/inc/taosmsg.h | 1 + src/query/inc/qExecutor.h | 5 ++++- src/query/src/qExecutor.c | 30 +++++++++++++++++++++++------- src/query/src/queryMain.c | 10 +++++++++- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 837db72274..d8b94b3da1 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -536,6 +536,7 @@ typedef struct SRetrieveTableRsp { int16_t precision; int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; + int8_t compressed; char data[]; } SRetrieveTableRsp; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 84f00891e5..9fb7644cda 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -43,6 +43,9 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) +#define QUERY_COMP_THRESHOLD 60 +#define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) + enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, @@ -647,7 +650,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo); bool isValidQInfo(void *param); -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); +int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); void setQueryKilled(SQInfo *pQInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d799d56cea..71d8d24389 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4193,7 +4193,15 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } } -static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { + int32_t colLen = pColRes->info.bytes * numOfRows; + int32_t colCompLen = (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, + colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); + data += colCompLen; + return colCompLen; +} + +static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -4202,14 +4210,22 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data if (pQueryAttr->pExpr2 == NULL) { for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); - data += pColRes->info.bytes * pRes->info.rows; + if (compressed) { + *compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); + data += pColRes->info.bytes * pRes->info.rows; + } } } else { for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); - data += pColRes->info.bytes * numOfRows; + if (compressed) { + *compLen += compressQueryColData(pColRes, numOfRows, data, compressed); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); + data += pColRes->info.bytes * numOfRows; + } } } @@ -8695,7 +8711,7 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo); } -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { +int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { // the remained number of retrieved rows, not the interpolated result SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; @@ -8738,7 +8754,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { setQueryStatus(pRuntimeEnv, QUERY_OVER); } } else { - doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); + doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen); } qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 1a9c057ef0..c1cd431ef4 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -324,6 +324,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) { SQInfo *pQInfo = (SQInfo *)qinfo; + int32_t compLen = 0; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_QRY_INVALID_QHANDLE; @@ -356,12 +357,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); + (*pRsp)->compressed = NEEDTO_COMPRESS_QUERY(pQueryAttr->resultRowSize * s); + if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { - doDumpQueryResult(pQInfo, (*pRsp)->data); + doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); } else { setQueryStatus(pRuntimeEnv, QUERY_OVER); } + if ((*pRsp)->compressed && compLen != 0) { + *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen; + *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); + } + pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; From d31eee309a608224431e6386808e3d272a6891e1 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 11 Aug 2021 18:42:03 +0800 Subject: [PATCH 02/19] [TD-5623]: add server code for decompressing col data --- src/client/inc/tsclient.h | 1 + src/client/src/tscServer.c | 39 ++++++++++++++++++++++++++++++++------ src/query/src/qExecutor.c | 24 +++++++++++++++++++---- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b6821de87a..b025ca17c3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -303,6 +303,7 @@ typedef struct { int16_t numOfCols; int16_t precision; bool completed; + bool compressed; int32_t code; int32_t numOfGroups; SResRec * pGroupRec; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 38927be4ac..399a47c4d6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,6 +2678,27 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) { + int32_t decompLen = 0; + int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); + char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); + + char *p = outputBuf; + int32_t bufOffset = 0, compSize = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + bufOffset = pInfo->field.bytes * pRes->numOfRows; + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset, + compressed, NULL, 0); + p += flen; + decompLen +=flen; + } + tfree(outputBuf); +} + int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2690,18 +2711,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { return pRes->code; } - pRes->numOfRows = htonl(pRetrieve->numOfRows); - pRes->precision = htons(pRetrieve->precision); - pRes->offset = htobe64(pRetrieve->offset); - pRes->useconds = htobe64(pRetrieve->useconds); - pRes->completed = (pRetrieve->completed == 1); - pRes->data = pRetrieve->data; + pRes->numOfRows = htonl(pRetrieve->numOfRows); + pRes->precision = htons(pRetrieve->precision); + pRes->offset = htobe64(pRetrieve->offset); + pRes->useconds = htobe64(pRetrieve->useconds); + pRes->completed = (pRetrieve->completed == 1); + pRes->compressed = (pRetrieve->compressed == 1); + pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } + //Decompress col data if compressed from server + if (pRes->compressed) { + decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed); + } + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if ((pCmd->command == TSDB_SQL_RETRIEVE) || ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 71d8d24389..e687090106 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4207,21 +4207,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SSDataBlock* pRes = pRuntimeEnv->outputBuf; + int32_t *compSizes = NULL; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + if (compressed) { + compSizes = tmalloc(numOfCols); + } + if (pQueryAttr->pExpr2 == NULL) { - for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - *compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); data += pColRes->info.bytes * pRes->info.rows; } } } else { - for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - *compLen += compressQueryColData(pColRes, numOfRows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed); + *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); data += pColRes->info.bytes * numOfRows; @@ -4229,6 +4238,13 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } + if (compressed) { + memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t)); + data += numOfCols * sizeof(int32_t); + + tfree(compSizes); + } + int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); From 8271c508899c6146be602527006b59079895f3c2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 11 Aug 2021 21:42:59 +0800 Subject: [PATCH 03/19] [TD-5623]: append compressed sizes for each col afterdata --- src/client/src/tscServer.c | 14 ++++++++++---- src/inc/taosmsg.h | 1 + src/query/src/qExecutor.c | 2 +- src/query/src/queryMain.c | 4 +++- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 399a47c4d6..f270d7b849 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,25 +2678,30 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } -static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed) { +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed, int compLen) { int32_t decompLen = 0; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + int32_t *compSizes = tcalloc(numOfCols, sizeof(int32_t)); + char *pData = data; + compSizes = (int32_t *)(pData + compLen); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); char *p = outputBuf; - int32_t bufOffset = 0, compSize = 0; + int32_t bufOffset = 0; for(int32_t i = 0; i < numOfCols; ++i) { SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); bufOffset = pInfo->field.bytes * pRes->numOfRows; - int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(data, compSize, pRes->numOfRows, p, bufOffset, + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, compSizes[i], pRes->numOfRows, p, bufOffset, compressed, NULL, 0); p += flen; decompLen +=flen; + pData += compSizes[i]; } tfree(outputBuf); + tfree(compSizes); } int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { @@ -2726,7 +2731,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { //Decompress col data if compressed from server if (pRes->compressed) { - decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed); + int32_t compLen = htonl(pRetrieve->compLen); + decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed, compLen); } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d8b94b3da1..8f5269c158 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -537,6 +537,7 @@ typedef struct SRetrieveTableRsp { int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; int8_t compressed; + int32_t compLen; char data[]; } SRetrieveTableRsp; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e687090106..40698dad20 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4211,7 +4211,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; if (compressed) { - compSizes = tmalloc(numOfCols); + compSizes = tcalloc(numOfCols, sizeof(int32_t)); } if (pQueryAttr->pExpr2 == NULL) { diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index c1cd431ef4..84aa4973a0 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -366,9 +366,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } if ((*pRsp)->compressed && compLen != 0) { - *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen + numOfCols * sizeof(int32_t); *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); } + (*pRsp)->compLen = htonl(compLen); pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; From 97e7f0fec0e38a35233129facecffa9e3772d9dc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 11 Aug 2021 23:03:24 +0800 Subject: [PATCH 04/19] [TD-5623]: fix passing pointer issue --- src/query/src/qExecutor.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 40698dad20..7e67bd0526 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4193,11 +4193,11 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } } -static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char **data, int8_t compressed) { int32_t colLen = pColRes->info.bytes * numOfRows; - int32_t colCompLen = (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, + int32_t colCompLen = (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, *data, colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); - data += colCompLen; + *data += colCompLen; return colCompLen; } @@ -4218,7 +4218,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, &data, compressed); *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); @@ -4229,7 +4229,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed); + compSizes[col] = compressQueryColData(pColRes, numOfRows, &data, compressed); *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); From 91c5deb97f79b8c136e20da8fab9f8b21ec3b83a Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 11 Aug 2021 23:03:24 +0800 Subject: [PATCH 05/19] [TD-5623]: fix passing pointer issue --- src/query/src/qExecutor.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7e67bd0526..7b58063491 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4193,12 +4193,10 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } } -static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char **data, int8_t compressed) { +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { int32_t colLen = pColRes->info.bytes * numOfRows; - int32_t colCompLen = (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, *data, + return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); - *data += colCompLen; - return colCompLen; } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { @@ -4218,7 +4216,8 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, &data, compressed); + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + data += compSizes[col]; *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); @@ -4229,7 +4228,8 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, numOfRows, &data, compressed); + compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed); + data += compSizes[col]; *compLen += compSizes[col]; } else { memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); From 90311bb49425f07daebf23a4a1cd8e9acc87bfbd Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 00:03:41 +0800 Subject: [PATCH 06/19] [TD-5623]: fix malloc pointer issue --- src/client/src/tscServer.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f270d7b849..47d712e51d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2681,7 +2681,7 @@ int tscProcessQueryRsp(SSqlObj *pSql) { static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed, int compLen) { int32_t decompLen = 0; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; - int32_t *compSizes = tcalloc(numOfCols, sizeof(int32_t)); + int32_t *compSizes; char *pData = data; compSizes = (int32_t *)(pData + compLen); @@ -2701,7 +2701,6 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * pData += compSizes[i]; } tfree(outputBuf); - tfree(compSizes); } int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { From c8df2af190ff724bdf41947f513de09ab02851ba Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 07/19] [TD-5623]: fix endian issue --- src/client/src/tscServer.c | 2 +- src/query/src/qExecutor.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 47d712e51d..b7a5715544 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2694,7 +2694,7 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * for(int32_t i = 0; i < numOfCols; ++i) { SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); bufOffset = pInfo->field.bytes * pRes->numOfRows; - int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, compSizes[i], pRes->numOfRows, p, bufOffset, + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, htonl(compSizes[i]), pRes->numOfRows, p, bufOffset, compressed, NULL, 0); p += flen; decompLen +=flen; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7b58063491..07c6e12606 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4216,7 +4216,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + compSizes[col] = htonl(compressQueryColData(pColRes, pRes->info.rows, data, compressed)); data += compSizes[col]; *compLen += compSizes[col]; } else { @@ -4228,7 +4228,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = compressQueryColData(pColRes, numOfRows, data, compressed); + compSizes[col] = htonl(compressQueryColData(pColRes, numOfRows, data, compressed)); data += compSizes[col]; *compLen += compSizes[col]; } else { From d267d287fdc0f4b4b38d4ae2d26a964bf53d0666 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 08/19] [TD-5623]: fix endian issue --- src/query/src/qExecutor.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 07c6e12606..828225b180 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4216,9 +4216,10 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); if (compressed) { - compSizes[col] = htonl(compressQueryColData(pColRes, pRes->info.rows, data, compressed)); + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); data += compSizes[col]; *compLen += compSizes[col]; + compSizes[col] = htonl(compSizes[col]); } else { memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); data += pColRes->info.bytes * pRes->info.rows; @@ -4231,6 +4232,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data compSizes[col] = htonl(compressQueryColData(pColRes, numOfRows, data, compressed)); data += compSizes[col]; *compLen += compSizes[col]; + compSizes[col] = htonl(compSizes[col]); } else { memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); data += pColRes->info.bytes * numOfRows; From 96e7876dedd34056ad57ff47148242309c2e9fcc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 09/19] [TD-5623]: fix endian issue --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b7a5715544..cb2860ec56 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2698,7 +2698,7 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * compressed, NULL, 0); p += flen; decompLen +=flen; - pData += compSizes[i]; + pData += htonl(compSizes[i]; } tfree(outputBuf); } From 16b2f3609c1d773b4ac643139ed44216add82818 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 10/19] [TD-5623]: resize rsp to accommodate decompressed data --- src/client/src/tscServer.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cb2860ec56..1ececd975d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2690,16 +2690,32 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); char *p = outputBuf; - int32_t bufOffset = 0; - for(int32_t i = 0; i < numOfCols; ++i) { + int32_t bufOffset; + for (int32_t i = 0; i < numOfCols; ++i) { SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); bufOffset = pInfo->field.bytes * pRes->numOfRows; + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, htonl(compSizes[i]), pRes->numOfRows, p, bufOffset, compressed, NULL, 0); + p += flen; decompLen +=flen; - pData += htonl(compSizes[i]; + pData += htonl(compSizes[i]); } + /* Resize rsp as decompressed data will occupy more space */ + pRes->rspLen = pRes->rspLen - (compLen + numOfCols * sizeof(int32_t)) + decompLen; + char *new_rsp = (char *)realloc(pRes->pRsp, pRes->rspLen); + if (new_rsp == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return; + } else { + pRes->pRsp = new_rsp; + } + + int32_t tailLen = pRes->rspLen - sizeof(SRetrieveTableRsp) - decompLen; + memmove(data + decompLen, pData, tailLen); + memmove(data, outputBuf, decompLen); + tfree(outputBuf); } From ace6d9f7d01bd6ff1b9f667da58ff9ee1c231d38 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 11/19] [TD-5623]: skip compSizes when when memmove --- src/client/src/tscServer.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1ececd975d..48fa65b690 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2713,6 +2713,8 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * } int32_t tailLen = pRes->rspLen - sizeof(SRetrieveTableRsp) - decompLen; + /* Skip compSizes */ + pData += numOfCols * sizeof(int32_t); memmove(data + decompLen, pData, tailLen); memmove(data, outputBuf, decompLen); From 65fefbd16de4c17bafe6882d8386e730d2612fb5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 12/19] [TD-5623]: fix pointer issues after realloc rsp --- src/client/src/tscServer.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 48fa65b690..452bbb1117 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,11 +2678,11 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } -static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char *data, int8_t compressed, int compLen) { +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char **data, int8_t compressed, int compLen) { int32_t decompLen = 0; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; int32_t *compSizes; - char *pData = data; + char *pData = *data; compSizes = (int32_t *)(pData + compLen); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); @@ -2710,13 +2710,13 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * return; } else { pRes->pRsp = new_rsp; + *data = ((SRetrieveTableRsp *)pRes->pRsp)->data; + pData = *data + compLen + numOfCols * sizeof(int32_t); } int32_t tailLen = pRes->rspLen - sizeof(SRetrieveTableRsp) - decompLen; - /* Skip compSizes */ - pData += numOfCols * sizeof(int32_t); - memmove(data + decompLen, pData, tailLen); - memmove(data, outputBuf, decompLen); + memmove(*data + decompLen, pData, tailLen); + memmove(*data, outputBuf, decompLen); tfree(outputBuf); } @@ -2740,7 +2740,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { pRes->completed = (pRetrieve->completed == 1); pRes->compressed = (pRetrieve->compressed == 1); pRes->data = pRetrieve->data; - + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; @@ -2749,7 +2749,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { //Decompress col data if compressed from server if (pRes->compressed) { int32_t compLen = htonl(pRetrieve->compLen); - decompressQueryColData(pRes, pQueryInfo, pRes->data, pRes->compressed, compLen); + decompressQueryColData(pRes, pQueryInfo, &pRes->data, pRes->compressed, compLen); } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -2764,10 +2764,10 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { if (pSql->pSubscription != NULL) { int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; - + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); - + char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; int32_t numOfTables = htonl(*(int32_t*)p); From 8a140660a455ef3ee14bea68ff74d079ce3030be Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 13/19] [TD-5623]: changed compression threshhold check, if column size exceeds certain amount will compress the whole block. --- src/query/inc/qExecutor.h | 3 ++- src/query/src/qExecutor.c | 26 +++++++++++++++++++++++--- src/query/src/queryMain.c | 2 +- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 9fb7644cda..f6e07ae6b1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -43,7 +43,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) -#define QUERY_COMP_THRESHOLD 60 +#define QUERY_COMP_THRESHOLD 16 #define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) enum { @@ -642,6 +642,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); bool isQueryKilled(SQInfo *pQInfo); int32_t checkForQueryBuf(size_t numOfTables); +bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool doBuildResCheck(SQInfo* pQInfo); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 828225b180..7c3e74ec20 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4194,9 +4194,9 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { - int32_t colLen = pColRes->info.bytes * numOfRows; - return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data, - colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0); + int32_t colSize = pColRes->info.bytes * numOfRows; + return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data, + colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { @@ -8858,6 +8858,26 @@ int32_t checkForQueryBuf(size_t numOfTables) { return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; } +bool checkNeedToCompressQueryCol(SQInfo *pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + + SSDataBlock* pRes = pRuntimeEnv->outputBuf; + + int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + for (int32_t col = 0; col < numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + int32_t colSize = pColRes->info.bytes * numOfRows; + if (NEEDTO_COMPRESS_QUERY(colSize)) { + return true; + } + } + + return false; +} + void releaseQueryBuf(size_t numOfTables) { if (tsQueryBufferSizeBytes < 0) { return; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 84aa4973a0..b1a029f46c 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -357,7 +357,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); - (*pRsp)->compressed = NEEDTO_COMPRESS_QUERY(pQueryAttr->resultRowSize * s); + (*pRsp)->compressed = checkNeedToCompressQueryCol(pQInfo); if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); From 8eb1af221fab41fa3c9a709c764dced9d1ee6490 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 14/19] [TD-5623]: fixed crash caused by second dnodeFetchQ thread try access pRuntimeEnv->outputbuf NULL pointer issue --- src/query/src/qExecutor.c | 4 ++++ src/query/src/queryMain.c | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7c3e74ec20..44b225930c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -8864,6 +8864,10 @@ bool checkNeedToCompressQueryCol(SQInfo *pQInfo) { SSDataBlock* pRes = pRuntimeEnv->outputBuf; + if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) <= 0) { + return false; + } + int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows; int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index b1a029f46c..ab63d863bc 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -357,7 +357,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); - (*pRsp)->compressed = checkNeedToCompressQueryCol(pQInfo); + (*pRsp)->compressed = (int8_t)checkNeedToCompressQueryCol(pQInfo); if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); From 5e631fc86f2b4783a250f213cb45ee9d77f4e977 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 15/19] [TD-5623]: Added Global config 'CompressColDaata' to enable/disable col compression. Default to 0. --- src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 18 ++++++++++++++++++ src/query/src/queryMain.c | 2 +- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 10673beb92..947fed60e4 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -60,6 +60,7 @@ extern char tsLocale[]; extern char tsCharset[]; // default encode string extern int8_t tsEnableCoreFile; extern int32_t tsCompressMsgSize; +extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index c7e7a7362b..44b3e87e7d 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -75,6 +75,14 @@ int32_t tsMaxBinaryDisplayWidth = 30; */ int32_t tsCompressMsgSize = -1; +/* denote if server needs to compress the retrieved column data before adding to the rpc response message body. + * 0: disable column data compression + * 1: enable column data compression + * This option is default to disabled. Once enabled, compression will be conducted if any column has size more + * than QUERY_COMP_THRESHOLD. Otherwise, no further compression is needed. + */ +int32_t tsCompressColData = 0; + // client int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN; int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_MAX_LEN; @@ -991,6 +999,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "compressColData"; + cfg.ptr = &tsCompressColData; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "maxSQLLength"; cfg.ptr = &tsMaxSQLStringLen; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index ab63d863bc..d25f5eab7a 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -357,7 +357,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); - (*pRsp)->compressed = (int8_t)checkNeedToCompressQueryCol(pQInfo); + (*pRsp)->compressed = (int8_t)(tsCompressColData && checkNeedToCompressQueryCol(pQInfo)); if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); From 05d69e77ce45e83cff563fcb3a58feedf4fa53d9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 16/19] [TD-5623]: removed compressed from SSqlRes --- src/client/inc/tsclient.h | 1 - src/client/src/tscServer.c | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b025ca17c3..b6821de87a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -303,7 +303,6 @@ typedef struct { int16_t numOfCols; int16_t precision; bool completed; - bool compressed; int32_t code; int32_t numOfGroups; SResRec * pGroupRec; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 452bbb1117..29c0eb693f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2702,6 +2702,7 @@ static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char * decompLen +=flen; pData += htonl(compSizes[i]); } + /* Resize rsp as decompressed data will occupy more space */ pRes->rspLen = pRes->rspLen - (compLen + numOfCols * sizeof(int32_t)) + decompLen; char *new_rsp = (char *)realloc(pRes->pRsp, pRes->rspLen); @@ -2738,7 +2739,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { pRes->offset = htobe64(pRetrieve->offset); pRes->useconds = htobe64(pRetrieve->useconds); pRes->completed = (pRetrieve->completed == 1); - pRes->compressed = (pRetrieve->compressed == 1); pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); @@ -2747,9 +2747,9 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { } //Decompress col data if compressed from server - if (pRes->compressed) { + if (pRetrieve->compressed) { int32_t compLen = htonl(pRetrieve->compLen); - decompressQueryColData(pRes, pQueryInfo, &pRes->data, pRes->compressed, compLen); + decompressQueryColData(pRes, pQueryInfo, &pRes->data, pRetrieve->compressed, compLen); } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); From 600c3006add85cfd6b4d37805c2d362a8f903ac1 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 Aug 2021 09:49:31 +0800 Subject: [PATCH 17/19] [TD-5623]: Set default column compression threshold to 512KB --- src/query/inc/qExecutor.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index f6e07ae6b1..021369e23d 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -43,7 +43,8 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) -#define QUERY_COMP_THRESHOLD 16 +//TODO: may need to fine tune this threshold +#define QUERY_COMP_THRESHOLD 1024 * 512 #define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) enum { From 3f4e20572488014f96c456a303a3925f4dc9ea4f Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sun, 15 Aug 2021 14:33:44 +0800 Subject: [PATCH 18/19] [TD-5623]: fixed show variable python test case --- tests/pytest/functions/showOfflineThresholdIs864000.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pytest/functions/showOfflineThresholdIs864000.py b/tests/pytest/functions/showOfflineThresholdIs864000.py index 57d0b1921b..8ec25cef26 100644 --- a/tests/pytest/functions/showOfflineThresholdIs864000.py +++ b/tests/pytest/functions/showOfflineThresholdIs864000.py @@ -25,7 +25,7 @@ class TDTestCase: def run(self): tdSql.query("show variables") - tdSql.checkData(54, 1, 864000) + tdSql.checkData(55, 1, 864000) def stop(self): tdSql.close() From 4945326202e665ccbd8df1ee98996baafd642d5d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sun, 15 Aug 2021 14:33:44 +0800 Subject: [PATCH 19/19] [TD-5623]: fix macro quote --- src/query/inc/qExecutor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 021369e23d..7dfc6857e3 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -44,7 +44,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) //TODO: may need to fine tune this threshold -#define QUERY_COMP_THRESHOLD 1024 * 512 +#define QUERY_COMP_THRESHOLD (1024 * 512) #define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) enum {