feature/qnode
This commit is contained in:
parent
3af9da8f51
commit
ff0200ae28
|
@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
|
||||||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
|
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the query job, generated according to the query physical plan.
|
* Process the query job, generated according to the query physical plan.
|
||||||
|
@ -80,7 +80,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
|
||||||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
|
int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch query result from the remote query executor
|
* Fetch query result from the remote query executor
|
||||||
|
@ -88,7 +88,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
|
||||||
* @param data
|
* @param data
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
|
int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -102,7 +102,7 @@ int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
|
||||||
* Free the query job
|
* Free the query job
|
||||||
* @param pJob
|
* @param pJob
|
||||||
*/
|
*/
|
||||||
void scheduleFreeJob(void *pJob);
|
void schedulerFreeJob(void *pJob);
|
||||||
|
|
||||||
void schedulerDestroy(void);
|
void schedulerDestroy(void);
|
||||||
|
|
||||||
|
|
|
@ -354,7 +354,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist")
|
#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist")
|
||||||
#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist")
|
#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist")
|
||||||
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
|
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
|
||||||
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
|
#define TSDB_CODE_QRY_TASK_CTX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task context not exist")
|
||||||
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
|
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
|
||||||
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
|
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
|
||||||
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
|
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
|
||||||
|
|
|
@ -242,12 +242,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
|
||||||
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
|
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
|
|
||||||
int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
|
int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// handle error and retry
|
// handle error and retry
|
||||||
} else {
|
} else {
|
||||||
if (pRequest->body.pQueryJob != NULL) {
|
if (pRequest->body.pQueryJob != NULL) {
|
||||||
scheduleFreeJob(pRequest->body.pQueryJob);
|
schedulerFreeJob(pRequest->body.pQueryJob);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
|
||||||
strcpy(addr.epAddr[0].fqdn, "localhost");
|
strcpy(addr.epAddr[0].fqdn, "localhost");
|
||||||
|
|
||||||
taosArrayPush(execNode, &addr);
|
taosArrayPush(execNode, &addr);
|
||||||
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
|
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct tmq_t tmq_t;
|
typedef struct tmq_t tmq_t;
|
||||||
|
@ -714,7 +714,7 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
|
int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
|
||||||
|
|
|
@ -892,7 +892,7 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
||||||
if (pVnode == NULL) return -1;
|
if (pVnode == NULL) return -1;
|
||||||
|
|
||||||
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false);
|
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, false);
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); }
|
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); }
|
||||||
|
|
||||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
vTrace("query message is processing");
|
vTrace("message in query queue is processing");
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_QUERY:
|
case TDMT_VND_QUERY:
|
||||||
|
@ -36,7 +36,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
vTrace("fetch message is processed");
|
vTrace("message in fetch queue is processing");
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_FETCH:
|
case TDMT_VND_FETCH:
|
||||||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
|
|
|
@ -22,17 +22,17 @@ extern "C" {
|
||||||
|
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
|
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
|
||||||
#define QWORKER_DEFAULT_TASK_NUMBER 10000
|
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||||
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
|
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
||||||
|
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||||
enum {
|
enum {
|
||||||
QW_PHASE_PRE_QUERY = 1,
|
QW_PHASE_PRE_QUERY = 1,
|
||||||
QW_PHASE_POST_QUERY,
|
QW_PHASE_POST_QUERY,
|
||||||
QW_PHASE_PRE_CQUERY,
|
|
||||||
QW_PHASE_POST_CQUERY,
|
|
||||||
QW_PHASE_PRE_FETCH,
|
QW_PHASE_PRE_FETCH,
|
||||||
QW_PHASE_POST_FETCH,
|
QW_PHASE_POST_FETCH,
|
||||||
|
QW_PHASE_PRE_CQUERY,
|
||||||
|
QW_PHASE_POST_CQUERY,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -133,7 +133,7 @@ typedef struct SQWorkerMgmt {
|
||||||
int8_t nodeType;
|
int8_t nodeType;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
SRWLatch schLock;
|
SRWLatch schLock;
|
||||||
SRWLatch ctxLock;
|
//SRWLatch ctxLock;
|
||||||
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
|
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
|
||||||
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
|
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
|
||||||
void *nodeObj;
|
void *nodeObj;
|
||||||
|
@ -144,6 +144,8 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_IDS() sId, qId, tId
|
#define QW_IDS() sId, qId, tId
|
||||||
#define QW_FPARAMS() mgmt, QW_IDS()
|
#define QW_FPARAMS() mgmt, QW_IDS()
|
||||||
|
|
||||||
|
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
|
||||||
|
|
||||||
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
|
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
|
||||||
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
|
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
|
||||||
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
|
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
|
||||||
|
@ -151,9 +153,10 @@ typedef struct SQWorkerMgmt {
|
||||||
|
|
||||||
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
|
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
|
||||||
|
|
||||||
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
|
||||||
|
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
||||||
|
|
||||||
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH)
|
#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
|
||||||
|
|
||||||
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
|
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
|
||||||
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
|
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
|
||||||
|
|
|
@ -30,6 +30,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
|
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
|
||||||
|
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
|
||||||
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
|
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
|
||||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
|
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
|
||||||
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
|
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
|
|
||||||
SQWDebug gQWDebug = {0};
|
SQWDebug gQWDebug = {0};
|
||||||
|
|
||||||
int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t oriStatus, int8_t newStatus) {
|
int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (oriStatus == newStatus) {
|
if (oriStatus == newStatus) {
|
||||||
|
@ -35,6 +35,7 @@ int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
|
||||||
break;
|
break;
|
||||||
case JOB_TASK_STATUS_EXECUTING:
|
case JOB_TASK_STATUS_EXECUTING:
|
||||||
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
|
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
|
||||||
|
&& newStatus != JOB_TASK_STATUS_SUCCEED
|
||||||
&& newStatus != JOB_TASK_STATUS_FAILED
|
&& newStatus != JOB_TASK_STATUS_FAILED
|
||||||
&& newStatus != JOB_TASK_STATUS_CANCELLING
|
&& newStatus != JOB_TASK_STATUS_CANCELLING
|
||||||
&& newStatus != JOB_TASK_STATUS_CANCELLED
|
&& newStatus != JOB_TASK_STATUS_CANCELLED
|
||||||
|
@ -77,7 +78,7 @@ _return:
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskStatus *task, int8_t status) {
|
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t origStatus = 0;
|
int8_t origStatus = 0;
|
||||||
|
|
||||||
|
@ -99,7 +100,7 @@ int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
|
int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
|
||||||
SQWSchStatus newSch = {0};
|
SQWSchStatus newSch = {0};
|
||||||
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
if (NULL == newSch.tasksHash) {
|
if (NULL == newSch.tasksHash) {
|
||||||
|
@ -125,7 +126,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
|
int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
|
||||||
while (true) {
|
while (true) {
|
||||||
QW_LOCK(rwType, &mgmt->schLock);
|
QW_LOCK(rwType, &mgmt->schLock);
|
||||||
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
|
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
|
||||||
|
@ -152,11 +153,11 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
|
int32_t qwAcquireAddScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
|
||||||
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD);
|
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
|
int32_t qwAcquireScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
|
||||||
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR);
|
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +166,7 @@ void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
|
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
|
||||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||||
QW_SET_QTID(id, qId, tId);
|
QW_SET_QTID(id, qId, tId);
|
||||||
|
|
||||||
|
@ -181,7 +182,7 @@ int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
|
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||||
|
@ -215,7 +216,7 @@ int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) {
|
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
|
||||||
SQWSchStatus *tsch = NULL;
|
SQWSchStatus *tsch = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch));
|
QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch));
|
||||||
|
@ -230,7 +231,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAddAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
|
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
|
||||||
return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
|
return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,7 +241,7 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
|
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||||
QW_SET_QTID(id, qId, tId);
|
QW_SET_QTID(id, qId, tId);
|
||||||
|
|
||||||
|
@ -249,26 +250,26 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
|
||||||
if (NULL == (*ctx)) {
|
if (NULL == (*ctx)) {
|
||||||
//QW_UNLOCK(rwType, &mgmt->ctxLock);
|
//QW_UNLOCK(rwType, &mgmt->ctxLock);
|
||||||
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
|
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
|
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||||
QW_SET_QTID(id, qId, tId);
|
QW_SET_QTID(id, qId, tId);
|
||||||
|
|
||||||
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||||
if (NULL == (*ctx)) {
|
if (NULL == (*ctx)) {
|
||||||
QW_TASK_ELOG("ctx not in ctxHash, ctxHashSize:%d", taosHashGetSize(mgmt->ctxHash));
|
QW_TASK_ELOG("ctx not in ctxHash, ctxHashSize:%d", taosHashGetSize(mgmt->ctxHash));
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool acquire, int32_t status, SQWTaskCtx **ctx) {
|
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCtx **ctx) {
|
||||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||||
QW_SET_QTID(id, qId, tId);
|
QW_SET_QTID(id, qId, tId);
|
||||||
|
|
||||||
|
@ -281,7 +282,7 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
|
||||||
|
|
||||||
if (HASH_NODE_EXIST(code)) {
|
if (HASH_NODE_EXIST(code)) {
|
||||||
if (acquire && ctx) {
|
if (acquire && ctx) {
|
||||||
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
|
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
|
||||||
} else if (ctx) {
|
} else if (ctx) {
|
||||||
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
||||||
} else {
|
} else {
|
||||||
|
@ -296,7 +297,7 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
|
||||||
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
|
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
|
||||||
|
|
||||||
if (acquire && ctx) {
|
if (acquire && ctx) {
|
||||||
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
|
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
|
||||||
} else if (ctx) {
|
} else if (ctx) {
|
||||||
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
||||||
}
|
}
|
||||||
|
@ -304,17 +305,17 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
|
||||||
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL));
|
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
|
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||||
return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx);
|
return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwAddGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
|
int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||||
return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx);
|
return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,14 +357,14 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
|
|
||||||
|
|
||||||
// Note: NEED CTX HASH LOCKED BEFORE ENTRANCE
|
// Note: NEED CTX HASH LOCKED BEFORE ENTRANCE
|
||||||
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType) {
|
int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) {
|
||||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||||
QW_SET_QTID(id, qId, tId);
|
QW_SET_QTID(id, qId, tId);
|
||||||
SQWTaskCtx octx;
|
SQWTaskCtx octx;
|
||||||
|
|
||||||
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||||
if (NULL == ctx) {
|
if (NULL == ctx) {
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
octx = *ctx;
|
octx = *ctx;
|
||||||
|
@ -371,13 +372,15 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
|
||||||
atomic_store_ptr(&ctx->taskHandle, NULL);
|
atomic_store_ptr(&ctx->taskHandle, NULL);
|
||||||
atomic_store_ptr(&ctx->sinkHandle, NULL);
|
atomic_store_ptr(&ctx->sinkHandle, NULL);
|
||||||
|
|
||||||
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
|
||||||
|
|
||||||
if (rwType) {
|
if (rwType) {
|
||||||
QW_UNLOCK(rwType, &ctx->lock);
|
QW_UNLOCK(rwType, &ctx->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
|
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
|
||||||
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
|
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (octx.taskHandle) {
|
if (octx.taskHandle) {
|
||||||
|
@ -394,7 +397,7 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
|
||||||
SQWSchStatus *sch = NULL;
|
SQWSchStatus *sch = NULL;
|
||||||
SQWTaskStatus *task = NULL;
|
SQWTaskStatus *task = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -429,7 +432,7 @@ _return:
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) {
|
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
|
||||||
SQWSchStatus *sch = NULL;
|
SQWSchStatus *sch = NULL;
|
||||||
SQWTaskStatus *task = NULL;
|
SQWTaskStatus *task = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -447,12 +450,13 @@ _return:
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool execOnce) {
|
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool shortRun) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool qcontinue = true;
|
bool qcontinue = true;
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
uint64_t useconds = 0;
|
uint64_t useconds = 0;
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
int32_t leftRun = QW_DEFAULT_SHORT_RUN_TIMES;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
|
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
|
||||||
|
@ -484,7 +488,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
|
||||||
|
|
||||||
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
|
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
|
||||||
|
|
||||||
if (execOnce || (!qcontinue)) {
|
if (!qcontinue) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shortRun && ((--leftRun) <= 0)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -576,29 +584,35 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
||||||
int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t status = 0;
|
int8_t status = 0;
|
||||||
SQWTaskCtx *ctx = NULL;
|
SQWTaskCtx *ctx = NULL;
|
||||||
bool locked = false;
|
bool locked = false;
|
||||||
void *readyConnection = NULL;
|
|
||||||
void *dropConnection = NULL;
|
void *dropConnection = NULL;
|
||||||
void *cancelConnection = NULL;
|
void *cancelConnection = NULL;
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
|
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
|
||||||
|
|
||||||
|
output->needStop = false;
|
||||||
|
|
||||||
|
if (QW_PHASE_PRE_QUERY == phase) {
|
||||||
|
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
} else {
|
||||||
|
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
}
|
||||||
|
|
||||||
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
locked = true;
|
||||||
|
|
||||||
|
atomic_store_32(&ctx->phase, phase);
|
||||||
|
|
||||||
switch (phase) {
|
switch (phase) {
|
||||||
case QW_PHASE_PRE_QUERY: {
|
case QW_PHASE_PRE_QUERY: {
|
||||||
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
|
||||||
|
|
||||||
atomic_store_32(&ctx->phase, phase);
|
|
||||||
atomic_store_8(&ctx->taskType, input->taskType);
|
atomic_store_8(&ctx->taskType, input->taskType);
|
||||||
|
|
||||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
QW_TASK_ELOG("task already cancelled at wrong phase, phase:%d", phase);
|
QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase);
|
||||||
|
|
||||||
output->needStop = true;
|
output->needStop = true;
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
|
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
|
||||||
|
@ -611,18 +625,27 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
|
||||||
|
|
||||||
output->needStop = true;
|
output->needStop = true;
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
||||||
|
QW_SET_RSP_CODE(ctx, output->rspCode);
|
||||||
dropConnection = ctx->dropConnection;
|
dropConnection = ctx->dropConnection;
|
||||||
|
|
||||||
// Note: ctx freed, no need to unlock it
|
// Note: ctx freed, no need to unlock it
|
||||||
locked = false;
|
locked = false;
|
||||||
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
||||||
output->needStop = true;
|
|
||||||
|
|
||||||
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
|
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
|
||||||
|
output->needStop = true;
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
||||||
|
QW_SET_RSP_CODE(ctx, output->rspCode);
|
||||||
|
|
||||||
|
cancelConnection = ctx->cancelConnection;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctx->rspCode) {
|
||||||
|
QW_TASK_ELOG("task already failed at wrong phase, code:%x, phase:%d", ctx->rspCode, phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = ctx->rspCode;
|
||||||
|
QW_ERR_JRET(output->rspCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!output->needStop) {
|
if (!output->needStop) {
|
||||||
|
@ -630,64 +653,55 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QW_PHASE_POST_QUERY: {
|
case QW_PHASE_PRE_FETCH: {
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
|
QW_TASK_WLOG("task already dropped, phase:%d", phase);
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
||||||
locked = true;
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||||
|
|
||||||
ctx->taskHandle = input->taskHandle;
|
|
||||||
ctx->sinkHandle = input->sinkHandle;
|
|
||||||
|
|
||||||
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
|
|
||||||
ctx->emptyRes = true;
|
|
||||||
}
|
}
|
||||||
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
||||||
if (input->code) {
|
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
||||||
output->rspCode = input->code;
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||||
|
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
|
||||||
output->needStop = true;
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
|
||||||
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||||
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
|
|
||||||
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
|
||||||
dropConnection = ctx->dropConnection;
|
|
||||||
|
|
||||||
// Note: ctx freed, no need to unlock it
|
|
||||||
locked = false;
|
|
||||||
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
||||||
|
QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase);
|
||||||
output->needStop = true;
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||||
qwFreeTask(QW_FPARAMS(), ctx);
|
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
|
|
||||||
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
|
||||||
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
|
|
||||||
readyConnection = ctx->readyConnection;
|
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!output->needStop) {
|
if (ctx->rspCode) {
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus));
|
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = ctx->rspCode;
|
||||||
|
QW_ERR_JRET(output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
|
QW_TASK_WLOG("last fetch not finished, phase:%d", phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION;
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
|
||||||
|
QW_TASK_ELOG("query rsp are not ready, phase:%d", phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QW_PHASE_PRE_FETCH: {
|
case QW_PHASE_PRE_CQUERY: {
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
|
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
|
||||||
|
|
||||||
locked = true;
|
|
||||||
|
|
||||||
atomic_store_32(&ctx->phase, phase);
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
||||||
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
||||||
output->needStop = true;
|
output->needStop = true;
|
||||||
|
@ -714,20 +728,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
|
||||||
QW_TASK_WLOG("last fetch not finished, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
|
|
||||||
QW_TASK_ELOG("query rsp are not ready, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ctx->rspCode) {
|
if (ctx->rspCode) {
|
||||||
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
|
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
|
||||||
output->needStop = true;
|
output->needStop = true;
|
||||||
|
@ -736,160 +736,156 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QW_PHASE_POST_FETCH: {
|
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
|
||||||
|
|
||||||
locked = true;
|
|
||||||
|
|
||||||
if (input->code) {
|
|
||||||
output->rspCode = input->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
|
||||||
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
|
||||||
QW_TASK_WLOG("start to drop task, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
|
|
||||||
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
|
|
||||||
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
|
|
||||||
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
|
||||||
dropConnection = ctx->dropConnection;
|
|
||||||
|
|
||||||
// Note: ctx freed, no need to unlock it
|
|
||||||
locked = false;
|
|
||||||
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
|
||||||
QW_TASK_WLOG("start to cancel task, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
|
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
|
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
|
|
||||||
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
|
||||||
cancelConnection = ctx->cancelConnection;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ctx->rspCode) {
|
|
||||||
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = ctx->rspCode;
|
|
||||||
QW_ERR_JRET(output->rspCode);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case QW_PHASE_PRE_CQUERY: {
|
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
|
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
|
||||||
|
|
||||||
locked = true;
|
|
||||||
|
|
||||||
atomic_store_32(&ctx->phase, phase);
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
|
||||||
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
|
||||||
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
|
||||||
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
|
||||||
QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ctx->rspCode) {
|
|
||||||
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = ctx->rspCode;
|
|
||||||
QW_ERR_JRET(output->rspCode);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case QW_PHASE_POST_CQUERY: {
|
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
|
||||||
|
|
||||||
locked = true;
|
|
||||||
|
|
||||||
if (input->code) {
|
|
||||||
output->rspCode = input->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
|
||||||
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
|
||||||
QW_TASK_WLOG("start to drop task, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
|
|
||||||
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
|
|
||||||
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
|
|
||||||
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
|
||||||
dropConnection = ctx->dropConnection;
|
|
||||||
|
|
||||||
// Note: ctx freed, no need to unlock it
|
|
||||||
locked = false;
|
|
||||||
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
|
||||||
QW_TASK_WLOG("start to cancel task, phase:%d", phase);
|
|
||||||
output->needStop = true;
|
|
||||||
|
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
|
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
|
|
||||||
|
|
||||||
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
|
||||||
cancelConnection = ctx->cancelConnection;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ctx->rspCode) {
|
|
||||||
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
|
|
||||||
output->needStop = true;
|
|
||||||
output->rspCode = ctx->rspCode;
|
|
||||||
QW_ERR_JRET(output->rspCode);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (output->rspCode) {
|
if (ctx) {
|
||||||
QW_SET_RSP_CODE(ctx, output->rspCode);
|
if (output->rspCode) {
|
||||||
|
QW_UPDATE_RSP_CODE(ctx, output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (locked) {
|
||||||
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
qwReleaseTaskCtx(mgmt, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (locked) {
|
if (code) {
|
||||||
atomic_store_32(&ctx->phase, phase);
|
output->needStop = true;
|
||||||
|
if (TSDB_CODE_SUCCESS == output->rspCode) {
|
||||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
output->rspCode = code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dropConnection) {
|
||||||
|
qwBuildAndSendDropRsp(dropConnection, output->rspCode);
|
||||||
|
QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cancelConnection) {
|
||||||
|
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode);
|
||||||
|
QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
QW_SCH_TASK_DLOG("end to handle event at phase %d", phase);
|
||||||
|
|
||||||
|
QW_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int8_t status = 0;
|
||||||
|
SQWTaskCtx *ctx = NULL;
|
||||||
|
bool locked = false;
|
||||||
|
void *readyConnection = NULL;
|
||||||
|
void *dropConnection = NULL;
|
||||||
|
void *cancelConnection = NULL;
|
||||||
|
|
||||||
|
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
|
||||||
|
|
||||||
|
output->needStop = false;
|
||||||
|
|
||||||
|
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
|
||||||
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
locked = true;
|
||||||
|
|
||||||
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
|
QW_TASK_WLOG("task already dropped, phase:%d", phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
|
||||||
|
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (input->code) {
|
||||||
|
output->rspCode = input->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QW_PHASE_POST_QUERY == phase) {
|
||||||
|
ctx->taskHandle = input->taskHandle;
|
||||||
|
ctx->sinkHandle = input->sinkHandle;
|
||||||
|
|
||||||
|
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
|
||||||
|
ctx->emptyRes = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
|
||||||
|
readyConnection = ctx->readyConnection;
|
||||||
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||||
|
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
|
||||||
|
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
|
||||||
|
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
|
||||||
|
output->needStop = true;
|
||||||
|
QW_SET_RSP_CODE(ctx, output->rspCode);
|
||||||
|
dropConnection = ctx->dropConnection;
|
||||||
|
|
||||||
|
// Note: ctx freed, no need to unlock it
|
||||||
|
locked = false;
|
||||||
|
|
||||||
|
QW_ERR_JRET(output->rspCode);
|
||||||
|
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
|
||||||
|
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
|
||||||
|
qwFreeTask(QW_FPARAMS(), ctx);
|
||||||
|
|
||||||
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
|
||||||
|
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
|
||||||
|
QW_SET_RSP_CODE(ctx, output->rspCode);
|
||||||
|
cancelConnection = ctx->cancelConnection;
|
||||||
|
|
||||||
|
QW_ERR_JRET(output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctx->rspCode) {
|
||||||
|
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
|
||||||
|
output->needStop = true;
|
||||||
|
output->rspCode = ctx->rspCode;
|
||||||
|
QW_ERR_JRET(output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QW_PHASE_POST_QUERY == phase && (!output->needStop)) {
|
||||||
|
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus));
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
if (ctx) {
|
if (ctx) {
|
||||||
|
if (output->rspCode) {
|
||||||
|
QW_UPDATE_RSP_CODE(ctx, output->rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_store_32(&ctx->phase, phase);
|
||||||
|
|
||||||
|
if (locked) {
|
||||||
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
|
}
|
||||||
|
|
||||||
qwReleaseTaskCtx(mgmt, ctx);
|
qwReleaseTaskCtx(mgmt, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
output->needStop = true;
|
||||||
|
if (TSDB_CODE_SUCCESS == output->rspCode) {
|
||||||
|
output->rspCode = code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (readyConnection) {
|
if (readyConnection) {
|
||||||
qwBuildAndSendReadyRsp(readyConnection, output->rspCode);
|
qwBuildAndSendReadyRsp(readyConnection, output->rspCode);
|
||||||
QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode);
|
QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode);
|
||||||
|
@ -924,7 +920,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
|
||||||
|
|
||||||
input.taskType = taskType;
|
input.taskType = taskType;
|
||||||
|
|
||||||
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
|
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
|
||||||
|
|
||||||
needStop = output.needStop;
|
needStop = output.needStop;
|
||||||
code = output.rspCode;
|
code = output.rspCode;
|
||||||
|
@ -973,21 +969,17 @@ _return:
|
||||||
QW_TASK_DLOG("query msg rsped, code:%x", rspCode);
|
QW_TASK_DLOG("query msg rsped, code:%x", rspCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needStop) {
|
|
||||||
QW_RET(rspCode);
|
|
||||||
}
|
|
||||||
|
|
||||||
input.code = rspCode;
|
input.code = rspCode;
|
||||||
input.taskHandle = pTaskInfo;
|
input.taskHandle = pTaskInfo;
|
||||||
input.sinkHandle = sinkHandle;
|
input.sinkHandle = sinkHandle;
|
||||||
input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED;
|
input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED;
|
||||||
|
|
||||||
QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
|
QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
|
||||||
|
|
||||||
QW_RET(rspCode);
|
QW_RET(rspCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
|
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SQWTaskCtx *ctx = NULL;
|
SQWTaskCtx *ctx = NULL;
|
||||||
int8_t phase = 0;
|
int8_t phase = 0;
|
||||||
|
@ -998,6 +990,12 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
||||||
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) ||
|
||||||
|
QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||||
|
QW_TASK_WLOG("task already cancelled/dropped, phase:%d", phase);
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
phase = QW_GET_PHASE(ctx);
|
phase = QW_GET_PHASE(ctx);
|
||||||
|
|
||||||
if (phase == QW_PHASE_PRE_QUERY) {
|
if (phase == QW_PHASE_PRE_QUERY) {
|
||||||
|
@ -1019,7 +1017,7 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (code && ctx) {
|
if (code && ctx) {
|
||||||
QW_SET_RSP_CODE(ctx, code);
|
QW_UPDATE_RSP_CODE(ctx, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx) {
|
if (ctx) {
|
||||||
|
@ -1036,7 +1034,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
|
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
SQWTaskCtx *ctx = NULL;
|
SQWTaskCtx *ctx = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool queryRsped = false;
|
bool queryRsped = false;
|
||||||
|
@ -1048,7 +1046,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
|
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
|
||||||
|
|
||||||
needStop = output.needStop;
|
needStop = output.needStop;
|
||||||
code = output.rspCode;
|
code = output.rspCode;
|
||||||
|
@ -1100,7 +1098,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
}
|
}
|
||||||
|
|
||||||
input.code = code;
|
input.code = code;
|
||||||
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output);
|
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output);
|
||||||
|
|
||||||
needStop = output.needStop;
|
needStop = output.needStop;
|
||||||
code = output.rspCode;
|
code = output.rspCode;
|
||||||
|
@ -1110,7 +1108,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
|
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t needRsp = true;
|
int32_t needRsp = true;
|
||||||
void *data = NULL;
|
void *data = NULL;
|
||||||
|
@ -1126,7 +1124,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
SQWPhaseInput input = {0};
|
SQWPhaseInput input = {0};
|
||||||
SQWPhaseOutput output = {0};
|
SQWPhaseOutput output = {0};
|
||||||
|
|
||||||
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output));
|
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output));
|
||||||
|
|
||||||
needStop = output.needStop;
|
needStop = output.needStop;
|
||||||
code = output.rspCode;
|
code = output.rspCode;
|
||||||
|
@ -1154,7 +1152,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
locked = true;
|
locked = true;
|
||||||
|
|
||||||
// RC WARNING
|
// RC WARNING
|
||||||
if (QW_IN_EXECUTOR(ctx)) {
|
if (QW_IS_QUERY_RUNNING(ctx)) {
|
||||||
atomic_store_8(&ctx->queryContinue, 1);
|
atomic_store_8(&ctx->queryContinue, 1);
|
||||||
} else if (0 == atomic_load_8(&ctx->queryInQueue)) {
|
} else if (0 == atomic_load_8(&ctx->queryInQueue)) {
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
|
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
|
||||||
|
@ -1162,6 +1160,8 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
atomic_store_8(&ctx->queryInQueue, 1);
|
atomic_store_8(&ctx->queryInQueue, 1);
|
||||||
|
|
||||||
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
|
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
|
||||||
|
|
||||||
|
QW_TASK_DLOG("schedule query in queue, phase:%d", ctx->phase);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1173,7 +1173,11 @@ _return:
|
||||||
|
|
||||||
input.code = code;
|
input.code = code;
|
||||||
|
|
||||||
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output);
|
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output);
|
||||||
|
|
||||||
|
if (output.rspCode) {
|
||||||
|
code = output.rspCode;
|
||||||
|
}
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
qwFreeFetchRsp(rsp);
|
qwFreeFetchRsp(rsp);
|
||||||
|
@ -1190,7 +1194,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
|
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool needRsp = false;
|
bool needRsp = false;
|
||||||
SQWTaskCtx *ctx = NULL;
|
SQWTaskCtx *ctx = NULL;
|
||||||
|
@ -1207,7 +1211,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
|
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_IN_EXECUTOR(ctx)) {
|
if (QW_IS_QUERY_RUNNING(ctx)) {
|
||||||
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
||||||
|
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
|
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
|
||||||
|
@ -1217,8 +1221,12 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
|
||||||
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
|
||||||
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
|
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
|
||||||
|
|
||||||
|
QW_SET_RSP_CODE(ctx, TSDB_CODE_QRY_TASK_DROPPED);
|
||||||
|
|
||||||
locked = false;
|
locked = false;
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!needRsp) {
|
if (!needRsp) {
|
||||||
|
@ -1228,7 +1236,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_SET_RSP_CODE(ctx, code);
|
QW_UPDATE_RSP_CODE(ctx, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx) {
|
if (ctx) {
|
||||||
|
@ -1259,18 +1267,18 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
||||||
if (cfg) {
|
if (cfg) {
|
||||||
mgmt->cfg = *cfg;
|
mgmt->cfg = *cfg;
|
||||||
if (0 == mgmt->cfg.maxSchedulerNum) {
|
if (0 == mgmt->cfg.maxSchedulerNum) {
|
||||||
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
|
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
|
||||||
}
|
}
|
||||||
if (0 == mgmt->cfg.maxTaskNum) {
|
if (0 == mgmt->cfg.maxTaskNum) {
|
||||||
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
|
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
|
||||||
}
|
}
|
||||||
if (0 == mgmt->cfg.maxSchTaskNum) {
|
if (0 == mgmt->cfg.maxSchTaskNum) {
|
||||||
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
|
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
|
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
|
||||||
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
|
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
|
||||||
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
|
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
|
|
@ -306,15 +306,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
SQWTaskCtx *handles = NULL;
|
SQWTaskCtx *handles = NULL;
|
||||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||||
|
|
||||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||||
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->sId = be64toh(msg->sId);
|
|
||||||
msg->queryId = be64toh(msg->queryId);
|
|
||||||
msg->taskId = be64toh(msg->taskId);
|
|
||||||
|
|
||||||
uint64_t sId = msg->sId;
|
uint64_t sId = msg->sId;
|
||||||
uint64_t qId = msg->queryId;
|
uint64_t qId = msg->queryId;
|
||||||
uint64_t tId = msg->taskId;
|
uint64_t tId = msg->taskId;
|
||||||
|
@ -335,14 +331,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||||
SResReadyReq *msg = pMsg->pCont;
|
SResReadyReq *msg = pMsg->pCont;
|
||||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||||
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
|
||||||
|
|
||||||
msg->sId = be64toh(msg->sId);
|
msg->sId = be64toh(msg->sId);
|
||||||
msg->queryId = be64toh(msg->queryId);
|
msg->queryId = be64toh(msg->queryId);
|
||||||
msg->taskId = be64toh(msg->taskId);
|
msg->taskId = be64toh(msg->taskId);
|
||||||
|
|
|
@ -38,6 +38,10 @@
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
#define qwtTestQueryQueueSize 1000
|
||||||
|
#define qwtTestFetchQueueSize 1000
|
||||||
|
#define qwtTestMaxExecTaskUsec 1000000
|
||||||
|
|
||||||
bool qwtTestEnableSleep = true;
|
bool qwtTestEnableSleep = true;
|
||||||
bool qwtTestStop = false;
|
bool qwtTestStop = false;
|
||||||
bool qwtTestDeadLoop = true;
|
bool qwtTestDeadLoop = true;
|
||||||
|
@ -45,6 +49,36 @@ int32_t qwtTestMTRunSec = 10;
|
||||||
int32_t qwtTestPrintNum = 100000;
|
int32_t qwtTestPrintNum = 100000;
|
||||||
int32_t qwtTestCaseIdx = 0;
|
int32_t qwtTestCaseIdx = 0;
|
||||||
int32_t qwtTestCaseNum = 4;
|
int32_t qwtTestCaseNum = 4;
|
||||||
|
bool qwtTestCaseFinished = false;
|
||||||
|
tsem_t qwtTestQuerySem;
|
||||||
|
tsem_t qwtTestFetchSem;
|
||||||
|
|
||||||
|
int32_t qwtTestQueryQueueRIdx = 0;
|
||||||
|
int32_t qwtTestQueryQueueWIdx = 0;
|
||||||
|
int32_t qwtTestQueryQueueNum = 0;
|
||||||
|
SRWLatch qwtTestQueryQueueLock = 0;
|
||||||
|
struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0};
|
||||||
|
|
||||||
|
int32_t qwtTestFetchQueueRIdx = 0;
|
||||||
|
int32_t qwtTestFetchQueueWIdx = 0;
|
||||||
|
int32_t qwtTestFetchQueueNum = 0;
|
||||||
|
SRWLatch qwtTestFetchQueueLock = 0;
|
||||||
|
struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0};
|
||||||
|
|
||||||
|
|
||||||
|
int32_t qwtTestSinkBlockNum = 0;
|
||||||
|
int32_t qwtTestSinkMaxBlockNum = 0;
|
||||||
|
bool qwtTestSinkQueryEnd = false;
|
||||||
|
SRWLatch qwtTestSinkLock = 0;
|
||||||
|
|
||||||
|
|
||||||
|
SRpcMsg qwtfetchRpc = {0};
|
||||||
|
SResFetchReq qwtfetchMsg = {0};
|
||||||
|
SRpcMsg qwtreadyRpc = {0};
|
||||||
|
SResReadyReq qwtreadyMsg = {0};
|
||||||
|
SRpcMsg qwtdropRpc = {0};
|
||||||
|
STaskDropReq qwtdropMsg = {0};
|
||||||
|
|
||||||
|
|
||||||
void qwtInitLogFile() {
|
void qwtInitLogFile() {
|
||||||
const char *defaultLogFileNamePrefix = "taosdlog";
|
const char *defaultLogFileNamePrefix = "taosdlog";
|
||||||
|
@ -103,30 +137,112 @@ void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
|
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
|
||||||
|
*subplan = 0x1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
|
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
|
taosWLockLatch(&qwtTestFetchQueueLock);
|
||||||
|
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = pMsg;
|
||||||
|
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
|
||||||
|
qwtTestFetchQueueWIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
qwtTestFetchQueueNum++;
|
||||||
|
|
||||||
|
if (qwtTestFetchQueueWIdx == qwtTestFetchQueueRIdx) {
|
||||||
|
printf("Fetch queue is full");
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&qwtTestFetchQueueLock);
|
||||||
|
|
||||||
|
tsem_post(&qwtTestFetchSem);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
|
taosWLockLatch(&qwtTestQueryQueueLock);
|
||||||
|
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = pMsg;
|
||||||
|
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
|
||||||
|
qwtTestQueryQueueWIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
qwtTestQueryQueueNum++;
|
||||||
|
|
||||||
|
if (qwtTestQueryQueueWIdx == qwtTestQueryQueueRIdx) {
|
||||||
|
printf("query queue is full");
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&qwtTestQueryQueueLock);
|
||||||
|
|
||||||
|
tsem_post(&qwtTestQuerySem);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
/*
|
|
||||||
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
|
switch (pRsp->msgType) {
|
||||||
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
|
case TDMT_VND_QUERY_RSP: {
|
||||||
printf("task num:%d\n", rsp->num);
|
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
|
||||||
for (int32_t i = 0; i < rsp->num; ++i) {
|
|
||||||
STaskStatus *task = &rsp->status[i];
|
if (0 == pRsp->code) {
|
||||||
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
|
qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc);
|
||||||
|
qwtPutReqToFetchQueue(0x1, &qwtreadyRpc);
|
||||||
|
} else {
|
||||||
|
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||||
|
qwtPutReqToFetchQueue(0x1, &qwtdropRpc);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TDMT_VND_RES_READY_RSP: {
|
||||||
|
SResReadyRsp *rsp = (SResReadyRsp *)pRsp->pCont;
|
||||||
|
|
||||||
|
if (0 == pRsp->code) {
|
||||||
|
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
|
||||||
|
qwtPutReqToFetchQueue(0x1, &qwtfetchRpc);
|
||||||
|
} else {
|
||||||
|
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||||
|
qwtPutReqToFetchQueue(0x1, &qwtdropRpc);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TDMT_VND_FETCH_RSP: {
|
||||||
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
|
||||||
|
|
||||||
|
if (0 == pRsp->code && 0 == rsp->completed) {
|
||||||
|
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
|
||||||
|
qwtPutReqToFetchQueue(0x1, &qwtfetchRpc);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||||
|
qwtPutReqToFetchQueue(0x1, &qwtdropRpc);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TDMT_VND_DROP_TASK: {
|
||||||
|
STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;
|
||||||
|
|
||||||
|
qwtTestCaseFinished = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||||
int32_t idx = qwtTestCaseIdx % qwtTestCaseNum;
|
int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum);
|
||||||
|
|
||||||
|
qwtTestSinkBlockNum = 0;
|
||||||
|
qwtTestSinkMaxBlockNum = rand() % 100 + 1;
|
||||||
|
qwtTestSinkQueryEnd = false;
|
||||||
|
|
||||||
if (0 == idx) {
|
if (0 == idx) {
|
||||||
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
|
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
|
||||||
|
@ -141,13 +257,30 @@ int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTas
|
||||||
*pTaskInfo = NULL;
|
*pTaskInfo = NULL;
|
||||||
*handle = (DataSinkHandle)qwtTestCaseIdx;
|
*handle = (DataSinkHandle)qwtTestCaseIdx;
|
||||||
}
|
}
|
||||||
|
|
||||||
++qwtTestCaseIdx;
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
|
int32_t endExec = 0;
|
||||||
|
|
||||||
|
if (NULL == tinfo) {
|
||||||
|
*pRes = NULL;
|
||||||
|
*useconds = 0;
|
||||||
|
} else {
|
||||||
|
endExec = rand() % 5;
|
||||||
|
|
||||||
|
if (endExec) {
|
||||||
|
usleep(rand() % qwtTestMaxExecTaskUsec);
|
||||||
|
|
||||||
|
*pRes = (SSDataBlock*)0x1;
|
||||||
|
} else {
|
||||||
|
*pRes = NULL;
|
||||||
|
usleep(rand() % qwtTestMaxExecTaskUsec);
|
||||||
|
*useconds = rand() % 10;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,21 +289,85 @@ int32_t qwtKillTask(qTaskInfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwtDestroyTask(qTaskInfo_t qHandle) {
|
void qwtDestroyTask(qTaskInfo_t qHandle) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
|
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
|
||||||
|
if (NULL == handle || NULL == pInput || NULL == pContinue) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&qwtTestSinkLock);
|
||||||
|
|
||||||
|
qwtTestSinkBlockNum++;
|
||||||
|
|
||||||
|
if (qwtTestSinkBlockNum >= qwtTestSinkMaxBlockNum) {
|
||||||
|
*pContinue = true;
|
||||||
|
} else {
|
||||||
|
*pContinue = false;
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&qwtTestSinkLock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
|
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
|
||||||
|
if (NULL == handle) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
qwtTestSinkQueryEnd = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
|
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
|
||||||
|
static int32_t in = 0;
|
||||||
|
|
||||||
|
if (in > 0) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&in, 1);
|
||||||
|
|
||||||
|
if (NULL == handle) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&qwtTestSinkLock);
|
||||||
|
if (qwtTestSinkBlockNum > 0) {
|
||||||
|
*pLen = rand() % 100 + 1;
|
||||||
|
qwtTestSinkBlockNum--;
|
||||||
|
} else {
|
||||||
|
*pLen = 0;
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&qwtTestSinkLock);
|
||||||
|
|
||||||
|
*pQueryEnd = qwtTestSinkQueryEnd;
|
||||||
|
|
||||||
|
atomic_sub_fetch_32(&in, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
||||||
|
taosWLockLatch(&qwtTestSinkLock);
|
||||||
|
if (qwtTestSinkBlockNum > 0) {
|
||||||
|
qwtTestSinkBlockNum--;
|
||||||
|
pOutput->numOfRows = rand() % 10 + 1;
|
||||||
|
pOutput->compressed = 1;
|
||||||
|
pOutput->pData = malloc(pOutput->numOfRows);
|
||||||
|
pOutput->queryEnd = qwtTestSinkQueryEnd;
|
||||||
|
if (qwtTestSinkBlockNum == 0) {
|
||||||
|
pOutput->bufStatus = DS_BUF_EMPTY;
|
||||||
|
} else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) {
|
||||||
|
pOutput->bufStatus = DS_BUF_LOW;
|
||||||
|
} else {
|
||||||
|
pOutput->bufStatus = DS_BUF_FULL;
|
||||||
|
}
|
||||||
|
pOutput->useconds = rand() % 10 + 1;
|
||||||
|
pOutput->precision = 1;
|
||||||
|
} else {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&qwtTestSinkLock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,20 +635,31 @@ void *statusThread(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void *controlThread(void *param) {
|
void *clientThread(void *param) {
|
||||||
SRpcMsg queryRpc = {0};
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
uint32_t n = 0;
|
uint32_t n = 0;
|
||||||
void *mockPointer = (void *)0x1;
|
|
||||||
void *mgmt = param;
|
void *mgmt = param;
|
||||||
|
void *mockPointer = (void *)0x1;
|
||||||
|
SRpcMsg queryRpc = {0};
|
||||||
|
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
while (!qwtTestStop) {
|
while (!qwtTestStop) {
|
||||||
|
qwtTestCaseFinished = false;
|
||||||
|
|
||||||
qwtBuildQueryReqMsg(&queryRpc);
|
qwtBuildQueryReqMsg(&queryRpc);
|
||||||
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
qwtPutReqToQueue(0x1, &queryRpc);
|
||||||
|
|
||||||
|
while (!qwtTestCaseFinished) {
|
||||||
|
usleep(1);
|
||||||
|
}
|
||||||
|
|
||||||
free(queryRpc.pCont);
|
free(queryRpc.pCont);
|
||||||
|
|
||||||
if (qwtTestEnableSleep) {
|
if (qwtTestEnableSleep) {
|
||||||
usleep(rand()%5);
|
usleep(rand()%5);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (++n % qwtTestPrintNum == 0) {
|
if (++n % qwtTestPrintNum == 0) {
|
||||||
printf("query:%d\n", n);
|
printf("query:%d\n", n);
|
||||||
}
|
}
|
||||||
|
@ -461,10 +669,79 @@ void *controlThread(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *queryQueueThread(void *param) {
|
void *queryQueueThread(void *param) {
|
||||||
|
void *mockPointer = (void *)0x1;
|
||||||
|
SRpcMsg *queryRpc = NULL;
|
||||||
|
void *mgmt = param;
|
||||||
|
|
||||||
|
while (!qwtTestStop) {
|
||||||
|
tsem_wait(&qwtTestQuerySem);
|
||||||
|
|
||||||
|
taosWLockLatch(&qwtTestQueryQueueLock);
|
||||||
|
if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) {
|
||||||
|
printf("query queue is empty\n");
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++];
|
||||||
|
|
||||||
|
if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) {
|
||||||
|
qwtTestQueryQueueRIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
qwtTestQueryQueueNum--;
|
||||||
|
taosWUnLockLatch(&qwtTestQueryQueueLock);
|
||||||
|
|
||||||
|
if (TDMT_VND_QUERY == queryRpc->msgType) {
|
||||||
|
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||||
|
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
|
||||||
|
qWorkerProcessCQueryMsg(mockPointer, mgmt, &queryRpc)
|
||||||
|
} else {
|
||||||
|
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void *fetchQueueThread(void *param) {
|
void *fetchQueueThread(void *param) {
|
||||||
|
void *mockPointer = (void *)0x1;
|
||||||
|
SRpcMsg *fetchRpc = NULL;
|
||||||
|
void *mgmt = param;
|
||||||
|
|
||||||
|
while (!qwtTestStop) {
|
||||||
|
tsem_wait(&qwtTestFetchSem);
|
||||||
|
|
||||||
|
taosWLockLatch(&qwtTestFetchQueueLock);
|
||||||
|
if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
|
||||||
|
printf("Fetch queue is empty\n");
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++];
|
||||||
|
|
||||||
|
if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) {
|
||||||
|
qwtTestFetchQueueRIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
qwtTestFetchQueueNum--;
|
||||||
|
taosWUnLockLatch(&qwtTestFetchQueueLock);
|
||||||
|
|
||||||
|
switch (fetchRpc->msgType) {
|
||||||
|
case TDMT_VND_FETCH:
|
||||||
|
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc);
|
||||||
|
case TDMT_VND_RES_READY:
|
||||||
|
qWorkerProcessReadyMsg(mockPointer, mgmt, fetchRpc);
|
||||||
|
case TDMT_VND_TASKS_STATUS:
|
||||||
|
qWorkerProcessStatusMsg(mockPointer, mgmt, fetchRpc);
|
||||||
|
case TDMT_VND_CANCEL_TASK:
|
||||||
|
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc);
|
||||||
|
case TDMT_VND_DROP_TASK:
|
||||||
|
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc);
|
||||||
|
default:
|
||||||
|
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -753,13 +1030,16 @@ TEST(rcTest, multithread) {
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
tsem_init(&qwtTestQuerySem, 0, 0);
|
||||||
|
tsem_init(&qwtTestFetchSem, 0, 0);
|
||||||
|
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
|
|
||||||
pthread_t t1,t2,t3,t4,t5;
|
pthread_t t1,t2,t3,t4,t5;
|
||||||
pthread_create(&(t1), &thattr, controlThread, mgmt);
|
pthread_create(&(t1), &thattr, clientThread, mgmt);
|
||||||
pthread_create(&(t2), &thattr, queryQueueThread, NULL);
|
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
|
||||||
pthread_create(&(t3), &thattr, fetchQueueThread, NULL);
|
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
@ -779,6 +1059,7 @@ TEST(rcTest, multithread) {
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
srand(time(NULL));
|
||||||
testing::InitGoogleTest(&argc, argv);
|
testing::InitGoogleTest(&argc, argv);
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,9 +276,12 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
||||||
|
|
||||||
|
|
||||||
int32_t schRecordTaskSucceedNode(SSchTask *pTask) {
|
int32_t schRecordTaskSucceedNode(SSchTask *pTask) {
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
|
int32_t idx = atomic_load_8(&pTask->candidateIdx);
|
||||||
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx);
|
||||||
assert(NULL != addr);
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
pTask->succeedAddr = *addr;
|
pTask->succeedAddr = *addr;
|
||||||
|
|
||||||
|
@ -578,9 +581,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
|
||||||
tsem_post(&pJob->rspSem);
|
tsem_post(&pJob->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
|
int32_t code = atomic_load_32(&pJob->errCode);
|
||||||
|
SCH_ERR_RET(code);
|
||||||
|
|
||||||
assert(0);
|
SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -725,7 +729,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
|
||||||
|
|
||||||
SCH_ERR_JRET(schRecordTaskSucceedNode(pTask));
|
SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
|
||||||
|
|
||||||
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
|
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
|
||||||
if (parentNum == 0) {
|
if (parentNum == 0) {
|
||||||
|
@ -738,11 +742,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
|
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
|
||||||
|
|
||||||
if (taskDone < pTask->level->taskNum) {
|
if (taskDone < pTask->level->taskNum) {
|
||||||
SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
|
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (taskDone > pTask->level->taskNum) {
|
} else if (taskDone > pTask->level->taskNum) {
|
||||||
assert(0);
|
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->level->taskFailed > 0) {
|
if (pTask->level->taskFailed > 0) {
|
||||||
|
@ -875,19 +879,22 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_ptr(&pJob->res, rsp);
|
atomic_store_ptr(&pJob->res, rsp);
|
||||||
atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows);
|
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
|
||||||
|
|
||||||
if (rsp->completed) {
|
if (rsp->completed) {
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
|
||||||
|
|
||||||
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
|
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_VND_DROP_TASK: {
|
case TDMT_VND_DROP_TASK: {
|
||||||
// SHOULD NEVER REACH HERE
|
// SHOULD NEVER REACH HERE
|
||||||
assert(0);
|
SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref));
|
||||||
|
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -936,7 +943,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
|
||||||
|
|
||||||
pTask = *task;
|
pTask = *task;
|
||||||
|
|
||||||
SCH_TASK_DLOG("rsp msg received, type:%d, code:%x", msgType, rspCode);
|
SCH_TASK_DLOG("rsp msg received, type:%d, %s, code:%x", msgType, TMSG_INFO(msgType), rspCode);
|
||||||
|
|
||||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||||
|
|
||||||
|
@ -1037,6 +1044,8 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
|
||||||
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
|
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -1296,6 +1305,8 @@ void schDropJobAllTasks(SSchJob *pJob) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
|
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
|
||||||
|
qDebug("QID:%"PRIx64" job started", pDag->queryId);
|
||||||
|
|
||||||
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
|
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
|
||||||
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
|
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
|
||||||
}
|
}
|
||||||
|
@ -1363,7 +1374,7 @@ _return:
|
||||||
|
|
||||||
*(SSchJob **)job = NULL;
|
*(SSchJob **)job = NULL;
|
||||||
|
|
||||||
scheduleFreeJob(pJob);
|
schedulerFreeJob(pJob);
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -1408,7 +1419,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
|
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
|
||||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -1425,7 +1436,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
|
int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
|
||||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -1558,7 +1569,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
|
int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
|
||||||
if (NULL == pJob || NULL == pData) {
|
if (NULL == pJob || NULL == pData) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -1624,11 +1635,12 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
*pData = rsp;
|
*pData = rsp;
|
||||||
|
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
|
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
|
||||||
|
|
||||||
SCH_JOB_DLOG("fetch done, code:%x", code);
|
SCH_JOB_DLOG("fetch done, totalRows:%d, code:%x", pJob->resNumOfRows, code);
|
||||||
|
|
||||||
atomic_sub_fetch_32(&pJob->ref, 1);
|
atomic_sub_fetch_32(&pJob->ref, 1);
|
||||||
|
|
||||||
|
@ -1647,7 +1659,7 @@ int32_t scheduleCancelJob(void *job) {
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void scheduleFreeJob(void *job) {
|
void schedulerFreeJob(void *job) {
|
||||||
if (NULL == job) {
|
if (NULL == job) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1676,7 +1688,8 @@ void scheduleFreeJob(void *job) {
|
||||||
|
|
||||||
usleep(1);
|
usleep(1);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
SCH_JOB_ELOG("invalid job ref number, ref:%d", ref);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -328,7 +328,7 @@ void schtFreeQueryJob(int32_t freeThread) {
|
||||||
SSchJob *job = atomic_load_ptr(&pQueryJob);
|
SSchJob *job = atomic_load_ptr(&pQueryJob);
|
||||||
|
|
||||||
if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) {
|
if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) {
|
||||||
scheduleFreeJob(job);
|
schedulerFreeJob(job);
|
||||||
if (freeThread) {
|
if (freeThread) {
|
||||||
if (++freeNum % schtTestPrintNum == 0) {
|
if (++freeNum % schtTestPrintNum == 0) {
|
||||||
printf("FreeNum:%d\n", freeNum);
|
printf("FreeNum:%d\n", freeNum);
|
||||||
|
@ -372,7 +372,7 @@ void* schtRunJobThread(void *aa) {
|
||||||
qnodeAddr.port = 6031;
|
qnodeAddr.port = 6031;
|
||||||
taosArrayPush(qnodeList, &qnodeAddr);
|
taosArrayPush(qnodeList, &qnodeAddr);
|
||||||
|
|
||||||
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job);
|
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job);
|
||||||
assert(code == 0);
|
assert(code == 0);
|
||||||
|
|
||||||
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -466,7 +466,7 @@ void* schtRunJobThread(void *aa) {
|
||||||
atomic_store_32(&schtStartFetch, 1);
|
atomic_store_32(&schtStartFetch, 1);
|
||||||
|
|
||||||
void *data = NULL;
|
void *data = NULL;
|
||||||
code = scheduleFetchRows(pQueryJob, &data);
|
code = schedulerFetchRows(pQueryJob, &data);
|
||||||
assert(code == 0 || code);
|
assert(code == 0 || code);
|
||||||
|
|
||||||
if (0 == code) {
|
if (0 == code) {
|
||||||
|
@ -476,7 +476,7 @@ void* schtRunJobThread(void *aa) {
|
||||||
}
|
}
|
||||||
|
|
||||||
data = NULL;
|
data = NULL;
|
||||||
code = scheduleFetchRows(pQueryJob, &data);
|
code = schedulerFetchRows(pQueryJob, &data);
|
||||||
assert(code == 0 || code);
|
assert(code == 0 || code);
|
||||||
|
|
||||||
schtFreeQueryJob(0);
|
schtFreeQueryJob(0);
|
||||||
|
@ -533,7 +533,7 @@ TEST(queryTest, normalCase) {
|
||||||
schtSetExecNode();
|
schtSetExecNode();
|
||||||
schtSetAsyncSendMsgToServer();
|
schtSetAsyncSendMsgToServer();
|
||||||
|
|
||||||
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
|
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SSchJob *job = (SSchJob *)pJob;
|
SSchJob *job = (SSchJob *)pJob;
|
||||||
|
@ -588,7 +588,7 @@ TEST(queryTest, normalCase) {
|
||||||
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
|
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
|
||||||
|
|
||||||
void *data = NULL;
|
void *data = NULL;
|
||||||
code = scheduleFetchRows(job, &data);
|
code = schedulerFetchRows(job, &data);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
|
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
|
||||||
|
@ -597,11 +597,11 @@ TEST(queryTest, normalCase) {
|
||||||
tfree(data);
|
tfree(data);
|
||||||
|
|
||||||
data = NULL;
|
data = NULL;
|
||||||
code = scheduleFetchRows(job, &data);
|
code = schedulerFetchRows(job, &data);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_TRUE(data);
|
ASSERT_TRUE(data);
|
||||||
|
|
||||||
scheduleFreeJob(pJob);
|
schedulerFreeJob(pJob);
|
||||||
|
|
||||||
schtFreeQueryDag(&dag);
|
schtFreeQueryDag(&dag);
|
||||||
|
|
||||||
|
@ -643,11 +643,11 @@ TEST(insertTest, normalCase) {
|
||||||
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
|
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
|
||||||
|
|
||||||
SQueryResult res = {0};
|
SQueryResult res = {0};
|
||||||
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
|
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(res.numOfRows, 20);
|
ASSERT_EQ(res.numOfRows, 20);
|
||||||
|
|
||||||
scheduleFreeJob(pInsertJob);
|
schedulerFreeJob(pInsertJob);
|
||||||
|
|
||||||
schedulerDestroy();
|
schedulerDestroy();
|
||||||
}
|
}
|
||||||
|
|
|
@ -353,7 +353,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST, "Task context not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
|
||||||
|
|
Loading…
Reference in New Issue