fix(query): fix error.
This commit is contained in:
parent
a2692dbba4
commit
e7aa0ca177
|
@ -83,10 +83,10 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
||||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||||
|
|
||||||
{
|
{
|
||||||
if (/*pBuf->allocSize > 8192*/ 0) {
|
if (pBuf->allocSize > 16384) {
|
||||||
char* p = taosMemoryMalloc(pBuf->allocSize);
|
char* p = taosMemoryMalloc(pBuf->allocSize);
|
||||||
int32_t dataLen = blockEncode(pInput->pData, p, numOfCols);
|
|
||||||
|
|
||||||
|
int32_t dataLen = blockEncode(pInput->pData, p, numOfCols);
|
||||||
int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
|
int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
|
||||||
|
|
||||||
pEntry->compressed = 1;
|
pEntry->compressed = 1;
|
||||||
|
|
|
@ -41,6 +41,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawData
|
||||||
rsp->numOfRows = htobe64(input->numOfRows);
|
rsp->numOfRows = htobe64(input->numOfRows);
|
||||||
rsp->numOfCols = htonl(input->numOfCols);
|
rsp->numOfCols = htonl(input->numOfCols);
|
||||||
rsp->numOfBlocks = htonl(input->numOfBlocks);
|
rsp->numOfBlocks = htonl(input->numOfBlocks);
|
||||||
|
ASSERT(rawDataLen != 100446);
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwFreeFetchRsp(void *msg) {
|
void qwFreeFetchRsp(void *msg) {
|
||||||
|
|
|
@ -300,6 +300,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
}
|
}
|
||||||
|
|
||||||
*dataLen = 0;
|
*dataLen = 0;
|
||||||
|
*pRawDataLen = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
|
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
|
||||||
|
@ -912,26 +913,6 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
|
||||||
|
|
||||||
{
|
|
||||||
SRetrieveTableRsp* pRsp = rsp;
|
|
||||||
|
|
||||||
if (dataLen > 8192) {
|
|
||||||
char* p = taosMemoryMalloc(dataLen);
|
|
||||||
|
|
||||||
int32_t len = tsCompressString(pRsp->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0);
|
|
||||||
memcpy(pRsp->data, p, len);
|
|
||||||
|
|
||||||
pRsp->payloadLen = htonl(dataLen);
|
|
||||||
pRsp->compLen = htonl(len);
|
|
||||||
pRsp->compressed = 1;
|
|
||||||
taosMemoryFree(p);
|
|
||||||
} else {
|
|
||||||
pRsp->payloadLen = pRsp->compLen;
|
|
||||||
pRsp->compressed = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (qComplete) {
|
if (qComplete) {
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue