diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index e2126f1fc4..eb6a5d9f9e 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -83,10 +83,10 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); { - if (/*pBuf->allocSize > 8192*/ 0) { - char* p = taosMemoryMalloc(pBuf->allocSize); - int32_t dataLen = blockEncode(pInput->pData, p, numOfCols); + if (pBuf->allocSize > 16384) { + char* p = taosMemoryMalloc(pBuf->allocSize); + int32_t dataLen = blockEncode(pInput->pData, p, numOfCols); int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0); pEntry->compressed = 1; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index e2a99a11e7..ef9ef42c85 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -41,6 +41,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawData rsp->numOfRows = htobe64(input->numOfRows); rsp->numOfCols = htonl(input->numOfCols); rsp->numOfBlocks = htonl(input->numOfBlocks); + ASSERT(rawDataLen != 100446); } void qwFreeFetchRsp(void *msg) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ca03a61023..89b42873de 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -300,6 +300,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } *dataLen = 0; + *pRawDataLen = 0; while (true) { dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); @@ -912,26 +913,6 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); - - { - SRetrieveTableRsp* pRsp = rsp; - - if (dataLen > 8192) { - char* p = taosMemoryMalloc(dataLen); - - int32_t len = tsCompressString(pRsp->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0); - memcpy(pRsp->data, p, len); - - pRsp->payloadLen = htonl(dataLen); - pRsp->compLen = htonl(len); - pRsp->compressed = 1; - taosMemoryFree(p); - } else { - pRsp->payloadLen = pRsp->compLen; - pRsp->compressed = 0; - } - } - if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); }