enh(query): compress msg before sending.

This commit is contained in:
Haojun Liao 2024-05-16 11:35:34 +08:00
parent c87ced12b6
commit 45d78aff8c
9 changed files with 76 additions and 9 deletions

View File

@ -2115,6 +2115,7 @@ typedef struct {
int8_t precision;
int8_t compressed;
int8_t streamBlockType;
int32_t payloadLen;
int32_t compLen;
int32_t numOfBlocks;
int64_t numOfRows; // from int32_t change to int64_t
@ -2143,6 +2144,7 @@ typedef struct {
int8_t compressed;
int32_t compLen;
int32_t numOfRows;
int32_t fullLen;
char data[];
} SRetrieveMetaTableRsp;

View File

@ -195,8 +195,9 @@ typedef struct SReqResultInfo {
uint64_t current;
bool localResultFetched;
bool completed;
int32_t precision;
bool convertUcs4;
char* decompressBuf;
int32_t precision;
int32_t payloadLen;
char* convertJson;
} SReqResultInfo;

View File

@ -370,6 +370,7 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
taosMemoryFreeClear(pResInfo->fields);
taosMemoryFreeClear(pResInfo->userFields);
taosMemoryFreeClear(pResInfo->convertJson);
taosMemoryFreeClear(pResInfo->decompressBuf);
if (pResInfo->convertBuf != NULL) {
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {

View File

@ -1711,10 +1711,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SSchedulerReq req = {
.syncReq = true,
.pFetchRes = (void**)&pResInfo->pData,
};
SSchedulerReq req = { .syncReq = true, .pFetchRes = (void**)&pResInfo->pData };
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
@ -2195,17 +2193,46 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
taosMemoryFreeClear(pResultInfo->pRspMsg);
pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen);
pResultInfo->precision = pRsp->precision;
// decompress data if needed
if (pRsp->compressed) {
int32_t payloadLen = htonl(pRsp->payloadLen);
if (pResultInfo->decompressBuf == NULL) {
pResultInfo->decompressBuf = taosMemoryMalloc(payloadLen);
} else {
char* p = taosMemoryRealloc(pResultInfo->decompressBuf, payloadLen);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
return terrno;
}
pResultInfo->decompressBuf = p;
}
int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen,
ONE_STAGE_COMP, NULL, 0);
ASSERT(len == payloadLen);
pResultInfo->pData = pResultInfo->decompressBuf;
pResultInfo->payloadLen = payloadLen;
} else {
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->payloadLen = htonl(pRsp->compLen);
ASSERT(pRsp->compLen == pRsp->payloadLen);
}
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows;
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
convertUcs4);
int32_t code =
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
return code;
}
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {

View File

@ -511,6 +511,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->payloadLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
@ -623,6 +624,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->payloadLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);

View File

@ -37,6 +37,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->payloadLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols);

View File

@ -63,12 +63,14 @@ typedef struct SDataDispatchHandle {
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = 0;
SNode* pNode;
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
++numOfCols;
}
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = 0;
pEntry->numOfRows = pInput->pData->info.rows;

View File

@ -410,6 +410,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htobe64(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->payloadLen = htonl(pRsp->payloadLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
@ -674,6 +675,16 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
char* pStart = pRetrieveRsp->data;
int32_t index = 0;
int32_t code = 0;
char* p = NULL;
if (pRetrieveRsp->compressed) { // decompress the data
p = taosMemoryMalloc(pRetrieveRsp->payloadLen);
int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen,
ONE_STAGE_COMP, NULL, 0);
ASSERT(t == pRetrieveRsp->payloadLen);
pStart = p;
}
while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = NULL;
if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
@ -692,6 +703,7 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
}
taosMemoryFreeClear(p);
return code;
}

View File

@ -904,6 +904,25 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
{
if (dataLen > 8192) {
char* p = taosMemoryMalloc(dataLen);
int32_t len =
tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0);
memcpy(((SRetrieveTableRsp*)rsp)->data, p, len);
((SRetrieveTableRsp*)rsp)->payloadLen = htonl(dataLen);
((SRetrieveTableRsp*)rsp)->compLen = htonl(len);
((SRetrieveTableRsp*)rsp)->compressed = 1;
taosMemoryFree(p);
} else {
((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen;
((SRetrieveTableRsp*)rsp)->compressed = 0;
}
}
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}