diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 38927be4ac..29c0eb693f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2678,6 +2678,50 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } +static void decompressQueryColData(SSqlRes *pRes, SQueryInfo* pQueryInfo, char **data, int8_t compressed, int compLen) { + int32_t decompLen = 0; + int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; + int32_t *compSizes; + char *pData = *data; + compSizes = (int32_t *)(pData + compLen); + + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); + char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); + + char *p = outputBuf; + int32_t bufOffset; + for (int32_t i = 0; i < numOfCols; ++i) { + SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + bufOffset = pInfo->field.bytes * pRes->numOfRows; + + int32_t flen = (*(tDataTypes[pInfo->field.type].decompFunc))(pData, htonl(compSizes[i]), pRes->numOfRows, p, bufOffset, + compressed, NULL, 0); + + p += flen; + decompLen +=flen; + pData += htonl(compSizes[i]); + } + + /* Resize rsp as decompressed data will occupy more space */ + pRes->rspLen = pRes->rspLen - (compLen + numOfCols * sizeof(int32_t)) + decompLen; + char *new_rsp = (char *)realloc(pRes->pRsp, pRes->rspLen); + if (new_rsp == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + return; + } else { + pRes->pRsp = new_rsp; + *data = ((SRetrieveTableRsp *)pRes->pRsp)->data; + pData = *data + compLen + numOfCols * sizeof(int32_t); + } + + int32_t tailLen = pRes->rspLen - sizeof(SRetrieveTableRsp) - decompLen; + memmove(*data + decompLen, pData, tailLen); + memmove(*data, outputBuf, decompLen); + + tfree(outputBuf); +} + int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2690,18 +2734,24 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { return pRes->code; } - pRes->numOfRows = htonl(pRetrieve->numOfRows); - pRes->precision = htons(pRetrieve->precision); - pRes->offset = htobe64(pRetrieve->offset); - pRes->useconds = htobe64(pRetrieve->useconds); - pRes->completed = (pRetrieve->completed == 1); - pRes->data = pRetrieve->data; - + pRes->numOfRows = htonl(pRetrieve->numOfRows); + pRes->precision = htons(pRetrieve->precision); + pRes->offset = htobe64(pRetrieve->offset); + pRes->useconds = htobe64(pRetrieve->useconds); + pRes->completed = (pRetrieve->completed == 1); + pRes->data = pRetrieve->data; + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } + //Decompress col data if compressed from server + if (pRetrieve->compressed) { + int32_t compLen = htonl(pRetrieve->compLen); + decompressQueryColData(pRes, pQueryInfo, &pRes->data, pRetrieve->compressed, compLen); + } + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if ((pCmd->command == TSDB_SQL_RETRIEVE) || ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && @@ -2714,10 +2764,10 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { if (pSql->pSubscription != NULL) { int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; - + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); - + char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; int32_t numOfTables = htonl(*(int32_t*)p); diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 10673beb92..947fed60e4 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -60,6 +60,7 @@ extern char tsLocale[]; extern char tsCharset[]; // default encode string extern int8_t tsEnableCoreFile; extern int32_t tsCompressMsgSize; +extern int32_t tsCompressColData; extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index c7e7a7362b..44b3e87e7d 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -75,6 +75,14 @@ int32_t tsMaxBinaryDisplayWidth = 30; */ int32_t tsCompressMsgSize = -1; +/* denote if server needs to compress the retrieved column data before adding to the rpc response message body. + * 0: disable column data compression + * 1: enable column data compression + * This option is default to disabled. Once enabled, compression will be conducted if any column has size more + * than QUERY_COMP_THRESHOLD. Otherwise, no further compression is needed. + */ +int32_t tsCompressColData = 0; + // client int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN; int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_MAX_LEN; @@ -991,6 +999,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "compressColData"; + cfg.ptr = &tsCompressColData; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "maxSQLLength"; cfg.ptr = &tsMaxSQLStringLen; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 837db72274..8f5269c158 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -536,6 +536,8 @@ typedef struct SRetrieveTableRsp { int16_t precision; int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; + int8_t compressed; + int32_t compLen; char data[]; } SRetrieveTableRsp; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a694e9b1dd..6e8eec2456 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -43,6 +43,10 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) +//TODO: may need to fine tune this threshold +#define QUERY_COMP_THRESHOLD (1024 * 512) +#define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0) + enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, @@ -638,6 +642,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); bool isQueryKilled(SQInfo *pQInfo); int32_t checkForQueryBuf(size_t numOfTables); +bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool doBuildResCheck(SQInfo* pQInfo); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); @@ -646,7 +651,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo); bool isValidQInfo(void *param); -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); +int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); void setQueryKilled(SQInfo *pQInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fe362f51a9..303612fc8e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4190,26 +4190,60 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti } } -static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { +static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { + int32_t colSize = pColRes->info.bytes * numOfRows; + return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data, + colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); +} + +static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SSDataBlock* pRes = pRuntimeEnv->outputBuf; + int32_t *compSizes = NULL; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + if (compressed) { + compSizes = tcalloc(numOfCols, sizeof(int32_t)); + } + if (pQueryAttr->pExpr2 == NULL) { - for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); - data += pColRes->info.bytes * pRes->info.rows; + if (compressed) { + compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); + data += compSizes[col]; + *compLen += compSizes[col]; + compSizes[col] = htonl(compSizes[col]); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); + data += pColRes->info.bytes * pRes->info.rows; + } } } else { - for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { + for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); - data += pColRes->info.bytes * numOfRows; + if (compressed) { + compSizes[col] = htonl(compressQueryColData(pColRes, numOfRows, data, compressed)); + data += compSizes[col]; + *compLen += compSizes[col]; + compSizes[col] = htonl(compSizes[col]); + } else { + memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); + data += pColRes->info.bytes * numOfRows; + } } } + if (compressed) { + memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t)); + data += numOfCols * sizeof(int32_t); + + tfree(compSizes); + } + int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); @@ -8692,7 +8726,7 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo); } -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { +int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { // the remained number of retrieved rows, not the interpolated result SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; @@ -8735,7 +8769,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { setQueryStatus(pRuntimeEnv, QUERY_OVER); } } else { - doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); + doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen); } qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, @@ -8821,6 +8855,30 @@ int32_t checkForQueryBuf(size_t numOfTables) { return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; } +bool checkNeedToCompressQueryCol(SQInfo *pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + + SSDataBlock* pRes = pRuntimeEnv->outputBuf; + + if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) <= 0) { + return false; + } + + int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows; + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + + for (int32_t col = 0; col < numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + int32_t colSize = pColRes->info.bytes * numOfRows; + if (NEEDTO_COMPRESS_QUERY(colSize)) { + return true; + } + } + + return false; +} + void releaseQueryBuf(size_t numOfTables) { if (tsQueryBufferSizeBytes < 0) { return; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 1a9c057ef0..d25f5eab7a 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -324,6 +324,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) { SQInfo *pQInfo = (SQInfo *)qinfo; + int32_t compLen = 0; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_QRY_INVALID_QHANDLE; @@ -356,12 +357,21 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQueryAttr->precision); + (*pRsp)->compressed = (int8_t)(tsCompressColData && checkNeedToCompressQueryCol(pQInfo)); + if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { - doDumpQueryResult(pQInfo, (*pRsp)->data); + doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen); } else { setQueryStatus(pRuntimeEnv, QUERY_OVER); } + if ((*pRsp)->compressed && compLen != 0) { + int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; + *contLen = *contLen - pQueryAttr->resultRowSize * s + compLen + numOfCols * sizeof(int32_t); + *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); + } + (*pRsp)->compLen = htonl(compLen); + pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; diff --git a/tests/pytest/functions/showOfflineThresholdIs864000.py b/tests/pytest/functions/showOfflineThresholdIs864000.py index 57d0b1921b..8ec25cef26 100644 --- a/tests/pytest/functions/showOfflineThresholdIs864000.py +++ b/tests/pytest/functions/showOfflineThresholdIs864000.py @@ -25,7 +25,7 @@ class TDTestCase: def run(self): tdSql.query("show variables") - tdSql.checkData(54, 1, 864000) + tdSql.checkData(55, 1, 864000) def stop(self): tdSql.close()