feature/scheduler
This commit is contained in:
parent
216c53174b
commit
08541c0f22
|
@ -592,6 +592,23 @@ int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
|||
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
||||
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int32_t rowNum;
|
||||
} SQnodeListReq;
|
||||
|
||||
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
|
||||
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
SArray *epSetList; // SArray<SEpSet>
|
||||
} SQnodeListRsp;
|
||||
|
||||
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
|
||||
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
|
||||
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
|
||||
|
||||
|
||||
|
||||
typedef struct {
|
||||
SArray* pArray; // Array of SUseDbRsp
|
||||
} SUseDbBatchRsp;
|
||||
|
|
|
@ -129,6 +129,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
|
||||
|
|
|
@ -51,7 +51,7 @@ typedef struct SMetaData {
|
|||
SArray *pTableMeta; // STableMeta array
|
||||
SArray *pVgroupInfo; // SVgroupInfo list
|
||||
SArray *pUdfList; // udf info list
|
||||
SEpSet *pEpSet; // qnode epset list
|
||||
SArray *pEpSetList; // qnode epset list, SArray<SEpSet>
|
||||
} SMetaData;
|
||||
|
||||
typedef struct SCatalogCfg {
|
||||
|
|
|
@ -1446,6 +1446,70 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->rowNum) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->rowNum) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pRsp->epSetList);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SEpSet *epSet = taosArrayGet(pRsp->epSetList, i);
|
||||
if (tEncodeSEpSet(&encoder, epSet) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
int32_t num = 0;
|
||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||
pRsp->epSetList = taosArrayInit(num, sizeof(SEpSet));
|
||||
if (NULL == pRsp->epSetList) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
if (tDecodeSEpSet(&decoder, TARRAY_GET_ELEM(pRsp->epSetList, i)) < 0) return -1;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->epSetList); }
|
||||
|
||||
int32_t tSerializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
|
|
@ -48,6 +48,7 @@ int32_t mndInitQnode(SMnode *pMnode) {
|
|||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);
|
||||
|
||||
|
@ -430,6 +431,69 @@ DROP_QNODE_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq) {
|
||||
int32_t code = -1;
|
||||
SQnodeListReq qlistReq = {0};
|
||||
int32_t numOfRows = 0;
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SQnodeObj *pObj = NULL;
|
||||
SQnodeListRsp qlistRsp = {0};
|
||||
|
||||
if (tDeserializeSQnodeListReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &qlistReq) != 0) {
|
||||
mError("invalid qnode list msg");
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto QNODE_LIST_OVER;
|
||||
}
|
||||
|
||||
qlistRsp->epSetList = taosArrayInit(5, sizeof(SEpSet));
|
||||
if (NULL == qlistRsp->epSetList) {
|
||||
mError("taosArrayInit epSet failed");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto QNODE_LIST_OVER;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SEpSet epSet = {0};
|
||||
strcpy(epSet.eps[0].fqdn, pObj->pDnode->fqdn);
|
||||
epSet.eps[0].port = pObj->pDnode->port;
|
||||
epSet.numOfEps = 1;
|
||||
|
||||
taosArrayPush(qlistRsp->epSetList, &epSet);
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pObj);
|
||||
|
||||
if (qlistReq->rowNum > 0 && numOfRows >= qlistReq->rowNum) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
|
||||
void *pRsp = malloc(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto QNODE_LIST_OVER;
|
||||
}
|
||||
|
||||
tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp);
|
||||
|
||||
pReq->contLen = rspLen;
|
||||
pReq->pCont = pRsp;
|
||||
code = 0;
|
||||
|
||||
QNODE_LIST_OVER:
|
||||
|
||||
tFreeSQnodeListRsp(&qlistRsp);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp) {
|
||||
mndTransProcessRsp(pRsp);
|
||||
return 0;
|
||||
|
|
|
@ -29,11 +29,13 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SQnode {
|
||||
int32_t qndId;
|
||||
SQnodeOpt opt;
|
||||
SQHandle* pQuery;
|
||||
} SQnode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_QNODE_INT_H_*/
|
||||
#endif /*_TD_QNODE_INT_H_*/
|
||||
|
|
|
@ -17,10 +17,25 @@
|
|||
|
||||
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||
SQnode *pQnode = calloc(1, sizeof(SQnode));
|
||||
if (NULL == pQnode) {
|
||||
qError("calloc SQnode failed");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode,
|
||||
(putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode)) {
|
||||
tfree(pQnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pQnode;
|
||||
}
|
||||
|
||||
void qndClose(SQnode *pQnode) { free(pQnode); }
|
||||
void qndClose(SQnode *pQnode) {
|
||||
qWorkerDestroy(&pQnode->pQuery);
|
||||
|
||||
free(pQnode);
|
||||
}
|
||||
|
||||
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
|
||||
|
||||
|
@ -29,3 +44,51 @@ int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||
qTrace("message in query queue is processing");
|
||||
SReadHandle handle = {0};
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_QUERY:{
|
||||
return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
|
||||
}
|
||||
case TDMT_VND_QUERY_CONTINUE:
|
||||
return qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in query queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||
qTrace("message in fetch queue is processing");
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_FETCH:
|
||||
return qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_FETCH_RSP:
|
||||
return qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_RES_READY:
|
||||
return qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_TASKS_STATUS:
|
||||
return qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_CANCEL_TASK:
|
||||
return qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_DROP_TASK:
|
||||
return qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_SHOW_TABLES:
|
||||
return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
case TDMT_VND_SHOW_TABLES_FETCH:
|
||||
//return vnodeGetTableList(pQnode, pMsg);
|
||||
case TDMT_VND_TABLE_META:
|
||||
//return vnodeGetTableMeta(pQnode, pMsg);
|
||||
case TDMT_VND_CONSUME:
|
||||
//return tqProcessConsumeReq(pQnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -558,6 +558,42 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray **out) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)](NULL, &msg, 0, &msgLen);
|
||||
if (code) {
|
||||
ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_QNODE_LIST,
|
||||
.pCont = msg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||
ctgError("error rsp for qnode list, error:%s, db:%s", tstrerror(rpcRsp.code));
|
||||
CTG_ERR_RET(rpcRsp.code);
|
||||
}
|
||||
|
||||
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||
if (code) {
|
||||
ctgError("Process qnode list rsp failed, error:%s", tstrerror(rpcRsp.code));
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("Got qnode list from mnode, listNum:%d", taosArrayGetSize(*out));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
|
||||
|
@ -2602,6 +2638,10 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
|||
}
|
||||
}
|
||||
|
||||
if (pReq->qNodeRequired) {
|
||||
CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, &pRsp->pEpSetList));
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
|
||||
_return:
|
||||
|
|
|
@ -97,6 +97,25 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||
if (NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQnodeListReq qnodeListReq = {0};
|
||||
qnodeListReq.rowNum = -1;
|
||||
|
||||
int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq);
|
||||
|
||||
*msg = pBuf;
|
||||
*msgLen = bufLen;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SUseDbOutput *pOut = output;
|
||||
SUseDbRsp usedbRsp = {0};
|
||||
|
@ -256,14 +275,45 @@ PROCESS_META_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SQnodeListRsp out = {0};
|
||||
int32_t code = -1;
|
||||
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto PROCESS_QLIST_OVER;
|
||||
}
|
||||
|
||||
if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
|
||||
qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto PROCESS_QLIST_OVER;
|
||||
}
|
||||
|
||||
PROCESS_QLIST_OVER:
|
||||
|
||||
if (code != 0) {
|
||||
tFreeSQnodeListRsp(&out);
|
||||
out.epSetList = NULL;
|
||||
}
|
||||
|
||||
*(SArray **)output = out.epSetList;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
void initQueryModuleMsgHandle() {
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -26,6 +26,8 @@ extern "C" {
|
|||
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||
#define QW_DEFAULT_HEARTBEAT_MSEC 3000
|
||||
|
||||
enum {
|
||||
QW_PHASE_PRE_QUERY = 1,
|
||||
QW_PHASE_POST_QUERY,
|
||||
|
@ -131,6 +133,8 @@ typedef struct SQWorkerMgmt {
|
|||
SQWorkerCfg cfg;
|
||||
int8_t nodeType;
|
||||
int32_t nodeId;
|
||||
void *timer;
|
||||
tmr_h hbTimer;
|
||||
SRWLatch schLock;
|
||||
//SRWLatch ctxLock;
|
||||
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
|
||||
|
|
|
@ -521,6 +521,47 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStatusRsp **rsp) {
|
||||
int32_t taskNum = 0;
|
||||
|
||||
QW_LOCK(QW_READ, &sch->tasksLock);
|
||||
|
||||
taskNum = taosHashGetSize(sch->tasksHash);
|
||||
|
||||
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
|
||||
*rsp = calloc(1, size);
|
||||
if (NULL == *rsp) {
|
||||
QW_SCH_ELOG("calloc %d failed", size);
|
||||
QW_UNLOCK(QW_READ, &sch->tasksLock);
|
||||
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
|
||||
void *pIter = taosHashIterate(sch->tasksHash, NULL);
|
||||
while (pIter) {
|
||||
SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
|
||||
taosHashGetKey(pIter, &key, &keyLen);
|
||||
|
||||
//TODO GET EXECUTOR API TO GET MORE INFO
|
||||
|
||||
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
|
||||
(*rsp)->status[i].status = taskStatus->status;
|
||||
|
||||
++i;
|
||||
pIter = taosHashIterate(sch->tasksHash, pIter);
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_READ, &sch->tasksLock);
|
||||
|
||||
(*rsp)->num = taskNum;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
||||
int32_t len = 0;
|
||||
|
@ -1312,6 +1353,58 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qwProcessHbTimer(void *param, void *tmrId) {
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
|
||||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SSchedulerStatusRsp **rspList = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
QW_LOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
int32_t schNum = taosHashGetSize(mgmt->schHash);
|
||||
if (schNum <= 0) {
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
rspList = calloc(schNum, POINTER_BYTES);
|
||||
if (NULL == rspList) {
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
QW_ELOG("calloc %d rsp pointer failed", schNum);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
|
||||
if (code) {
|
||||
taosHashCancelIterate(mgmt->schHash, pIter);
|
||||
QW_ERR_JRET(code);
|
||||
}
|
||||
|
||||
++i;
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
for (int32_t j = 0; j < i; ++j) {
|
||||
qwBuildAndSendStatusRsp(NULL, rspList[j]); //TODO SCHEDULER CONNECTION
|
||||
tfree(rspList[j]);
|
||||
}
|
||||
|
||||
tfree(rspList);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
|
||||
putReqToQueryQFp fp1, sendReqToDnodeFp fp2) {
|
||||
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) {
|
||||
|
@ -1319,6 +1412,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
|||
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
|
||||
if (NULL == mgmt) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
|
||||
|
@ -1346,16 +1440,25 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
|||
if (NULL == mgmt->schHash) {
|
||||
tfree(mgmt);
|
||||
qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == mgmt->ctxHash) {
|
||||
qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
mgmt->schHash = NULL;
|
||||
tfree(mgmt);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
|
||||
if (NULL == mgmt->timer) {
|
||||
qError("init timer failed, error:%s", tstrerror(terrno));
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->hbTimer = taosTmrStart(qwProcessHbTimer, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
|
||||
if (NULL == mgmt->hbTimer) {
|
||||
qError("start hb timer failed");
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->nodeType = nodeType;
|
||||
|
@ -1369,6 +1472,17 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
|||
qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
tfree(mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qWorkerDestroy(void **qWorkerMgmt) {
|
||||
|
@ -1385,12 +1499,11 @@ void qWorkerDestroy(void **qWorkerMgmt) {
|
|||
tfree(*qWorkerMgmt);
|
||||
}
|
||||
|
||||
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) {
|
||||
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
|
||||
/*
|
||||
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
|
||||
QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch));
|
||||
|
||||
sch->lastAccessTs = taosGetTimestampSec();
|
||||
|
||||
|
@ -1401,7 +1514,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
|
|||
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
|
||||
*rsp = calloc(1, size);
|
||||
if (NULL == *rsp) {
|
||||
qError("calloc %d failed", size);
|
||||
QW_SCH_ELOG("calloc %d failed", size);
|
||||
QW_UNLOCK(QW_READ, &sch->tasksLock);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
|
@ -1420,6 +1533,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
|
|||
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
|
||||
(*rsp)->status[i].status = taskStatus->status;
|
||||
|
||||
++i;
|
||||
pIter = taosHashIterate(sch->tasksHash, pIter);
|
||||
}
|
||||
|
||||
|
@ -1427,7 +1541,6 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
|
|||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
(*rsp)->num = taskNum;
|
||||
*/
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -392,7 +392,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
SSchedulerStatusRsp *sStatus = NULL;
|
||||
|
||||
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
|
||||
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
|
||||
|
||||
_return:
|
||||
|
||||
|
|
Loading…
Reference in New Issue