enh: add taosd query exit processing

This commit is contained in:
dapan1121 2022-07-15 14:17:00 +08:00
parent 41ba297964
commit c67fa0c1ce
7 changed files with 27 additions and 16 deletions

View File

@ -135,7 +135,7 @@ void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
int64_t rpcAllocHandle(); void* rpcAllocHandle();
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -171,6 +171,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes; pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes; pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId; pTscObj->connId = pRsp->query->connId;
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes, pTscObj->pAppInfo->totalDnodes);
if (pRsp->query->killRid) { if (pRsp->query->killRid) {
tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid); tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
@ -286,7 +287,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
if (code != 0) { if (code != 0) {
(*pInst)->onlineDnodes = 0; (*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
} }
if (rspNum) { if (rspNum) {

View File

@ -362,7 +362,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF); int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
@ -372,7 +372,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch); void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwFreeTaskCtx(SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt); void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);

View File

@ -270,7 +270,7 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
@ -278,7 +278,7 @@ void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
} }
} }
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx) {
int32_t code = 0; int32_t code = 0;
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
@ -290,7 +290,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
QW_RET(code); QW_RET(code);
} }
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTaskCtx(SQWTaskCtx *ctx) {
if (ctx->ctrlConnInfo.handle) { if (ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
} }
@ -300,7 +300,7 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// NO need to release dataConnInfo // NO need to release dataConnInfo
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); qwFreeTaskHandle(&ctx->taskHandle);
if (ctx->sinkHandle) { if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle); dsDestroyDataSinker(ctx->sinkHandle);
@ -336,7 +336,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} }
qwFreeTaskCtx(QW_FPARAMS(), &octx); qwFreeTaskCtx(&octx);
QW_TASK_DLOG_E("task ctx dropped"); QW_TASK_DLOG_E("task ctx dropped");
@ -463,13 +463,21 @@ void qwDestroyImpl(void *pMgmt) {
mgmt->hbTimer = NULL; mgmt->hbTimer = NULL;
taosTmrCleanUp(mgmt->timer); taosTmrCleanUp(mgmt->timer);
// TODO STOP ALL QUERY uint64_t qId, tId;
int32_t eId;
// TODO FREE ALL void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
qwFreeTaskCtx(ctx);
QW_TASK_DLOG("task ctx freed");
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
taosHashCleanup(mgmt->ctxHash); taosHashCleanup(mgmt->ctxHash);
void *pIter = taosHashIterate(mgmt->schHash, NULL); pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) { while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch = (SQWSchStatus *)pIter;
qwDestroySchStatus(sch); qwDestroySchStatus(sch);

View File

@ -726,7 +726,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (QW_GET_PHASE(ctx) > 0) { } else if (QW_GET_PHASE(ctx) > 0) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
@ -940,7 +940,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
_return: _return:
qwFreeTaskCtx(QW_FPARAMS(), &ctx); qwFreeTaskCtx(&ctx);
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }

View File

@ -1010,6 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
persistHandle = true; persistHandle = true;
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
break; break;
} }
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:

View File

@ -170,7 +170,7 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr(thandle, ip, fqdn); transSetDefaultAddr(thandle, ip, fqdn);
} }
int64_t rpcAllocHandle() { return transAllocHandle(); } void* rpcAllocHandle() { return (void*)transAllocHandle(); }
int32_t rpcInit() { int32_t rpcInit() {
transInit(); transInit();