From 43bec6c00f4df76b0c9ff51257325c8161f304d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 May 2024 09:47:31 +0800 Subject: [PATCH] fix(query): fix error in refactor. --- source/client/src/clientImpl.c | 16 ++++++++---- source/client/test/clientTests.cpp | 10 +++---- source/libs/executor/src/dataDispatcher.c | 3 +-- source/libs/executor/src/exchangeoperator.c | 26 ++++++++++++++---- source/libs/qworker/src/qworker.c | 29 ++++++++++----------- 5 files changed, 52 insertions(+), 32 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7c5aeeb375..bc5bffdb48 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2218,15 +2218,21 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR pResultInfo->decompBufSize = payloadLen; } } + } - int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompBuf, payloadLen, - ONE_STAGE_COMP, NULL, 0); - ASSERT(len == payloadLen); + int32_t compLen = *(int32_t*)pRsp->data; + int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t)); + + char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2; + + if (pRsp->compressed && compLen < rawLen) { + int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0); + ASSERT(len == rawLen); pResultInfo->pData = pResultInfo->decompBuf; - pResultInfo->payloadLen = payloadLen; + pResultInfo->payloadLen = rawLen; } else { - pResultInfo->pData = (void*)pRsp->data; + pResultInfo->pData = pStart; pResultInfo->payloadLen = htonl(pRsp->compLen); ASSERT(pRsp->compLen == pRsp->payloadLen); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index a22716df59..61a97f2c24 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -901,13 +901,13 @@ TEST(clientCase, tsbs_perf_test) { } TEST(clientCase, projection_query_stables) { - TAOS* pConn = taos_connect("192.168.1.53", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use qzdata"); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "select * from `QZ_212_DYS_02`"); + pRes = taos_query(pConn, "select * from st2"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -924,8 +924,8 @@ TEST(clientCase, projection_query_stables) { char str[512] = {0}; while (1) { - int32_t c = taos_fetch_block_s(pRes, &numOfRows, &pRow); - if (numOfRows <= 0) { + pRow = taos_fetch_row(pRes); + if (pRow == NULL) { break; } i += numOfRows; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index eb6a5d9f9e..ea65db7779 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -77,10 +77,9 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfCols = numOfCols; pEntry->dataLen = 0; + pEntry->rawLen = 0; pBuf->useSize = sizeof(SDataCacheEntry); - // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); - // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); { if (pBuf->allocSize > 16384) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index dff92eb52d..4c2976ed94 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -677,7 +677,9 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) { SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - char* pStart = pRetrieveRsp->data; + char* pNextStart = pRetrieveRsp->data; + char* pStart = pNextStart; + int32_t index = 0; int32_t code = 0; @@ -694,14 +696,13 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa } } } - int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, pDataInfo->decompBuf, - pRetrieveRsp->payloadLen, ONE_STAGE_COMP, NULL, 0); - ASSERT(t == pRetrieveRsp->payloadLen); - pStart = pDataInfo->decompBuf; } + while (index++ < pRetrieveRsp->numOfBlocks) { SSDataBlock* pb = NULL; + pStart = pNextStart; + if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) { pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks); blockDataCleanup(pb); @@ -709,6 +710,21 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); } + int32_t compLen = *(int32_t*) pStart; + pStart += sizeof(int32_t); + + int32_t rawLen = *(int32_t*) pStart; + pStart += sizeof(int32_t); + ASSERT(compLen <= rawLen && compLen != 0); + + if (pRetrieveRsp->compressed && (compLen < rawLen)) { + int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0); + ASSERT(t == rawLen); + + pNextStart = pStart + compLen; + pStart = pDataInfo->decompBuf; + } + code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 89b42873de..b799a72cce 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -289,7 +289,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, SOutputData *pOutput) { int64_t len = 0; int64_t rawLen = 0; - SRetrieveTableRsp *rsp = NULL; + SRetrieveTableRsp *pRsp = NULL; bool queryEnd = false; int32_t code = 0; SOutputData output = {0}; @@ -325,32 +325,36 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); } - if (NULL == rsp) { - QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); + if (NULL == pRsp) { + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp)); *pOutput = output; } else { pOutput->queryEnd = output.queryEnd; pOutput->bufStatus = output.bufStatus; pOutput->useconds = output.useconds; } - break; } pOutput->bufStatus = DS_BUF_EMPTY; - break; } // Got data from sink QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len); - *dataLen += len; - *pRawDataLen += rawLen; + *dataLen += len + sizeof(int32_t) * 2; + *pRawDataLen += rawLen + sizeof(int32_t) * 2; - QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp)); + + // set the serialize start position + output.pData = pRsp->data + *dataLen - (len + sizeof(int32_t) * 2); + + ((int32_t*) output.pData)[0] = len; + ((int32_t*) output.pData)[1] = rawLen; + output.pData += sizeof(int32_t) * 2; - output.pData = rsp->data + *dataLen - len; code = dsGetDataBlock(ctx->sinkHandle, &output); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); @@ -386,8 +390,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } } - *rspMsg = rsp; - + *rspMsg = pRsp; return TSDB_CODE_SUCCESS; } @@ -1106,7 +1109,6 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); - qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); if (code) { @@ -1133,7 +1135,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { } SQWSchStatus *sch = NULL; - int32_t taskNum = 0; SQWHbInfo *rspList = NULL; SArray *pExpiredSch = NULL; int32_t code = 0; @@ -1166,8 +1167,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { return; } - void *key = NULL; - size_t keyLen = 0; int32_t i = 0; int64_t currentMs = taosGetTimestampMs();