From 6e1fe0ae38cdf5296c2c1819adc9bf72c682d4d6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 May 2024 23:30:35 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/src/dataSinkMgt.c | 3 ++- source/libs/executor/src/executor.c | 2 +- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qworker.c | 8 ++++---- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 16d0d8f340..e711ffdf5c 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -18,13 +18,14 @@ #include "planner.h" #include "tarray.h" -SDataSinkStat gDataSinkStat = {0}; +SDataSinkStat gDataSinkStat = {0}; int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) { SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager)); if (NULL == pSinkManager) { return TSDB_CODE_OUT_OF_MEMORY; } + pSinkManager->cfg = *cfg; pSinkManager->pAPI = pAPI; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 29f667cb66..889a9b62e1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -537,7 +537,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } if (handle) { - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 5000, .maxDataBlockNumPerQuery = 500}; void* pSinkManager = NULL; code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index b4bd1943c5..2222f9cb31 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -35,7 +35,7 @@ extern "C" { #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 -#define QW_MIN_RES_ROWS 4096 +#define QW_MIN_RES_ROWS 16384 enum { QW_PHASE_PRE_QUERY = 1, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2a24deafd2..3fee7d94b9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -378,11 +378,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, break; } -// if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { -// QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, -// pOutput->numOfRows); + if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, + pOutput->numOfRows); break; -// } + } } *rspMsg = rsp;