enh: stop query process

This commit is contained in:
dapan1121 2022-06-26 15:13:22 +08:00
parent 4ad36f714d
commit b6c38a4f1b
13 changed files with 422 additions and 155 deletions

View File

@ -183,7 +183,7 @@ typedef struct SRequestSendRecvBody {
void* param;
SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG.
struct SQueryPlan* pDag; // the query dag, generated according to the sql statement.
int32_t subplanNum;
SReqResultInfo resInfo;
} SRequestSendRecvBody;
@ -300,6 +300,7 @@ void* createRequest(STscObj* pObj, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
int32_t removeRequest(int64_t rid);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);

View File

@ -119,7 +119,7 @@ void closeAllRequests(SHashObj *pRequests) {
while (pIter != NULL) {
int64_t *rid = pIter;
releaseRequest(*rid);
removeRequest(*rid);
pIter = taosHashIterate(pRequests, pIter);
}
@ -222,6 +222,12 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
}
}
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
static void doDestroyRequest(void *p) {
assert(p != NULL);
SRequestObj *pRequest = (SRequestObj *)p;
@ -239,7 +245,6 @@ static void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryPlan(pRequest->body.pDag);
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
@ -255,13 +260,9 @@ void destroyRequest(SRequestObj *pRequest) {
return;
}
taosRemoveRef(clientReqRefPool, pRequest->self);
removeRequest(pRequest->self);
}
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.

View File

@ -320,7 +320,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
desc.reqRid = pRequest->self;
desc.stableQuery = pRequest->stableQuery;
taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
desc.subPlanNum = pRequest->body.subplanNum;
if (desc.subPlanNum) {
desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));

View File

@ -876,13 +876,17 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
break;
case QUERY_EXEC_MODE_SCHEDULE: {
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList);
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
taosArrayDestroy(pNodeList);
SQueryPlan* pDag = NULL;
code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
if (TSDB_CODE_SUCCESS == code) {
pRequest->body.subplanNum = pDag->numOfSubplans;
if (!pRequest->validateOnly) {
SArray* pNodeList = NULL;
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
code = scheduleQuery(pRequest, pDag, pNodeList);
taosArrayDestroy(pNodeList);
}
}
taosArrayDestroy(pMnodeList);
break;
@ -959,10 +963,13 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pMnodeList);
SQueryPlan* pDag = NULL;
code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
if (code) {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else {
pRequest->body.subplanNum = pDag->numOfSubplans;
}
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
@ -973,7 +980,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pRequest->body.pDag,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = schedulerExecCb,
@ -2026,6 +2033,7 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
releaseTscObj(*(int64_t *)taos);
fp(param, NULL, terrno);
return;
@ -2035,6 +2043,7 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
releaseTscObj(*(int64_t *)taos);
fp(param, NULL, terrno);
return;
}
@ -2043,6 +2052,7 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void
pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
releaseTscObj(*(int64_t *)taos);
}

View File

@ -1536,8 +1536,6 @@ void ctgClearAllInstance(void) {
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
taosHashClear(gCtgMgmt.pCluster);
}
void ctgFreeAllInstance(void) {

View File

@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.lockEnable = true, .apiEnable = true};
SCtgDebug gCTGDebug = {0};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);

View File

@ -1757,7 +1757,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
}
int32_t code = TSDB_CODE_SUCCESS;
SSchema* pSchema = pDataBlock->pTableMeta->schema;
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
bool isJson = false;
STag* pTag = NULL;

View File

@ -212,7 +212,7 @@ typedef struct SSchJob {
SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
SQueryPlan *pDag;
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx;
@ -334,13 +334,13 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void* pTrans);
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void* pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchJob *pJob);
void schFreeFlowCtrl(SSchJob *pJob);
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
@ -351,38 +351,40 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void schFreeJobImpl(void *job);
void schFreeJobImpl(void *job);
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code);
void schFreeRpcCtx(SRpcCtx *pCtx);
void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void schProcessOnDataFetched(SSchJob *job);
void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void schFreeRpcCtxVal(const void *arg);
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bool sync);
int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync);
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
void schCloseJobRef(void);
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
#ifdef __cplusplus

View File

@ -42,7 +42,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
return TSDB_CODE_SUCCESS;
}
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, bool syncSchedule) {
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
@ -54,12 +54,14 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql;
pJob->pDag = pReq->pDag;
pJob->reqKilled = pReq->reqKilled;
pJob->userRes.queryRes = pRes;
pJob->userRes.execFp = pReq->fp;
pJob->userRes.userParam = pReq->cbParam;
if (pReq->pNodeList != NULL) {
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
} else {
pJob->nodeList = taosArrayDup(pReq->pNodeList);
}
@ -547,8 +549,6 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pJob->levelNum = levelNum;
pJob->levelIdx = levelNum - 1;
pJob->subPlans = pDag->pSubplans;
SSchLevel level = {0};
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
@ -1491,8 +1491,6 @@ void schFreeJobImpl(void *job) {
schDropJobAllTasks(pJob);
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
@ -1521,6 +1519,8 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag);
taosMemoryFreeClear(pJob->userRes.queryRes);
taosMemoryFreeClear(pJob->resData);
taosMemoryFree(pJob);
@ -1533,88 +1533,11 @@ void schFreeJobImpl(void *job) {
}
}
int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bool sync) {
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
}
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob, pRes, sync));
qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
*job = pJob->refId;
SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_EXEC, sync));
code = schLaunchJob(pJob);
if (sync) {
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
schEndOperation(pJob);
} else if (code) {
schPostJobRes(pJob, SCH_OP_EXEC);
}
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
schReleaseJob(pJob->refId);
SCH_RET(code);
_return:
if (!sync) {
pReq->fp(NULL, pReq->cbParam, code);
}
schReleaseJob(pJob->refId);
SCH_RET(code);
}
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
SCH_ERR_JRET(schExecStaticExplainJob(pReq, pJob, true));
} else {
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, NULL, true));
}
_return:
if (*pJob) {
SSchJob *job = schAcquireJob(*pJob);
schSetJobQueryRes(job, pRes);
schReleaseJob(*pJob);
}
return code;
}
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
SCH_RET(schExecStaticExplainJob(pReq, pJob, false));
}
SCH_ERR_RET(schExecJobImpl(pReq, pJob, NULL, false));
return code;
}
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
int32_t schLaunchStaticExplainJob(SSchedulerReq *pReq, SSchJob *pJob, bool sync) {
qDebug("QID:0x%" PRIx64 " job started", pReq->pDag->queryId);
int32_t code = 0;
/*
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
@ -1625,10 +1548,10 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
pJob->sql = pReq->sql;
pJob->reqKilled = pReq->reqKilled;
pJob->pDag = pReq->pDag;
pJob->attr.queryJob = true;
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->queryId = pReq->pDag->queryId;
pJob->subPlans = pReq->pDag->pSubplans;
pJob->userRes.execFp = pReq->fp;
pJob->userRes.userParam = pReq->cbParam;
@ -1637,11 +1560,14 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
code = schBeginOperation(pJob, SCH_OP_EXEC, sync);
if (code) {
pReq->fp(NULL, pReq->cbParam, code);
schFreeJobImpl(pJob);
SCH_ERR_RET(code);
}
*/
SCH_ERR_JRET(qExecStaticExplain(pReq->pDag, (SRetrieveTableRsp **)&pJob->resData));
/*
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
@ -1656,10 +1582,10 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
pJob->refId = refId;
SCH_JOB_DLOG("job refId:0x%" PRIx64, pJob->refId);
*/
pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
*job = pJob->refId;
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
if (!sync) {
@ -1668,7 +1594,7 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
schEndOperation(pJob);
}
schReleaseJob(pJob->refId);
// schReleaseJob(pJob->refId);
SCH_RET(code);
@ -1715,3 +1641,39 @@ int32_t schAsyncFetchRows(SSchJob *pJob) {
}
int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync) {
int32_t code = 0;
qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_EXEC, sync));
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
code = schLaunchStaticExplainJob(pReq, pJob, true);
} else {
code = schLaunchJob(pJob);
if (sync) {
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
schEndOperation(pJob);
} else if (code) {
schPostJobRes(pJob, SCH_OP_EXEC);
}
}
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
SCH_RET(code);
_return:
if (!sync) {
pReq->fp(NULL, pReq->cbParam, code);
}
SCH_RET(code);
}

View File

@ -67,33 +67,57 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) {
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId, SQueryResult *pRes) {
qDebug("scheduler sync exec job start");
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob));
*pJobId = pJob->refId;
if (NULL == pReq || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_RET(schExecJob(pReq, pJob, pRes));
}
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) {
qDebug("scheduler async exec job start");
int32_t code = 0;
if (NULL == pReq || NULL == pJob) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
schAsyncExecJob(pReq, pJob);
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, true));
_return:
if (code && NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
}
if (pJob) {
schSetJobQueryRes(pJob, pRes);
schReleaseJob(pJob->refId);
}
return code;
}
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler async exec job start");
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob));
*pJobId = pJob->refId;
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, false));
_return:
if (code && NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
}
if (pJob) {
schReleaseJob(pJob->refId);
}
if (code != TSDB_CODE_SUCCESS) {
pReq->fp(NULL, pReq->cbParam, code);
}
SCH_RET(code);
return code;
}
int32_t schedulerFetchRows(int64_t job, void **pData) {

View File

@ -915,7 +915,7 @@ int32_t prepareInsertData(BindData *data) {
data->colNum = 0;
data->colTypes = taosMemoryCalloc(30, sizeof(int32_t));
data->sql = taosMemoryCalloc(1, 1024);
data->pBind = taosMemoryCalloc((allRowNum/gCurCase->bindRowNum)*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->pBind = taosMemoryCalloc((int32_t)(allRowNum/gCurCase->bindRowNum)*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->pTags = taosMemoryCalloc(gCurCase->tblNum*gCurCase->bindTagNum, sizeof(TAOS_MULTI_BIND));
data->tsData = taosMemoryMalloc(allRowNum * sizeof(int64_t));
data->boolData = taosMemoryMalloc(allRowNum * sizeof(bool));
@ -932,7 +932,7 @@ int32_t prepareInsertData(BindData *data) {
data->binaryData = taosMemoryMalloc(allRowNum * gVarCharSize);
data->binaryLen = taosMemoryMalloc(allRowNum * sizeof(int32_t));
if (gCurCase->bindNullNum) {
data->isNull = taosMemoryCalloc(allRowNum, sizeof(char));
data->isNull = taosMemoryCalloc((int32_t)allRowNum, sizeof(char));
}
for (int32_t i = 0; i < allRowNum; ++i) {
@ -950,7 +950,7 @@ int32_t prepareInsertData(BindData *data) {
data->doubleData[i] = (double)(i+1);
memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen);
if (gCurCase->bindNullNum) {
data->isNull[i] = i % 2;
data->isNull[i] = (char)(i % 2);
}
data->binaryLen[i] = gVarCharLen;
}
@ -979,7 +979,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) {
data->colNum = 0;
data->colTypes = taosMemoryCalloc(30, sizeof(int32_t));
data->sql = taosMemoryCalloc(1, 1024);
data->pBind = taosMemoryCalloc(bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->pBind = taosMemoryCalloc((int32_t)bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->tsData = taosMemoryMalloc(bindNum * sizeof(int64_t));
data->boolData = taosMemoryMalloc(bindNum * sizeof(bool));
data->tinyData = taosMemoryMalloc(bindNum * sizeof(int8_t));
@ -995,7 +995,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) {
data->binaryData = taosMemoryMalloc(bindNum * gVarCharSize);
data->binaryLen = taosMemoryMalloc(bindNum * sizeof(int32_t));
if (gCurCase->bindNullNum) {
data->isNull = taosMemoryCalloc(bindNum, sizeof(char));
data->isNull = taosMemoryCalloc((int32_t)bindNum, sizeof(char));
}
for (int32_t i = 0; i < bindNum; ++i) {
@ -1013,7 +1013,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) {
data->doubleData[i] = (double)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen);
if (gCurCase->bindNullNum) {
data->isNull[i] = i % 2;
data->isNull[i] = (char)(i % 2);
}
data->binaryLen[i] = gVarCharLen;
}
@ -1036,7 +1036,7 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
data->colNum = 0;
data->colTypes = taosMemoryCalloc(30, sizeof(int32_t));
data->sql = taosMemoryCalloc(1, 1024);
data->pBind = taosMemoryCalloc(bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->pBind = taosMemoryCalloc((int32_t)bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND));
data->tsData = taosMemoryMalloc(bindNum * sizeof(int64_t));
data->boolData = taosMemoryMalloc(bindNum * sizeof(bool));
data->tinyData = taosMemoryMalloc(bindNum * sizeof(int8_t));
@ -1052,7 +1052,7 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
data->binaryData = taosMemoryMalloc(bindNum * gVarCharSize);
data->binaryLen = taosMemoryMalloc(bindNum * sizeof(int32_t));
if (gCurCase->bindNullNum) {
data->isNull = taosMemoryCalloc(bindNum, sizeof(char));
data->isNull = taosMemoryCalloc((int32_t)bindNum, sizeof(char));
}
for (int32_t i = 0; i < bindNum; ++i) {
@ -1070,7 +1070,7 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
data->doubleData[i] = (double)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum);
memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen);
if (gCurCase->bindNullNum) {
data->isNull[i] = i % 2;
data->isNull[i] = (char)(i % 2);
}
data->binaryLen[i] = gVarCharLen;
}
@ -1279,7 +1279,7 @@ void bpCheckQueryResult(TAOS_STMT *stmt, TAOS *taos, char *stmtSql, TAOS_MULTI_B
}
memcpy(&sql[len], p, (int64_t)s - (int64_t)p);
len += (int64_t)s - (int64_t)p;
len += (int32_t)((int64_t)s - (int64_t)p);
if (bind[i].is_null && bind[i].is_null[0]) {
bpAppendValueString(sql, TSDB_DATA_TYPE_NULL, NULL, 0, &len);
@ -2669,7 +2669,7 @@ int main(int argc, char *argv[])
{
TAOS *taos = NULL;
srand(time(NULL));
srand((unsigned int)time(NULL));
// connect to server
if (argc < 2) {

View File

@ -12,6 +12,7 @@ all: $(TARGET)
exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS)
clean:
rm $(ROOT)batchprepare

View File

@ -0,0 +1,268 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS asynchronous API example
// this example opens multiple tables, insert/retrieve multiple tables
// it is used by TAOS internally for one performance testing
// to compiple: gcc -o asyncdemo asyncdemo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include "taos.h"
int points = 5;
int numOfTables = 3;
int tablesInsertProcessed = 0;
int tablesSelectProcessed = 0;
int64_t st, et;
char hostName[128];
char dbName[128];
char tbName[128];
char runTimes = 1;
typedef struct {
int id;
TAOS *taos;
char name[16];
time_t timeStamp;
int value;
int rowsInserted;
int rowsTried;
int rowsRetrieved;
} STable;
typedef struct SSP_CB_PARAM {
bool fetch;
int32_t *end;
} SSP_CB_PARAM;
#define CASE_ENTER() do { printf("enter case %s\n", __FUNCTION__); } while (0)
#define CASE_LEAVE() do { printf("leave case %s, runTimes %d\n", __FUNCTION__, runTimes); } while (0)
static void sqExecSQL(TAOS *taos, char *command) {
int i;
int32_t code = -1;
TAOS_RES *pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
taos_cleanup();
exit(EXIT_FAILURE);
}
taos_free_result(pSql);
}
void sqExit(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg);
exit(1);
}
void sqStopFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
taos_stop_query(pRes);
taos_free_result(pRes);
*qParam->end = 1;
}
void sqStopQueryCb(void *param, TAOS_RES *pRes, int code) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (code == 0 && pRes) {
if (qParam->fetch) {
taos_fetch_rows_a(pRes, sqStopFetchCb, param);
} else {
taos_stop_query(pRes);
taos_free_result(pRes);
*qParam->end = 1;
}
} else {
sqExit("select", taos_errstr(pRes));
}
}
void sqFreeFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
taos_free_result(pRes);
*qParam->end = 1;
}
void sqFreeQueryCb(void *param, TAOS_RES *pRes, int code) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (code == 0 && pRes) {
if (qParam->fetch) {
taos_fetch_rows_a(pRes, sqFreeFetchCb, param);
} else {
taos_free_result(pRes);
*qParam->end = 1;
}
} else {
sqExit("select", taos_errstr(pRes));
}
}
int sqSyncStopQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
TAOS_RES* pRes = taos_query(taos, sql);
code = taos_errno(pRes);
if (code) {
sqExit("taos_query", taos_errstr(pRes));
}
if (fetch) {
taos_fetch_row(pRes);
}
taos_stop_query(pRes);
taos_free_result(pRes);
taos_close(taos);
}
CASE_LEAVE();
}
int sqAsyncStopQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
int32_t qEnd = 0;
SSP_CB_PARAM param = {0};
param.fetch = fetch;
param.end = &qEnd;
taos_query_a(taos, sql, sqStopQueryCb, &param);
while (0 == qEnd) {
usleep(5000);
}
taos_close(taos);
}
CASE_LEAVE();
}
int sqSyncFreeQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
TAOS_RES* pRes = taos_query(taos, sql);
code = taos_errno(pRes);
if (code) {
sqExit("taos_query", taos_errstr(pRes));
}
if (fetch) {
taos_fetch_row(pRes);
}
taos_free_result(pRes);
taos_close(taos);
}
CASE_LEAVE();
}
int sqAsyncFreeQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
int32_t qEnd = 0;
SSP_CB_PARAM param = {0};
param.fetch = fetch;
param.end = &qEnd;
taos_query_a(taos, sql, sqFreeQueryCb, &param);
while (0 == qEnd) {
usleep(5000);
}
taos_close(taos);
}
CASE_LEAVE();
}
void sqRunAllCase(void) {
sqSyncStopQuery(false);
sqSyncStopQuery(true);
sqAsyncStopQuery(false);
sqAsyncStopQuery(true);
sqSyncFreeQuery(false);
sqSyncFreeQuery(true);
sqAsyncFreeQuery(false);
sqAsyncFreeQuery(true);
}
int main(int argc, char *argv[]) {
if (argc != 4) {
printf("usage: %s server-ip dbname tablename\n", argv[0]);
exit(0);
}
strcpy(hostName, argv[1]);
strcpy(dbName, argv[2]);
strcpy(tbName, argv[3]);
sqRunAllCase();
return 0;
}