enh: merge multiple blocks in one rsp msg

This commit is contained in:
dapan1121 2022-08-04 20:41:30 +08:00
parent 3a5fcefc8d
commit 27d223356a
5 changed files with 83 additions and 44 deletions

View File

@ -1364,6 +1364,7 @@ typedef struct {
int8_t compressed; int8_t compressed;
int8_t streamBlockType; int8_t streamBlockType;
int32_t compLen; int32_t compLen;
int32_t numOfBlocks;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
int64_t skey; int64_t skey;

View File

@ -67,6 +67,7 @@ typedef struct SInputData {
} SInputData; } SInputData;
typedef struct SOutputData { typedef struct SOutputData {
int32_t numOfBlocks;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
int8_t compressed; int8_t compressed;

View File

@ -35,6 +35,7 @@ extern "C" {
#define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000 #define QW_SCH_TIMEOUT_MSEC 180000
#define QW_MIN_RES_ROWS 4096
enum { enum {
QW_PHASE_PRE_QUERY = 1, QW_PHASE_PRE_QUERY = 1,
@ -135,6 +136,7 @@ typedef struct SQWTaskCtx {
int32_t msgType; int32_t msgType;
int32_t fetchType; int32_t fetchType;
int32_t execId; int32_t execId;
int32_t level;
bool queryRsped; bool queryRsped;
bool queryEnd; bool queryEnd;

View File

@ -12,13 +12,15 @@
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length; int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize);
if (NULL == pRsp) { if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize); qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (NULL == *rsp) {
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
}
*rsp = pRsp; *rsp = pRsp;
@ -35,6 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
rsp->numOfRows = htonl(input->numOfRows); rsp->numOfRows = htonl(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols); rsp->numOfCols = htonl(input->numOfCols);
rsp->numOfBlocks = htonl(input->numOfBlocks);
} }
void qwFreeFetchRsp(void *msg) { void qwFreeFetchRsp(void *msg) {

View File

@ -203,7 +203,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0};
*dataLen = 0;
while (true) {
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len < 0) { if (len < 0) {
@ -213,47 +217,74 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (len == 0) { if (len == 0) {
if (queryEnd) { if (queryEnd) {
code = dsGetDataBlock(ctx->sinkHandle, pOutput); code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG_E("no data in sink and query end"); QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
if (NULL == rsp) {
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds;
}
*rspMsg = rsp; break;
*dataLen = 0;
return TSDB_CODE_SUCCESS;
} }
pOutput->bufStatus = DS_BUF_EMPTY; pOutput->bufStatus = DS_BUF_EMPTY;
return TSDB_CODE_SUCCESS; break;
} }
// Got data from sink // Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%d", len); QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
*dataLen = len; *dataLen += len;
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp));
*rspMsg = rsp;
pOutput->pData = rsp->data; output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, pOutput); code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
} }
pOutput->queryEnd = output.queryEnd;
pOutput->precision = output.precision;
pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds;
pOutput->compressed = output.compressed;
pOutput->numOfCols = output.numOfCols;
pOutput->numOfRows += output.numOfRows;
pOutput->numOfBlocks++;
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG_E("task all data fetched, done"); QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
break;
} }
if (0 == ctx->level) {
QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
break;
}
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
break;
}
}
*rspMsg = rsp;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -551,6 +582,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
// queryRsped = true; // queryRsped = true;
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);