feature/qnode

This commit is contained in:
dapan1121 2022-04-09 18:12:15 +08:00
parent 3295507835
commit c33f31b6d5
4 changed files with 15 additions and 12 deletions

View File

@ -52,14 +52,12 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
void *pIter = taosHashIterate(pCtx->groupHash, NULL); void *pIter = taosHashIterate(pCtx->groupHash, NULL);
while (pIter) { while (pIter) {
SExplainGroup *group = (SExplainGroup *)pIter; SExplainGroup *group = (SExplainGroup *)pIter;
if (NULL == group->nodeExecInfo) { if (group->nodeExecInfo) {
continue; int32_t num = taosArrayGetSize(group->nodeExecInfo);
} for (int32_t i = 0; i < num; ++i) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
int32_t num = taosArrayGetSize(group->nodeExecInfo); taosMemoryFreeClear(rsp->subplanInfo);
for (int32_t i = 0; i < num; ++i) { }
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
taosMemoryFreeClear(rsp->subplanInfo);
} }
pIter = taosHashIterate(pCtx->groupHash, pIter); pIter = taosHashIterate(pCtx->groupHash, pIter);

View File

@ -79,6 +79,7 @@ typedef struct SQWConnInfo {
typedef struct SQWMsg { typedef struct SQWMsg {
void *node; void *node;
int32_t code;
char *msg; char *msg;
int32_t msgLen; int32_t msgLen;
SQWConnInfo connInfo; SQWConnInfo connInfo;

View File

@ -536,6 +536,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
QW_TASK_DLOG_E("task is dropped");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1239,8 +1241,10 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code); if (0 == qwMsg->code) {
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true; rsped = true;
@ -1273,7 +1277,7 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (TSDB_CODE_SUCCESS != code) { if ((TSDB_CODE_SUCCESS != code) && (0 == qwMsg->code)) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }

View File

@ -549,7 +549,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
int64_t rId = msg->refId; int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;