feature/qnode

This commit is contained in:
dapan1121 2022-01-20 10:36:06 +08:00
parent c3bf03cc15
commit b9033a7501
2 changed files with 40 additions and 8 deletions

View File

@ -105,6 +105,7 @@ typedef struct SQWTaskCtx {
SRWLatch lock; SRWLatch lock;
int8_t phase; int8_t phase;
bool emptyRes;
int8_t queryContinue; int8_t queryContinue;
int8_t inQueue; int8_t inQueue;
int32_t rspCode; int32_t rspCode;

View File

@ -532,6 +532,20 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
if (ctx->emptyRes) {
QW_TASK_DLOG("query empty result, query end, phase:%d", ctx->phase);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
*dataLen = 0;
pOutput->queryEnd = true;
return TSDB_CODE_SUCCESS;
}
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len < 0) { if (len < 0) {
@ -644,6 +658,10 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
ctx->taskHandle = input->taskHandle; ctx->taskHandle = input->taskHandle;
ctx->sinkHandle = input->sinkHandle; ctx->sinkHandle = input->sinkHandle;
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true;
}
if (input->code) { if (input->code) {
QW_SET_RSP_CODE(ctx, input->code); QW_SET_RSP_CODE(ctx, input->code);
@ -885,12 +903,14 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, TSDB_CODE_SUCCESS)); QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code));
QW_TASK_DLOG("query msg rsped, code:%d", code);
queryRsped = true; queryRsped = true;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle)); if (pTaskInfo && sinkHandle) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle));
}
_return: _return:
if (code) { if (code) {
@ -899,6 +919,10 @@ _return:
if (!queryRsped) { if (!queryRsped) {
code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
if (TSDB_CODE_SUCCESS == code) {
QW_TASK_DLOG("query msg rsped, code:%d", rspCode);
}
if (TSDB_CODE_SUCCESS == rspCode && code) { if (TSDB_CODE_SUCCESS == rspCode && code) {
rspCode = code; rspCode = code;
} }
@ -921,7 +945,8 @@ _return:
QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
if (queryRsped && output.needRsp) { if (queryRsped && output.needRsp) {
qwBuildAndSendReadyRsp(qwMsg->connection, output.rspCode); qwBuildAndSendReadyRsp(qwMsg->connection, output.rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x", output.rspCode);
} }
QW_RET(rspCode); QW_RET(rspCode);
@ -940,9 +965,11 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
if (phase == QW_PHASE_PRE_QUERY) { if (phase == QW_PHASE_PRE_QUERY) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
QW_TASK_DLOG("ready msg not rsped, phase:%d", phase);
} else if (phase == QW_PHASE_POST_QUERY) { } else if (phase == QW_PHASE_POST_QUERY) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
QW_ERR_JRET(qwBuildAndSendReadyRsp(qwMsg->connection, ctx->rspCode)); QW_ERR_JRET(qwBuildAndSendReadyRsp(qwMsg->connection, ctx->rspCode));
QW_TASK_DLOG("ready msg rsped, code:%x", ctx->rspCode);
} else { } else {
QW_TASK_ELOG("invalid phase when got ready msg, phase:%d", phase); QW_TASK_ELOG("invalid phase when got ready msg, phase:%d", phase);
assert(0); assert(0);
@ -1009,7 +1036,8 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} else { } else {
atomic_store_8(&ctx->queryContinue, 1); atomic_store_8(&ctx->queryContinue, 1);
} }
@ -1022,6 +1050,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, 0);
} }
input.code = code; input.code = code;
@ -1072,7 +1101,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
qwBuildFetchRsp(rsp, &sOutput, dataLen); qwBuildFetchRsp(rsp, &sOutput, dataLen);
} }
if ((!sOutput.queryEnd) && (/* DS_BUF_LOW == sOutput.bufStatus || */ DS_BUF_EMPTY == sOutput.bufStatus)) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus);
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
@ -1103,12 +1132,14 @@ _return:
if (code) { if (code) {
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); dataLen = 0;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} else if (rsp) { } else if (rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} }
QW_RET(code); QW_RET(code);
} }