[TD-5623]<feature>: add compression code on server side

This commit is contained in:
Ganlin Zhao 2021-08-06 09:32:07 +08:00
parent 02f6f5e457
commit 1a3cf2e505
4 changed files with 37 additions and 9 deletions

View File

@ -536,6 +536,7 @@ typedef struct SRetrieveTableRsp {
int16_t precision; int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
int8_t compressed;
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;

View File

@ -43,6 +43,9 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)
#define QUERY_COMP_THRESHOLD 60
#define NEEDTO_COMPRESS_QUERY(size) ((size) > QUERY_COMP_THRESHOLD ? 1 : 0)
enum { enum {
// when query starts to execute, this status will set // when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u, QUERY_NOT_COMPLETED = 0x1u,
@ -647,7 +650,7 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param); bool isValidQInfo(void *param);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo); void setQueryKilled(SQInfo *pQInfo);

View File

@ -4193,7 +4193,15 @@ static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFuncti
} }
} }
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colLen = pColRes->info.bytes * numOfRows;
int32_t colCompLen = (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colLen, numOfRows, data,
colLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
data += colCompLen;
return colCompLen;
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
@ -4202,14 +4210,22 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
if (pQueryAttr->pExpr2 == NULL) { if (pQueryAttr->pExpr2 == NULL) {
for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) { for (int32_t col = 0; col < pQueryAttr->numOfOutput; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); if (compressed) {
data += pColRes->info.bytes * pRes->info.rows; *compLen += compressQueryColData(pColRes, pRes->info.rows, data, compressed);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows);
data += pColRes->info.bytes * pRes->info.rows;
}
} }
} else { } else {
for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { for (int32_t col = 0; col < pQueryAttr->numOfExpr2; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col);
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); if (compressed) {
data += pColRes->info.bytes * numOfRows; *compLen += compressQueryColData(pColRes, numOfRows, data, compressed);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows);
data += pColRes->info.bytes * numOfRows;
}
} }
} }
@ -8695,7 +8711,7 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo); tfree(pQInfo);
} }
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) {
// the remained number of retrieved rows, not the interpolated result // the remained number of retrieved rows, not the interpolated result
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
@ -8738,7 +8754,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
setQueryStatus(pRuntimeEnv, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
} else { } else {
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen);
} }
qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,

View File

@ -324,6 +324,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) { int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
int32_t compLen = 0;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) { if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
@ -356,12 +357,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} }
(*pRsp)->precision = htons(pQueryAttr->precision); (*pRsp)->precision = htons(pQueryAttr->precision);
(*pRsp)->compressed = NEEDTO_COMPRESS_QUERY(pQueryAttr->resultRowSize * s);
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data); doDumpQueryResult(pQInfo, (*pRsp)->data, (*pRsp)->compressed, &compLen);
} else { } else {
setQueryStatus(pRuntimeEnv, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
if ((*pRsp)->compressed && compLen != 0) {
*contLen = *contLen - pQueryAttr->resultRowSize * s + compLen;
*pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen);
}
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;