diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b5c0a84063..29b9e39fb2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2498,6 +2498,7 @@ typedef struct SSubQueryMsg { int8_t taskType; int8_t explain; int8_t needFetch; + int8_t compress; uint32_t sqlLen; char* sql; uint32_t msgLen; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index a052bc3359..130de8c030 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -55,6 +55,7 @@ typedef struct SDataSinkStat { } SDataSinkStat; typedef struct SDataSinkMgtCfg { + int8_t compress; uint32_t maxDataBlockNum; // todo: this should be numOfRows? uint32_t maxDataBlockNumPerQuery; } SDataSinkMgtCfg; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fe20639c77..2774fce33b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -139,8 +139,9 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam); * @param qId * @return */ -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model); +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pSubplan, + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql, + EOPTR_EXEC_MODEL model); /** * diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index bbd2d76b59..ac27534ab0 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -61,6 +61,7 @@ typedef struct SQWMsgInfo { int8_t taskType; int8_t explain; int8_t needFetch; + int8_t compressMsg; } SQWMsgInfo; typedef struct SQWMsg { diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 952af3c443..1e2ccf8705 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -48,16 +48,6 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct STaskInfo { - SQueryNodeAddr addr; - SSubQueryMsg* msg; -} STaskInfo; - -typedef struct SSchdFetchParam { - void** pData; - int32_t* code; -} SSchdFetchParam; - typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code); typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code); typedef bool (*schedulerChkKillFp)(void* param); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 21dd930c8f..21d9729c48 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -122,7 +122,6 @@ struct SAppInstInfo { typedef struct SAppInfo { int64_t startTime; char appName[TSDB_APP_NAME_LEN]; - char* ep; int32_t pid; int32_t numOfThreads; SHashObj* pInstMap; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 77294e7a34..0c13265ad2 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -757,7 +757,6 @@ void taos_init_imp(void) { clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); - // transDestroyBuffer(&conn->readBuf); taosGetAppName(appInfo.appName, NULL); taosThreadMutexInit(&appInfo.mutex, NULL); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b7d1417451..778f149a58 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6963,6 +6963,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { if (tEncodeI8(&encoder, pReq->taskType) < 0) return -1; if (tEncodeI8(&encoder, pReq->explain) < 0) return -1; if (tEncodeI8(&encoder, pReq->needFetch) < 0) return -1; + if (tEncodeI8(&encoder, pReq->compress) < 0) return -1; if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1; @@ -7003,6 +7004,7 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) if (tDecodeI8(&decoder, &pReq->taskType) < 0) return -1; if (tDecodeI8(&decoder, &pReq->explain) < 0) return -1; if (tDecodeI8(&decoder, &pReq->needFetch) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->compress) < 0) return -1; if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1; diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index a294d7e487..01bc762a1d 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -25,7 +25,6 @@ extern "C" { #include "storageapi.h" #include "tcommon.h" -struct SDataSink; struct SDataSinkHandle; typedef struct SDataSinkManager { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 5058c652ce..51784b345b 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -82,7 +82,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pBuf->useSize = sizeof(SDataCacheEntry); { - if (pBuf->allocSize > 16384) { + if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) { char* p = taosMemoryMalloc(pBuf->allocSize); int32_t dataLen = blockEncode(pInput->pData, p, numOfCols); @@ -295,6 +295,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD terrno = TSDB_CODE_OUT_OF_MEMORY; goto _return; } + *pHandle = dispatcher; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2f360044c9..390eee2203 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -286,7 +286,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); + code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE); if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); @@ -322,7 +322,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); @@ -524,7 +524,8 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) { } int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) { + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql, + EOPTR_EXEC_MODEL model) { SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; taosThreadOnce(&initPoolOnce, initRefPool); @@ -537,7 +538,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } if (handle) { - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult}; void* pSinkManager = NULL; code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index ef9ef42c85..527df395cd 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -41,7 +41,6 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawData rsp->numOfRows = htobe64(input->numOfRows); rsp->numOfCols = htonl(input->numOfCols); rsp->numOfBlocks = htonl(input->numOfBlocks); - ASSERT(rawDataLen != 100446); } void qwFreeFetchRsp(void *msg) { @@ -156,6 +155,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve .info = *pConn, }; + rpcRsp.info.compressed = pRsp->compressed; tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; @@ -454,13 +454,15 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int qwMsg.msgInfo.explain = msg.explain; qwMsg.msgInfo.taskType = msg.taskType; qwMsg.msgInfo.needFetch = msg.needFetch; + qwMsg.msgInfo.compressMsg = msg.compress; + + QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), + msg.compress, pMsg->info.handle, msg.sql); - QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), - pMsg->info.handle, msg.sql); code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql); msg.sql = NULL; - QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code); + QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code); tFreeSSubQueryMsg(&msg); return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 93c5628b71..c181997ad2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -369,6 +369,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (output.compressed) { pOutput->compressed = output.compressed; } + pOutput->numOfCols = output.numOfCols; pOutput->numOfRows += output.numOfRows; pOutput->numOfBlocks++; @@ -723,7 +724,6 @@ _return: int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { int32_t code = 0; - bool queryRsped = false; SSubplan *plan = NULL; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; @@ -740,8 +740,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->queryMsgType = qwMsg->msgType; ctx->localExec = false; - // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); - code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; @@ -749,14 +747,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } -#if 0 - SReadHandle* pReadHandle = qwMsg->node; - int64_t delay = 0; - bool fhFinish = false; - pReadHandle->api.tqReaderFn.tqGetStreamExecProgress(pReadHandle->vnode, 0, &delay, &fhFinish); -#endif - - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); sql = NULL; if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -768,8 +759,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_APP_ERROR); } - //qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); - ctx->level = plan->level; ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo); @@ -964,6 +953,7 @@ _return: qwDbgSimulateRedirect(qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); } + if (!rsped) { qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), @@ -1235,7 +1225,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -1437,7 +1427,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; - code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 4b8695e8ae..d0dc04d6b4 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -20,6 +20,9 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "tglobal.h" +#include "tmisce.h" + // clang-format off int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = pTask->lastMsgType; @@ -1127,6 +1130,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.msgLen = pTask->msgLen; qMsg.msg = pTask->msg; + if (strcmp(tsLocalFqdn, GET_ACTIVE_EP(&addr->epSet)->fqdn) == 0) { + qMsg.compress = 0; + } else { + qMsg.compress = 1; + } + msgSize = tSerializeSSubQueryMsg(NULL, 0, &qMsg); if (msgSize < 0) { SCH_TASK_ELOG("tSerializeSSubQueryMsg get size, msgSize:%d", msgSize);