fix(query): fix error in refactor.

This commit is contained in:
Haojun Liao 2024-05-18 09:47:31 +08:00
parent e7aa0ca177
commit 43bec6c00f
5 changed files with 52 additions and 32 deletions

View File

@ -2218,15 +2218,21 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo->decompBufSize = payloadLen;
}
}
}
int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompBuf, payloadLen,
ONE_STAGE_COMP, NULL, 0);
ASSERT(len == payloadLen);
int32_t compLen = *(int32_t*)pRsp->data;
int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
if (pRsp->compressed && compLen < rawLen) {
int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(len == rawLen);
pResultInfo->pData = pResultInfo->decompBuf;
pResultInfo->payloadLen = payloadLen;
pResultInfo->payloadLen = rawLen;
} else {
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->pData = pStart;
pResultInfo->payloadLen = htonl(pRsp->compLen);
ASSERT(pRsp->compLen == pRsp->payloadLen);
}

View File

@ -901,13 +901,13 @@ TEST(clientCase, tsbs_perf_test) {
}
TEST(clientCase, projection_query_stables) {
TAOS* pConn = taos_connect("192.168.1.53", "root", "taosdata", NULL, 0);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use qzdata");
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select * from `QZ_212_DYS_02`");
pRes = taos_query(pConn, "select * from st2");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
@ -924,8 +924,8 @@ TEST(clientCase, projection_query_stables) {
char str[512] = {0};
while (1) {
int32_t c = taos_fetch_block_s(pRes, &numOfRows, &pRow);
if (numOfRows <= 0) {
pRow = taos_fetch_row(pRes);
if (pRow == NULL) {
break;
}
i += numOfRows;

View File

@ -77,10 +77,9 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->numOfCols = numOfCols;
pEntry->dataLen = 0;
pEntry->rawLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry);
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
{
if (pBuf->allocSize > 16384) {

View File

@ -677,7 +677,9 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
char* pStart = pRetrieveRsp->data;
char* pNextStart = pRetrieveRsp->data;
char* pStart = pNextStart;
int32_t index = 0;
int32_t code = 0;
@ -694,14 +696,13 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
}
}
}
int32_t t = tsDecompressString(pRetrieveRsp->data, pRetrieveRsp->compLen, 1, pDataInfo->decompBuf,
pRetrieveRsp->payloadLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(t == pRetrieveRsp->payloadLen);
pStart = pDataInfo->decompBuf;
}
while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = NULL;
pStart = pNextStart;
if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
blockDataCleanup(pb);
@ -709,6 +710,21 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
}
int32_t compLen = *(int32_t*) pStart;
pStart += sizeof(int32_t);
int32_t rawLen = *(int32_t*) pStart;
pStart += sizeof(int32_t);
ASSERT(compLen <= rawLen && compLen != 0);
if (pRetrieveRsp->compressed && (compLen < rawLen)) {
int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(t == rawLen);
pNextStart = pStart + compLen;
pStart = pDataInfo->decompBuf;
}
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);

View File

@ -289,7 +289,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
SOutputData *pOutput) {
int64_t len = 0;
int64_t rawLen = 0;
SRetrieveTableRsp *rsp = NULL;
SRetrieveTableRsp *pRsp = NULL;
bool queryEnd = false;
int32_t code = 0;
SOutputData output = {0};
@ -325,32 +325,36 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask);
}
if (NULL == rsp) {
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
if (NULL == pRsp) {
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds;
}
break;
}
pOutput->bufStatus = DS_BUF_EMPTY;
break;
}
// Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len);
*dataLen += len;
*pRawDataLen += rawLen;
*dataLen += len + sizeof(int32_t) * 2;
*pRawDataLen += rawLen + sizeof(int32_t) * 2;
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
// set the serialize start position
output.pData = pRsp->data + *dataLen - (len + sizeof(int32_t) * 2);
((int32_t*) output.pData)[0] = len;
((int32_t*) output.pData)[1] = rawLen;
output.pData += sizeof(int32_t) * 2;
output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
@ -386,8 +390,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
}
}
*rspMsg = rsp;
*rspMsg = pRsp;
return TSDB_CODE_SUCCESS;
}
@ -1106,7 +1109,6 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) {
@ -1133,7 +1135,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
}
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
SQWHbInfo *rspList = NULL;
SArray *pExpiredSch = NULL;
int32_t code = 0;
@ -1166,8 +1167,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
return;
}
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
int64_t currentMs = taosGetTimestampMs();