From 27c4e3634cc8cf84f860f96659339e8258f27882 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 May 2024 17:04:05 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/client/src/clientImpl.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 1 - source/libs/executor/src/exchangeoperator.c | 2 +- source/libs/qworker/src/qworker.c | 20 ++++++++++---------- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 52a776f66b..6f1c3fccd7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2224,7 +2224,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR } else { pResultInfo->pData = (void*)pRsp->data; pResultInfo->payloadLen = htonl(pRsp->compLen); -// ASSERT(pRsp->compLen == pRsp->payloadLen); + ASSERT(pRsp->compLen == pRsp->payloadLen); } // TODO handle the compressed case diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 76dbfd8a45..381e040c5b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4242,7 +4242,6 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); int32_t capacity = pConf->tsdbCfg.maxRows; - capacity = 16384; // for debug purpose if (pResBlock != NULL) { blockDataEnsureCapacity(pResBlock, capacity); } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index a563897122..72d0af1726 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -681,7 +681,7 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa p = taosMemoryMalloc(pRetrieveRsp->payloadLen); int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen, ONE_STAGE_COMP, NULL, 0); -// ASSERT(t == pRetrieveRsp->payloadLen); + ASSERT(t == pRetrieveRsp->payloadLen); pStart = p; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3fee7d94b9..ca03a61023 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -914,20 +914,21 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); { - if (/*dataLen > 8192*/ 0) { + SRetrieveTableRsp* pRsp = rsp; + + if (dataLen > 8192) { char* p = taosMemoryMalloc(dataLen); - int32_t len = - tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0); - memcpy(((SRetrieveTableRsp*)rsp)->data, p, len); - ((SRetrieveTableRsp*)rsp)->payloadLen = htonl(dataLen); - ((SRetrieveTableRsp*)rsp)->compLen = htonl(len); + int32_t len = tsCompressString(pRsp->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0); + memcpy(pRsp->data, p, len); - ((SRetrieveTableRsp*)rsp)->compressed = 1; + pRsp->payloadLen = htonl(dataLen); + pRsp->compLen = htonl(len); + pRsp->compressed = 1; taosMemoryFree(p); } else { -// ((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen; -// ((SRetrieveTableRsp*)rsp)->compressed = 0; + pRsp->payloadLen = pRsp->compLen; + pRsp->compressed = 0; } } @@ -949,7 +950,6 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); - atomic_store_8((int8_t *)&ctx->queryInQueue, 1); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));