refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-17 17:04:05 +08:00
parent 15181eb14b
commit 27c4e3634c
4 changed files with 12 additions and 13 deletions

View File

@ -2224,7 +2224,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
} else { } else {
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->payloadLen = htonl(pRsp->compLen); pResultInfo->payloadLen = htonl(pRsp->compLen);
// ASSERT(pRsp->compLen == pRsp->payloadLen); ASSERT(pRsp->compLen == pRsp->payloadLen);
} }
// TODO handle the compressed case // TODO handle the compressed case

View File

@ -4242,7 +4242,6 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
int32_t capacity = pConf->tsdbCfg.maxRows; int32_t capacity = pConf->tsdbCfg.maxRows;
capacity = 16384; // for debug purpose
if (pResBlock != NULL) { if (pResBlock != NULL) {
blockDataEnsureCapacity(pResBlock, capacity); blockDataEnsureCapacity(pResBlock, capacity);
} }

View File

@ -681,7 +681,7 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
p = taosMemoryMalloc(pRetrieveRsp->payloadLen); p = taosMemoryMalloc(pRetrieveRsp->payloadLen);
int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen, int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, p, pRetrieveRsp->payloadLen,
ONE_STAGE_COMP, NULL, 0); ONE_STAGE_COMP, NULL, 0);
// ASSERT(t == pRetrieveRsp->payloadLen); ASSERT(t == pRetrieveRsp->payloadLen);
pStart = p; pStart = p;
} }

View File

@ -914,20 +914,21 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
{ {
if (/*dataLen > 8192*/ 0) { SRetrieveTableRsp* pRsp = rsp;
if (dataLen > 8192) {
char* p = taosMemoryMalloc(dataLen); char* p = taosMemoryMalloc(dataLen);
int32_t len =
tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0);
memcpy(((SRetrieveTableRsp*)rsp)->data, p, len); int32_t len = tsCompressString(pRsp->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0);
((SRetrieveTableRsp*)rsp)->payloadLen = htonl(dataLen); memcpy(pRsp->data, p, len);
((SRetrieveTableRsp*)rsp)->compLen = htonl(len);
((SRetrieveTableRsp*)rsp)->compressed = 1; pRsp->payloadLen = htonl(dataLen);
pRsp->compLen = htonl(len);
pRsp->compressed = 1;
taosMemoryFree(p); taosMemoryFree(p);
} else { } else {
// ((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen; pRsp->payloadLen = pRsp->compLen;
// ((SRetrieveTableRsp*)rsp)->compressed = 0; pRsp->compressed = 0;
} }
} }
@ -949,7 +950,6 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1); atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1); atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));