diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 2e2882e35e..a052bc3359 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -104,7 +104,7 @@ void dsReset(DataSinkHandle handle); * @param handle * @param pLen data length */ -void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd); +void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd); /** * Get data, the caller needs to allocate data memory. diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index bb55ae4437..833722d86f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2217,13 +2217,14 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen, ONE_STAGE_COMP, NULL, 0); - ASSERT(len == payloadLen); +// ASSERT(len == payloadLen); pResultInfo->pData = pResultInfo->decompressBuf; pResultInfo->payloadLen = payloadLen; } else { pResultInfo->pData = (void*)pRsp->data; pResultInfo->payloadLen = htonl(pRsp->compLen); + ASSERT(pRsp->compLen == pRsp->payloadLen); } // TODO handle the compressed case diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1498476634..b5710ffb99 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -821,32 +821,21 @@ TEST(clientCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); + TAOS_RES* pRes = NULL; + // TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); // if (taos_errno(pRes) != 0) { // printf("error in create db, reason:%s\n", taos_errstr(pRes)); // } // taos_free_result(pRes); - - TAOS_RES* pRes = taos_query(pConn, "alter local 'fqdn 127.0.0.1'"); - if (taos_errno(pRes) != 0) { - printf("failed to exec query, %s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1"); +/* + TAOS_RES* pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1"); // pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1"); if (taos_errno(pRes) != 0) { - printf("failed to exec query, %s\n", taos_errstr(pRes)); + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); } taos_free_result(pRes); -// pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); - pRes = taos_query(pConn, "create table tu using st2 tags(2)"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); @@ -876,27 +865,27 @@ TEST(clientCase, projection_query_tables) { for(int32_t j = 0; j < 1; ++j) { start += 20; for (int32_t i = 0; i < 1; ++i) { - createNewTable(pConn, i, 100, start, pstr); + createNewTable(pConn, i, 100000, 0, pstr); } } +*/ - // - // pRes = taos_query(pConn, "select * from tu"); - // if (taos_errno(pRes) != 0) { - // printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - // taos_free_result(pRes); - // ASSERT_TRUE(false); - // } - // - // TAOS_ROW pRow = NULL; - // TAOS_FIELD* pFields = taos_fetch_fields(pRes); - // int32_t numOfFields = taos_num_fields(pRes); - // - // char str[512] = {0}; - // while ((pRow = taos_fetch_row(pRes)) != NULL) { - // int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - // printf("%s\n", str); - // } + pRes = taos_query(pConn, "select * from abc1.st2"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); + } taos_free_result(pRes); taos_close(pConn); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 5afc629865..bde0b47489 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1957,6 +1957,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); rsp->compLen = htonl(len); + rsp->payloadLen = rsp->compLen; blockDataDestroy(pBlock); diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index dcebd2c6fd..a294d7e487 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -36,7 +36,7 @@ typedef struct SDataSinkManager { typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FReset)(struct SDataSinkHandle* pHandle); -typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd); +typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 960ae14fcf..15288c4406 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -154,7 +154,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { taosThreadMutexUnlock(&pDeleter->mutex); } -static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { +static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; if (taosQueueEmpty(pDeleter->pDataBlocks)) { *pQueryEnd = pDeleter->queryEnd; @@ -171,6 +171,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData; *pLen = pEntry->dataLen; + *pRawLen = pEntry->dataLen; + *pQueryEnd = pDeleter->queryEnd; qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); @@ -186,6 +188,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->queryEnd = pDeleter->queryEnd; return TSDB_CODE_SUCCESS; } + SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pDeleter->pParam->pUidList = NULL; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index b9a586c52c..2c127fba12 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -31,6 +31,7 @@ typedef struct SDataDispatchBuf { } SDataDispatchBuf; typedef struct SDataCacheEntry { + int32_t rawLen; int32_t dataLen; int32_t numOfRows; int32_t numOfCols; @@ -78,10 +79,26 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->dataLen = 0; pBuf->useSize = sizeof(SDataCacheEntry); - pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); + { + if (pBuf->allocSize > 8192) { + char* p = taosMemoryMalloc(pBuf->allocSize); + int32_t dataLen = blockEncode(pInput->pData, p, numOfCols); + + int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0); + + pEntry->compressed = 1; + pEntry->dataLen = len; + pEntry->rawLen = dataLen; + taosMemoryFree(p); + } else { + pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); + pEntry->rawLen = pEntry->dataLen; + } + } + pBuf->useSize += pEntry->dataLen; atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); @@ -165,7 +182,7 @@ static void resetDispatcher(struct SDataSinkHandle* pHandle) { taosThreadMutexUnlock(&pDispatcher->mutex); } -static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { +static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { *pQueryEnd = pDispatcher->queryEnd; @@ -182,9 +199,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData; *pLen = pEntry->dataLen; - - // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); - // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); + *pRowLen = pEntry->rawLen; *pQueryEnd = pDispatcher->queryEnd; qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, @@ -202,6 +217,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->queryEnd = pDispatcher->queryEnd; return TSDB_CODE_SUCCESS; } + SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 39bbc1bc69..5ba2f8bf42 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -370,7 +370,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { taosThreadMutexUnlock(&pInserter->mutex); } -static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { +static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) { SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle; *pLen = pDispatcher->submitRes.affectedRows; qDebug("got total affectedRows %" PRId64, *pLen); diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 2a59bbf1dc..16d0d8f340 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -75,9 +75,9 @@ void dsReset(DataSinkHandle handle) { } } -void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) { +void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); + pHandleImpl->fGetLen(pHandleImpl, pLen, pRawLen, pQueryEnd); } int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index ae68f69802..6d57a3df46 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -38,7 +38,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index faa90dcbf8..e2a99a11e7 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -29,13 +29,14 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r return TSDB_CODE_SUCCESS; } -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; rsp->useconds = htobe64(input->useconds); rsp->completed = qComplete; rsp->precision = input->precision; rsp->compressed = input->compressed; + rsp->payloadLen = htonl(rawDataLen); rsp->compLen = htonl(len); rsp->numOfRows = htobe64(input->numOfRows); rsp->numOfCols = htonl(input->numOfCols); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 39fdcdee6f..2a24deafd2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -102,7 +102,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } if (!ctx->needFetch) { - dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL); + dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL); } } @@ -285,8 +285,10 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) return TSDB_CODE_SUCCESS; } -int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { +int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg, + SOutputData *pOutput) { int64_t len = 0; + int64_t rawLen = 0; SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; int32_t code = 0; @@ -300,7 +302,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *dataLen = 0; while (true) { - dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); + dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); if (len < 0) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); @@ -343,6 +345,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len); *dataLen += len; + *pRawDataLen += rawLen; QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp)); @@ -375,11 +378,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, break; } - if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { - QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, - pOutput->numOfRows); +// if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { +// QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, +// pOutput->numOfRows); break; - } +// } } *rspMsg = rsp; @@ -389,11 +392,12 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) { int64_t len = 0; + int64_t rawLen = 0; bool queryEnd = false; int32_t code = 0; SOutputData output = {0}; - dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); + dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); if (len <= 0 || len != sizeof(SDeleterRes)) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); @@ -433,9 +437,10 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { void *rsp = NULL; int32_t dataLen = 0; + int32_t rawLen = 0; SOutputData sOutput = {0}; if (TSDB_CODE_SUCCESS == code) { - code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput); + code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput); } if (NULL == rsp && TSDB_CODE_SUCCESS == code) { @@ -445,7 +450,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i if (NULL != rsp) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); } @@ -791,6 +796,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWPhaseInput input = {0}; void *rsp = NULL; int32_t dataLen = 0; + int32_t rawLen = 0; bool queryStop = false; bool qComplete = false; @@ -810,7 +816,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; - QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput)); if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); @@ -821,7 +827,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (rsp) { qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryContinue, 0); @@ -878,6 +884,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; int32_t dataLen = 0; + int32_t rawDataLen = 0; + bool locked = false; SQWTaskCtx *ctx = NULL; void *rsp = NULL; @@ -896,17 +904,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } SOutputData sOutput = {0}; - QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawDataLen, &rsp, &sOutput)); if (NULL == rsp) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); { - if (dataLen > 8192) { + if (/*dataLen > 8192*/ 0) { char* p = taosMemoryMalloc(dataLen); int32_t len = tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0); @@ -918,8 +926,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ((SRetrieveTableRsp*)rsp)->compressed = 1; taosMemoryFree(p); } else { - ((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen; - ((SRetrieveTableRsp*)rsp)->compressed = 0; +// ((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen; +// ((SRetrieveTableRsp*)rsp)->compressed = 0; } } @@ -1484,6 +1492,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 SQWorker *mgmt = (SQWorker *)pMgmt; int32_t code = 0; int32_t dataLen = 0; + int32_t rawLen = 0; SQWTaskCtx *ctx = NULL; void *rsp = NULL; bool queryStop = false; @@ -1491,7 +1500,6 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 SQWPhaseInput input = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH; @@ -1500,7 +1508,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 SOutputData sOutput = {0}; while (true) { - QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput)); if (NULL == rsp) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); @@ -1509,7 +1517,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); }