fix(query): check for the server ep and client ep to set the compress flag.

This commit is contained in:
Haojun Liao 2024-06-04 15:07:02 +08:00
parent 5a91039f04
commit 6d7e1eb576
14 changed files with 35 additions and 39 deletions

View File

@ -2498,6 +2498,7 @@ typedef struct SSubQueryMsg {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int8_t compress;
uint32_t sqlLen;
char* sql;
uint32_t msgLen;

View File

@ -55,6 +55,7 @@ typedef struct SDataSinkStat {
} SDataSinkStat;
typedef struct SDataSinkMgtCfg {
int8_t compress;
uint32_t maxDataBlockNum; // todo: this should be numOfRows?
uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg;

View File

@ -139,8 +139,9 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam);
* @param qId
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model);
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
EOPTR_EXEC_MODEL model);
/**
*

View File

@ -61,6 +61,7 @@ typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int8_t compressMsg;
} SQWMsgInfo;
typedef struct SQWMsg {

View File

@ -48,16 +48,6 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
typedef struct STaskInfo {
SQueryNodeAddr addr;
SSubQueryMsg* msg;
} STaskInfo;
typedef struct SSchdFetchParam {
void** pData;
int32_t* code;
} SSchdFetchParam;
typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code);
typedef bool (*schedulerChkKillFp)(void* param);

View File

@ -122,7 +122,6 @@ struct SAppInstInfo {
typedef struct SAppInfo {
int64_t startTime;
char appName[TSDB_APP_NAME_LEN];
char* ep;
int32_t pid;
int32_t numOfThreads;
SHashObj* pInstMap;

View File

@ -757,7 +757,6 @@ void taos_init_imp(void) {
clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
// transDestroyBuffer(&conn->readBuf);
taosGetAppName(appInfo.appName, NULL);
taosThreadMutexInit(&appInfo.mutex, NULL);

View File

@ -6963,6 +6963,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->explain) < 0) return -1;
if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1;
if (tEncodeI8(&encoder, pReq->compress) < 0) return -1;
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
@ -7003,6 +7004,7 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq)
if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->compress) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;

View File

@ -25,7 +25,6 @@ extern "C" {
#include "storageapi.h"
#include "tcommon.h"
struct SDataSink;
struct SDataSinkHandle;
typedef struct SDataSinkManager {

View File

@ -82,7 +82,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf->useSize = sizeof(SDataCacheEntry);
{
if (pBuf->allocSize > 16384) {
if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
char* p = taosMemoryMalloc(pBuf->allocSize);
int32_t dataLen = blockEncode(pInput->pData, p, numOfCols);
@ -295,6 +295,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;

View File

@ -286,7 +286,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
@ -322,7 +322,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo);
@ -524,7 +524,8 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
EOPTR_EXEC_MODEL model) {
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
@ -537,7 +538,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
}
if (handle) {
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
void* pSinkManager = NULL;
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -41,7 +41,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawData
rsp->numOfRows = htobe64(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols);
rsp->numOfBlocks = htonl(input->numOfBlocks);
ASSERT(rawDataLen != 100446);
}
void qwFreeFetchRsp(void *msg) {
@ -156,6 +155,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
.info = *pConn,
};
rpcRsp.info.compressed = pRsp->compressed;
tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS;
@ -454,13 +454,15 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
qwMsg.msgInfo.explain = msg.explain;
qwMsg.msgInfo.taskType = msg.taskType;
qwMsg.msgInfo.needFetch = msg.needFetch;
qwMsg.msgInfo.compressMsg = msg.compress;
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
msg.compress, pMsg->info.handle, msg.sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
pMsg->info.handle, msg.sql);
code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql);
msg.sql = NULL;
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
tFreeSSubQueryMsg(&msg);
return TSDB_CODE_SUCCESS;

View File

@ -369,6 +369,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (output.compressed) {
pOutput->compressed = output.compressed;
}
pOutput->numOfCols = output.numOfCols;
pOutput->numOfRows += output.numOfRows;
pOutput->numOfBlocks++;
@ -723,7 +724,6 @@ _return:
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
@ -740,8 +740,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false;
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
@ -749,14 +747,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code);
}
#if 0
SReadHandle* pReadHandle = qwMsg->node;
int64_t delay = 0;
bool fhFinish = false;
pReadHandle->api.tqReaderFn.tqGetStreamExecProgress(pReadHandle->vnode, 0, &delay, &fhFinish);
#endif
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
sql = NULL;
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
@ -768,8 +759,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
}
//qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
ctx->level = plan->level;
ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo);
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
@ -964,6 +953,7 @@ _return:
qwDbgSimulateRedirect(qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
}
if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
@ -1235,7 +1225,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
QW_ERR_JRET(code);
}
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
@ -1437,7 +1427,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);

View File

@ -20,6 +20,9 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "tglobal.h"
#include "tmisce.h"
// clang-format off
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = pTask->lastMsgType;
@ -1127,6 +1130,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
qMsg.msgLen = pTask->msgLen;
qMsg.msg = pTask->msg;
if (strcmp(tsLocalFqdn, GET_ACTIVE_EP(&addr->epSet)->fqdn) == 0) {
qMsg.compress = 0;
} else {
qMsg.compress = 1;
}
msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg);
if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);