enh: kill query
This commit is contained in:
parent
f421ab09dd
commit
a657413fa6
|
@ -153,7 +153,13 @@ void destroyAppInst(SAppInstInfo *pAppInfo) {
|
|||
}
|
||||
|
||||
void destroyTscObj(void *pObj) {
|
||||
if (NULL == pObj) {
|
||||
return;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = pObj;
|
||||
int64_t tscId = pTscObj->id;
|
||||
tscDebug("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
|
||||
|
||||
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
|
||||
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
|
||||
|
@ -168,6 +174,8 @@ void destroyTscObj(void *pObj) {
|
|||
}
|
||||
taosThreadMutexDestroy(&pTscObj->mutex);
|
||||
taosMemoryFreeClear(pTscObj);
|
||||
|
||||
tscDebug("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
|
||||
}
|
||||
|
||||
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
|
||||
|
@ -261,9 +269,14 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
|
|||
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
|
||||
|
||||
void doDestroyRequest(void *p) {
|
||||
assert(p != NULL);
|
||||
if (NULL == p) {
|
||||
return;
|
||||
}
|
||||
|
||||
SRequestObj *pRequest = (SRequestObj *)p;
|
||||
|
||||
int64_t reqId = pRequest->self;
|
||||
tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||
|
||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
|
@ -285,6 +298,8 @@ void doDestroyRequest(void *p) {
|
|||
deregisterRequest(pRequest);
|
||||
}
|
||||
taosMemoryFreeClear(pRequest);
|
||||
|
||||
tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||
}
|
||||
|
||||
void destroyRequest(SRequestObj *pRequest) {
|
||||
|
|
|
@ -1495,6 +1495,8 @@ void schFreeJobImpl(void *job) {
|
|||
uint64_t queryId = pJob->queryId;
|
||||
int64_t refId = pJob->refId;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
|
||||
|
||||
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
|
||||
schCancelJob(pJob);
|
||||
}
|
||||
|
@ -1535,12 +1537,12 @@ void schFreeJobImpl(void *job) {
|
|||
taosMemoryFreeClear(pJob->resData);
|
||||
taosMemoryFree(pJob);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
|
||||
|
||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
if (jobNum == 0) {
|
||||
schCloseJobRef();
|
||||
}
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
|
||||
}
|
||||
|
||||
int32_t schLaunchStaticExplainJob(SSchedulerReq *pReq, SSchJob *pJob, bool sync) {
|
||||
|
|
|
@ -156,6 +156,28 @@ void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) {
|
|||
}
|
||||
}
|
||||
|
||||
void sqKillFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
|
||||
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
|
||||
taos_kill_query(qParam->taos);
|
||||
|
||||
*qParam->end = 1;
|
||||
}
|
||||
|
||||
void sqKillQueryCb(void *param, TAOS_RES *pRes, int code) {
|
||||
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
|
||||
if (code == 0 && pRes) {
|
||||
if (qParam->fetch) {
|
||||
taos_fetch_rows_a(pRes, sqKillFetchCb, param);
|
||||
} else {
|
||||
taos_kill_query(qParam->taos);
|
||||
*qParam->end = 1;
|
||||
}
|
||||
} else {
|
||||
sqExit("select", taos_errstr(pRes));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int sqSyncStopQuery(bool fetch) {
|
||||
CASE_ENTER();
|
||||
for (int32_t i = 0; i < runTimes; ++i) {
|
||||
|
@ -391,6 +413,69 @@ int sqConSyncCloseQuery(bool fetch) {
|
|||
CASE_LEAVE();
|
||||
}
|
||||
|
||||
int sqSyncKillQuery(bool fetch) {
|
||||
CASE_ENTER();
|
||||
for (int32_t i = 0; i < runTimes; ++i) {
|
||||
char sql[1024] = {0};
|
||||
int32_t code = 0;
|
||||
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
|
||||
|
||||
sprintf(sql, "reset query cache");
|
||||
sqExecSQL(taos, sql);
|
||||
|
||||
sprintf(sql, "use %s", dbName);
|
||||
sqExecSQL(taos, sql);
|
||||
|
||||
sprintf(sql, "select * from %s", tbName);
|
||||
TAOS_RES* pRes = taos_query(taos, sql);
|
||||
code = taos_errno(pRes);
|
||||
if (code) {
|
||||
sqExit("taos_query", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
if (fetch) {
|
||||
taos_fetch_row(pRes);
|
||||
}
|
||||
|
||||
taos_kill_query(taos);
|
||||
|
||||
taos_close(taos);
|
||||
}
|
||||
CASE_LEAVE();
|
||||
}
|
||||
|
||||
int sqAsyncKillQuery(bool fetch) {
|
||||
CASE_ENTER();
|
||||
for (int32_t i = 0; i < runTimes; ++i) {
|
||||
char sql[1024] = {0};
|
||||
int32_t code = 0;
|
||||
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
|
||||
|
||||
sprintf(sql, "reset query cache");
|
||||
sqExecSQL(taos, sql);
|
||||
|
||||
sprintf(sql, "use %s", dbName);
|
||||
sqExecSQL(taos, sql);
|
||||
|
||||
sprintf(sql, "select * from %s", tbName);
|
||||
|
||||
int32_t qEnd = 0;
|
||||
SSP_CB_PARAM param = {0};
|
||||
param.fetch = fetch;
|
||||
param.end = &qEnd;
|
||||
taos_query_a(taos, sql, sqKillQueryCb, ¶m);
|
||||
while (0 == qEnd) {
|
||||
usleep(5000);
|
||||
}
|
||||
|
||||
taos_close(taos);
|
||||
}
|
||||
CASE_LEAVE();
|
||||
}
|
||||
|
||||
|
||||
void sqRunAllCase(void) {
|
||||
/*
|
||||
sqSyncStopQuery(false);
|
||||
|
@ -409,8 +494,14 @@ void sqRunAllCase(void) {
|
|||
sqAsyncCloseQuery(true);
|
||||
*/
|
||||
sqConSyncCloseQuery(false);
|
||||
/*
|
||||
sqConSyncCloseQuery(true);
|
||||
|
||||
sqSyncKillQuery(false);
|
||||
sqSyncKillQuery(true);
|
||||
sqAsyncKillQuery(false);
|
||||
sqAsyncKillQuery(true);
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue