feature/scheduler
This commit is contained in:
parent
ee8d007801
commit
54de6fd97e
|
@ -547,7 +547,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI
|
||||||
void *pIter = taosHashIterate(sch->tasksHash, NULL);
|
void *pIter = taosHashIterate(sch->tasksHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
|
SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
|
||||||
taosHashGetKey(pIter, &key, &keyLen);
|
key = taosHashGetKey(pIter, &keyLen);
|
||||||
|
|
||||||
//TODO GET EXECUTOR API TO GET MORE INFO
|
//TODO GET EXECUTOR API TO GET MORE INFO
|
||||||
|
|
||||||
|
@ -1372,6 +1372,9 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||||
|
|
||||||
rsp.seqId = sch->hbSeqId;
|
rsp.seqId = sch->hbSeqId;
|
||||||
|
|
||||||
|
QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, connection:%p",
|
||||||
|
sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connection);
|
||||||
|
|
||||||
qwReleaseScheduler(QW_READ, mgmt);
|
qwReleaseScheduler(QW_READ, mgmt);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -1383,6 +1386,8 @@ _return:
|
||||||
|
|
||||||
|
|
||||||
void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
||||||
|
return;
|
||||||
|
|
||||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
|
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
|
||||||
SQWSchStatus *sch = NULL;
|
SQWSchStatus *sch = NULL;
|
||||||
int32_t taskNum = 0;
|
int32_t taskNum = 0;
|
||||||
|
@ -1427,6 +1432,7 @@ _return:
|
||||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||||
|
|
||||||
for (int32_t j = 0; j < i; ++j) {
|
for (int32_t j = 0; j < i; ++j) {
|
||||||
|
QW_DLOG("hb on connection %p, taskNum:%d", rspList[j].connection, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
|
||||||
qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code);
|
qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code);
|
||||||
tFreeSSchedulerHbRsp(&rspList[j].rsp);
|
tFreeSSchedulerHbRsp(&rspList[j].rsp);
|
||||||
}
|
}
|
||||||
|
|
|
@ -639,6 +639,9 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
|
||||||
SCH_ERR_RET(code);
|
SCH_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p",
|
||||||
|
trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, trans->trans.transHandle);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -659,6 +662,9 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
|
||||||
memcpy(&hb->trans, &trans->trans, sizeof(trans->trans));
|
memcpy(&hb->trans, &trans->trans, sizeof(trans->trans));
|
||||||
|
|
||||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||||
|
|
||||||
|
qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p",
|
||||||
|
trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, trans->trans.transHandle);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue