diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ed23290be4..96001afc91 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2115,6 +2115,7 @@ typedef struct { int8_t precision; int8_t compressed; int8_t streamBlockType; + int32_t payloadLen; int32_t compLen; int32_t numOfBlocks; int64_t numOfRows; // from int32_t change to int64_t @@ -2143,6 +2144,7 @@ typedef struct { int8_t compressed; int32_t compLen; int32_t numOfRows; + int32_t fullLen; char data[]; } SRetrieveMetaTableRsp; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 9507472df0..e06c7e63ac 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -195,8 +195,9 @@ typedef struct SReqResultInfo { uint64_t current; bool localResultFetched; bool completed; - int32_t precision; bool convertUcs4; + char* decompressBuf; + int32_t precision; int32_t payloadLen; char* convertJson; } SReqResultInfo; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 439103e5c4..38a895e4fd 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -370,6 +370,7 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) { taosMemoryFreeClear(pResInfo->fields); taosMemoryFreeClear(pResInfo->userFields); taosMemoryFreeClear(pResInfo->convertJson); + taosMemoryFreeClear(pResInfo->decompressBuf); if (pResInfo->convertBuf != NULL) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1d5bc9cfb4..6f1c3fccd7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1711,10 +1711,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - SSchedulerReq req = { - .syncReq = true, - .pFetchRes = (void**)&pResInfo->pData, - }; + SSchedulerReq req = { .syncReq = true, .pFetchRes = (void**)&pResInfo->pData }; + pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; @@ -2195,17 +2193,46 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR taosMemoryFreeClear(pResultInfo->pRspMsg); pResultInfo->pRspMsg = (const char*)pRsp; - pResultInfo->pData = (void*)pRsp->data; pResultInfo->numOfRows = htobe64(pRsp->numOfRows); pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); - pResultInfo->payloadLen = htonl(pRsp->compLen); pResultInfo->precision = pRsp->precision; + // decompress data if needed + if (pRsp->compressed) { + int32_t payloadLen = htonl(pRsp->payloadLen); + + if (pResultInfo->decompressBuf == NULL) { + pResultInfo->decompressBuf = taosMemoryMalloc(payloadLen); + } else { + char* p = taosMemoryRealloc(pResultInfo->decompressBuf, payloadLen); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tscError("failed to prepare the decompress buffer, size:%d", payloadLen); + return terrno; + } + + pResultInfo->decompressBuf = p; + } + + int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen, + ONE_STAGE_COMP, NULL, 0); + ASSERT(len == payloadLen); + + pResultInfo->pData = pResultInfo->decompressBuf; + pResultInfo->payloadLen = payloadLen; + } else { + pResultInfo->pData = (void*)pRsp->data; + pResultInfo->payloadLen = htonl(pRsp->compLen); + ASSERT(pRsp->compLen == pRsp->payloadLen); + } + // TODO handle the compressed case pResultInfo->totalRows += pResultInfo->numOfRows; - return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, - convertUcs4); + + int32_t code = + setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4); + return code; } TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 360a346fb7..3e178c876f 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -511,6 +511,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { (*pRsp)->precision = 0; (*pRsp)->compressed = 0; (*pRsp)->compLen = 0; + (*pRsp)->payloadLen = 0; (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); @@ -623,6 +624,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr (*pRsp)->precision = 0; (*pRsp)->compressed = 0; (*pRsp)->compLen = 0; + (*pRsp)->payloadLen = 0; (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index d19b4c6913..8f3263cbeb 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -37,6 +37,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe (*pRsp)->precision = 0; (*pRsp)->compressed = 0; (*pRsp)->compLen = 0; + (*pRsp)->payloadLen = 0; (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(numOfCols); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index abe566473f..b9a586c52c 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -63,12 +63,14 @@ typedef struct SDataDispatchHandle { static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { int32_t numOfCols = 0; SNode* pNode; + FOREACH(pNode, pHandle->pSchema->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { ++numOfCols; } } + SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = 0; pEntry->numOfRows = pInput->pData->info.rows; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 06dd43e170..72d0af1726 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -410,6 +410,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; pRsp->numOfRows = htobe64(pRsp->numOfRows); pRsp->compLen = htonl(pRsp->compLen); + pRsp->payloadLen = htonl(pRsp->payloadLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); @@ -674,6 +675,16 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa char* pStart = pRetrieveRsp->data; int32_t index = 0; int32_t code = 0; + char* p = NULL; + + if (pRetrieveRsp->compressed) { // decompress the data + p = taosMemoryMalloc(pRetrieveRsp->payloadLen); + int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen, + ONE_STAGE_COMP, NULL, 0); + ASSERT(t == pRetrieveRsp->payloadLen); + pStart = p; + } + while (index++ < pRetrieveRsp->numOfBlocks) { SSDataBlock* pb = NULL; if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) { @@ -692,6 +703,7 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } + taosMemoryFreeClear(p); return code; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 594bb205d8..39fdcdee6f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -904,6 +904,25 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + + { + if (dataLen > 8192) { + char* p = taosMemoryMalloc(dataLen); + int32_t len = + tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0); + + memcpy(((SRetrieveTableRsp*)rsp)->data, p, len); + ((SRetrieveTableRsp*)rsp)->payloadLen = htonl(dataLen); + ((SRetrieveTableRsp*)rsp)->compLen = htonl(len); + + ((SRetrieveTableRsp*)rsp)->compressed = 1; + taosMemoryFree(p); + } else { + ((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen; + ((SRetrieveTableRsp*)rsp)->compressed = 0; + } + } + if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); }