enh: remove hb and query rsp immediately
This commit is contained in:
parent
b809fd4821
commit
0ad85dc0e5
|
@ -138,6 +138,7 @@ typedef struct SQWTaskCtx {
|
|||
int32_t execId;
|
||||
int32_t level;
|
||||
|
||||
bool queryGotData;
|
||||
bool queryRsped;
|
||||
bool queryEnd;
|
||||
bool queryContinue;
|
||||
|
|
|
@ -471,8 +471,10 @@ _return:
|
|||
|
||||
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
|
||||
ctx->queryGotData = true;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (QW_PHASE_POST_QUERY == phase && ctx) {
|
||||
ctx->queryRsped = true;
|
||||
|
||||
|
@ -485,6 +487,7 @@ _return:
|
|||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ctx) {
|
||||
QW_UPDATE_RSP_CODE(ctx, code);
|
||||
|
@ -531,6 +534,15 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
_return:
|
||||
|
||||
#if 1
|
||||
qwBuildAndSendQueryRsp(qwMsg->msgType + 1, &qwMsg->connInfo, code, ctx);
|
||||
if (ctx) {
|
||||
ctx->queryRsped = true;
|
||||
ctx->phase = -1;
|
||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
}
|
||||
#endif
|
||||
|
||||
if (ctx) {
|
||||
QW_UPDATE_RSP_CODE(ctx, code);
|
||||
qwReleaseTaskCtx(mgmt, ctx);
|
||||
|
@ -600,10 +612,31 @@ _return:
|
|||
input.msgType = qwMsg->msgType;
|
||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||
|
||||
// if (!queryRsped) {
|
||||
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
|
||||
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
//}
|
||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
SOutputData sOutput = {0};
|
||||
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||
|
||||
if (rsp) {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
||||
tstrerror(code), dataLen);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
@ -726,7 +759,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
locked = true;
|
||||
|
||||
// RC WARNING
|
||||
if (QW_QUERY_RUNNING(ctx)) {
|
||||
if (-1 == ctx->phase || false == ctx->queryGotData) {
|
||||
QW_TASK_DLOG_E("task query unfinished");
|
||||
} else if (QW_QUERY_RUNNING(ctx)) {
|
||||
atomic_store_8((int8_t *)&ctx->queryContinue, 1);
|
||||
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
|
||||
|
|
|
@ -867,9 +867,11 @@ int32_t schLaunchTaskImpl(void *param) {
|
|||
|
||||
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
|
||||
|
||||
#if 0
|
||||
if (SCH_IS_QUERY_JOB(pJob)) {
|
||||
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
|
||||
}
|
||||
#endif
|
||||
|
||||
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
|
||||
|
||||
|
|
Loading…
Reference in New Issue