From 27d223356a1df0a11a0c2d56801505e5c4923c3b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 4 Aug 2022 20:41:30 +0800 Subject: [PATCH] enh: merge multiple blocks in one rsp msg --- include/common/tmsg.h | 1 + include/libs/executor/dataSinkMgt.h | 1 + source/libs/qworker/inc/qwInt.h | 2 + source/libs/qworker/src/qwMsg.c | 9 ++- source/libs/qworker/src/qworker.c | 114 ++++++++++++++++++---------- 5 files changed, 83 insertions(+), 44 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bfb80ec8f8..9d001c9534 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1364,6 +1364,7 @@ typedef struct { int8_t compressed; int8_t streamBlockType; int32_t compLen; + int32_t numOfBlocks; int32_t numOfRows; int32_t numOfCols; int64_t skey; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 47177dc11b..71105d88eb 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -67,6 +67,7 @@ typedef struct SInputData { } SInputData; typedef struct SOutputData { + int32_t numOfBlocks; int32_t numOfRows; int32_t numOfCols; int8_t compressed; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 8f036714c9..729ac474e4 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -35,6 +35,7 @@ extern "C" { #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 +#define QW_MIN_RES_ROWS 4096 enum { QW_PHASE_PRE_QUERY = 1, @@ -135,6 +136,7 @@ typedef struct SQWTaskCtx { int32_t msgType; int32_t fetchType; int32_t execId; + int32_t level; bool queryRsped; bool queryEnd; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index e8ffd98153..63a3c1bfea 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -12,14 +12,16 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize); + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); - + if (NULL == *rsp) { + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + } + *rsp = pRsp; return TSDB_CODE_SUCCESS; @@ -35,6 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) rsp->compLen = htonl(len); rsp->numOfRows = htonl(input->numOfRows); rsp->numOfCols = htonl(input->numOfCols); + rsp->numOfBlocks = htonl(input->numOfBlocks); } void qwFreeFetchRsp(void *msg) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 36d85f1f12..d93c07ce1e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -203,57 +203,88 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; int32_t code = 0; + SOutputData output = {0}; - dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); + *dataLen = 0; - if (len < 0) { - QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); - QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + while (true) { + dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); - if (len == 0) { - if (queryEnd) { - code = dsGetDataBlock(ctx->sinkHandle, pOutput); - if (code) { - QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); - QW_ERR_RET(code); - } - - QW_TASK_DLOG_E("no data in sink and query end"); - - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); - - *rspMsg = rsp; - *dataLen = 0; - return TSDB_CODE_SUCCESS; + if (len < 0) { + QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - pOutput->bufStatus = DS_BUF_EMPTY; + if (len == 0) { + if (queryEnd) { + code = dsGetDataBlock(ctx->sinkHandle, &output); + if (code) { + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_RET(code); + } - return TSDB_CODE_SUCCESS; + QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows); + + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + if (NULL == rsp) { + QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + *pOutput = output; + } else { + pOutput->queryEnd = output.queryEnd; + pOutput->bufStatus = output.bufStatus; + pOutput->useconds = output.useconds; + } + + break; + } + + pOutput->bufStatus = DS_BUF_EMPTY; + + break; + } + + // Got data from sink + QW_TASK_DLOG("there are data in sink, dataLength:%d", len); + + *dataLen += len; + + QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp)); + + output.pData = rsp->data + *dataLen - len; + code = dsGetDataBlock(ctx->sinkHandle, &output); + if (code) { + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_RET(code); + } + + pOutput->queryEnd = output.queryEnd; + pOutput->precision = output.precision; + pOutput->bufStatus = output.bufStatus; + pOutput->useconds = output.useconds; + pOutput->compressed = output.compressed; + pOutput->numOfCols = output.numOfCols; + pOutput->numOfRows += output.numOfRows; + pOutput->numOfBlocks++; + + if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { + QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + break; + } + + if (0 == ctx->level) { + QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); + break; + } + + if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { + QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); + break; + } } - // Got data from sink - QW_TASK_DLOG("there are data in sink, dataLength:%d", len); - - *dataLen = len; - - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; - pOutput->pData = rsp->data; - code = dsGetDataBlock(ctx->sinkHandle, pOutput); - if (code) { - QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); - QW_ERR_RET(code); - } - - if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { - QW_TASK_DLOG_E("task all data fetched, done"); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); - } - return TSDB_CODE_SUCCESS; } @@ -551,6 +582,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { // queryRsped = true; + ctx->level = plan->level; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);