From 934ed8663ffb3f9e8905e2aa419e75a9a7ffa933 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 17 Nov 2022 02:48:53 -0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adaptor/pcm_slurm/idl/slurm.proto | 44 +- adaptor/pcm_slurm/idl/slurm.yaml | 28 +- adaptor/pcm_slurm/idl/slurm_diag.proto | 49 ++ adaptor/pcm_slurm/idl/slurm_job.proto | 314 +++++++++++++ adaptor/pcm_slurm/idl/slurmdb_assoc.proto | 2 +- adaptor/pcm_slurm/idl/slurmdb_job.proto | 92 ++++ adaptor/pcm_slurm/idl/slurmdb_qos.proto | 119 +++-- adaptor/pcm_slurm/server/slurm.go | 103 ++++- adaptor/pcm_slurm/server/slurmImpl.go | 103 ++++- adaptor/pcm_slurm/service/common/diag.go | 16 + adaptor/pcm_slurm/service/common/job.go | 30 ++ adaptor/pcm_slurm/service/common/job_db.go | 23 + .../pcm_slurm/service/common/job_submit.go | 27 ++ adaptor/pcm_slurm/service/common/qos_db.go | 54 ++- adaptor/pcm_slurm/service/common/slurmer.go | 13 +- adaptor/pcm_slurm/service/tianhe/converter.go | 134 ++++++ adaptor/pcm_slurm/service/tianhe/diag.go | 76 +++ adaptor/pcm_slurm/service/tianhe/job.go | 116 +++++ adaptor/pcm_slurm/service/tianhe/job_db.go | 139 ++++++ .../pcm_slurm/service/tianhe/job_submit.go | 435 ++++++++++++++++++ adaptor/pcm_slurm/service/tianhe/node.go | 42 +- adaptor/pcm_slurm/service/tianhe/qos.go | 229 ++++----- 22 files changed, 1984 insertions(+), 204 deletions(-) create mode 100644 adaptor/pcm_slurm/idl/slurm_diag.proto create mode 100644 adaptor/pcm_slurm/idl/slurm_job.proto create mode 100644 adaptor/pcm_slurm/idl/slurmdb_job.proto create mode 100644 adaptor/pcm_slurm/service/common/diag.go create mode 100644 adaptor/pcm_slurm/service/common/job.go create mode 100644 adaptor/pcm_slurm/service/common/job_db.go create mode 100644 adaptor/pcm_slurm/service/common/job_submit.go create mode 100644 adaptor/pcm_slurm/service/tianhe/converter.go create mode 100644 adaptor/pcm_slurm/service/tianhe/diag.go create mode 100644 adaptor/pcm_slurm/service/tianhe/job.go create mode 100644 adaptor/pcm_slurm/service/tianhe/job_db.go create mode 100644 adaptor/pcm_slurm/service/tianhe/job_submit.go diff --git a/adaptor/pcm_slurm/idl/slurm.proto b/adaptor/pcm_slurm/idl/slurm.proto index 13d68d14..3e297f9d 100644 --- a/adaptor/pcm_slurm/idl/slurm.proto +++ b/adaptor/pcm_slurm/idl/slurm.proto @@ -12,8 +12,9 @@ import "idl/slurmdb_assoc.proto"; import "idl/slurmdb_account.proto"; import "idl/slurmdb_qos.proto"; import "idl/slurmdb_wckey.proto"; - - +import "idl/slurm_job.proto"; +import "idl/slurm_diag.proto"; +import "idl/slurmdb_job.proto"; // Slurm Services service SlurmService { @@ -48,9 +49,6 @@ service SlurmService { // delete account rpc DeleteAccount(DeleteAccountReq) returns (DeleteAccountResp); - // list all qos info from slurmdb - rpc ListQoss(ListQossReq) returns (ListQossResp); - // list all wckeys info from slurmdb rpc ListWckeys(ListWckeysReq) returns (ListWckeysResp); @@ -86,4 +84,40 @@ service SlurmService { // get specific Reservation info from slurm rpc GetReservation(GetReservationReq) returns (GetReservationResp); + + // list all jobs from slurm + rpc GetAllJobs(JobInfoMsgReq) returns (JobInfoMsgResp); + + // get job by id from slurm + rpc GetJob(JobInfoMsgReq) returns (JobInfoMsgResp); + + // submit job to slurm + rpc SubmitJob(SubmitJobReq) returns (SubmitJobResp); + + // delete job from slurm + rpc DeleteJob(DeleteJobReq) returns (DeleteJobResp); + + // update job from slurm + rpc UpdateJob(UpdateJobReq) 
returns (UpdateJobResp); + + // list all diag from slurm + rpc GetDiag(DiagReq) returns (DiagResp); + + // list all qoses from slurm + rpc GetQos(QosReq) returns (QosResp); + + // list all jobs from slurmdb + rpc GetSlurmdbJobs(SlurmDbJobReq) returns (SlurmDbJobResp); + + // get job by id from slurmdb + rpc GetSlurmdbJobById(SlurmDbJobReq) returns (SlurmDbJobResp); + + // delete qos from slurmdb + rpc DeleteQos(DeleteQosReq) returns (DeleteQosResp); + + // get qos by name from slurmdb + rpc GetQosByName(QosReq) returns (QosResp); + + // add qos to slurmdb + rpc AddQos(AddQosReq) returns (AddQosResp); } diff --git a/adaptor/pcm_slurm/idl/slurm.yaml b/adaptor/pcm_slurm/idl/slurm.yaml index 6301f57b..c32d69fa 100644 --- a/adaptor/pcm_slurm/idl/slurm.yaml +++ b/adaptor/pcm_slurm/idl/slurm.yaml @@ -23,8 +23,6 @@ http: post: "/apis/slurm/addAccount" - selector: slurm.SlurmService.DeleteAccount delete: "/apis/slurm/deleteAccount" - - selector: slurm.SlurmService.ListQoss - get: "/apis/slurm/listQoss" - selector: slurm.SlurmService.ListWckeys get: "/apis/slurm/listWckeys" - selector: slurm.SlurmService.GetWckey @@ -49,3 +47,29 @@ http: get: "/apis/slurm/listReservations" - selector: slurm.SlurmService.GetReservation get: "/apis/slurm/getReservation" + - selector: slurm.SlurmService.GetAllJobs + get: "/apis/slurm/getAllJobs" + - selector: slurm.SlurmService.GetJob + get: "/apis/slurm/getJob/{JobId}" + - selector: slurm.SlurmService.SubmitJob + post: "/apis/slurm/submitJob/data" + body: "data" + - selector: slurm.SlurmService.DeleteJob + delete: "/apis/slurm/deleteJob/{JobId}" + - selector: slurm.SlurmService.UpdateJob + post: "/apis/slurm/updateJob/data" + body: "data" + - selector: slurm.SlurmService.GetDiag + get: "/apis/slurm/getDiag" + - selector: slurm.SlurmService.GetQos + get: "/apis/slurm/getQos" + - selector: slurm.SlurmService.GetSlurmdbJobs + get: "/apis/slurm/getAllSlurmdbJobs" + - selector: slurm.SlurmService.GetSlurmdbJobById + get: "/apis/slurm/getSlurmdbJobById/{JobId}" + - selector: slurm.SlurmService.DeleteQos + delete: "/apis/slurm/deleteQos" + - selector: slurm.SlurmService.GetQosByName + get: "/apis/slurm/getQosByName/{name}" + - selector: slurm.SlurmService.AddQos + post: "/apis/slurm/addQos" diff --git a/adaptor/pcm_slurm/idl/slurm_diag.proto b/adaptor/pcm_slurm/idl/slurm_diag.proto new file mode 100644 index 00000000..7264b836 --- /dev/null +++ b/adaptor/pcm_slurm/idl/slurm_diag.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; +package slurm; + +option go_package = "/slurmpb"; +import "idl/static.proto"; + +message DiagReq{ + SlurmVersion slurmVersion = 1; +} + +message DiagResp{ + StatsInfoResponseMsg statsInfoResponseMsg = 1; +} + +message StatsInfoResponseMsg { + uint32 parts_packed =1; + int64 req_time=2; + int64 req_time_start=3; + uint32 server_thread_count=4; + uint32 agent_queue_size=5; + + uint32 schedule_cycle_max=6; + uint32 schedule_cycle_last=7; + uint32 schedule_cycle_sum=8; + uint32 schedule_cycle_counter=9; + uint32 schedule_cycle_depth=10; + uint32 schedule_queue_len=11; + + uint32 jobs_submitted=12; + uint32 jobs_started=13; + uint32 jobs_completed=14; + uint32 jobs_canceled=15; + uint32 jobs_failed=16; + + uint32 bf_backfilled_jobs=17; + uint32 bf_last_backfilled_jobs=18; + uint32 bf_cycle_counter=19; + uint32 bf_cycle_sum=20; + uint32 bf_cycle_last=21; + uint32 bf_cycle_max=22; + uint32 bf_last_depth=23; + uint32 bf_last_depth_try=24; + uint32 bf_depth_sum=25; + uint32 bf_depth_try_sum=26; + uint32 bf_queue_len=27; + uint32 bf_queue_len_sum=28; + int64 
bf_when_last_cycle=29; + uint32 bf_active=30; +} \ No newline at end of file diff --git a/adaptor/pcm_slurm/idl/slurm_job.proto b/adaptor/pcm_slurm/idl/slurm_job.proto new file mode 100644 index 00000000..828df347 --- /dev/null +++ b/adaptor/pcm_slurm/idl/slurm_job.proto @@ -0,0 +1,314 @@ +syntax = "proto3"; +package slurm; + +option go_package = "/slurmpb"; +import "idl/static.proto"; + +message UpdateJobReq{ + uint32 JobId = 1; + Update_job_options data = 2; + SlurmVersion slurmVersion = 3; +} + +message UpdateJobResp{ + int32 Error_code=1; +} + +message SubmitJobReq{ + JobDescriptor data = 1; + SlurmVersion slurmVersion = 2; +} + +message SubmitJobResp{ + repeated Submit_response_msg submitResponseMsg = 1; +} + +message DeleteJobReq{ + uint32 JobId = 1; + SlurmVersion slurmVersion = 2; +} + +message DeleteJobResp{ + int32 Error_code=1; +} + +message Argv{ + string argv =1; +} + +message Environment{ + string environment =1; +} + +message JobDescriptor{ + string Account =1; /* charge to specified account */ + string Acctg_freq =2; /* accounting polling intervals (seconds) */ + string Alloc_node=3; /* node making resource allocation request + * NOTE: Normally set by slurm_submit* or + * slurm_allocate* function */ + uint32 Alloc_resp_port=4; /* port to send allocation confirmation to */ + uint32 Alloc_sid =5; /* local sid making resource allocation request + * NOTE: Normally set by slurm_submit* or + * slurm_allocate* function + * NOTE: Also used for update flags, see + * ALLOC_SID_* flags */ + uint32 Argc =6; /* number of arguments to the script */ + repeated Argv argv = 7; /* arguments to the script */ + string Array_inx =8; /* job array index values */ + //void *array_bitmap; /* NOTE: Set by slurmctld */ + int64 Begin_time = 9; /* delay initiation until this time */ + uint32 Ckpt_interval=10; /* periodically checkpoint this job */ + string Ckpt_dir =11; /* directory to store checkpoint images */ + string Comment =12; /* arbitrary comment (used by Moab scheduler) */ + uint32 Contiguous=13; /* 1 if job requires contiguous nodes, + * 0 otherwise,default=0 */ + string Cpu_bind=14; /* binding map for map/mask_cpu */ + uint32 Cpu_bind_type=15; /* see cpu_bind_type_t */ + string Dependency =16; /* synchronize job execution with other jobs */ + int64 End_time=17; /* time by which job must complete, used for + * job update only now, possible deadline + * scheduling in the future */ + repeated Environment environment=18; /* environment variables to set for job, + * name=value pairs, one per line */ + uint32 Env_size =19; /* element count in environment */ + string Exc_nodes =20; /* comma separated list of nodes excluded + * from job's allocation, default NONE */ + string Features =21; /* comma separated list of required features, + * default NONE */ + string Gres =22; /* comma separated list of required generic + * resources, default NONE */ + uint32 Group_id =23; /* group to assume, if run as root. 
*/ + uint32 Immediate=24; /* 1 if allocate to run or fail immediately, + * 0 if to be queued awaiting resources */ + uint32 Job_id =25; /* job ID, default set by SLURM */ + uint32 Kill_on_node_fail=26; /* 1 if node failure to kill job, + * 0 otherwise,default=1 */ + string Licenses=27; /* licenses required by the job */ + uint32 Mail_type=28; /* see MAIL_JOB_ definitions above */ + string Mail_user =29; /* user to receive notification */ + string Mem_bind =30; /* binding map for map/mask_cpu */ + uint32 Mem_bind_type=31; /* see mem_bind_type_t */ + string Name =32; /* name of the job, default "" */ + string Network=33; /* network use spec */ + uint32 Nice =34; /* requested priority change, + * NICE_OFFSET == no change */ + uint32 Num_tasks=35; /* number of tasks to be started, + * for batch only */ + uint32 Open_mode=36; /* out/err open mode truncate or append, + * see OPEN_MODE_* */ + uint32 Other_port=37; /* port to send various notification msg to */ + uint32 Overcommit =38; /* over subscribe resources, for batch only */ + string Partition=39; /* name of requested partition, + * default in SLURM config */ + uint32 Plane_size =40; /* plane size when task_dist = + SLURM_DIST_PLANE */ + uint32 Priority =41; /* relative priority of the job, + * explicitly set only for user root, + * 0 == held (don't initiate) */ + uint32 Profile =42; /* Level of acct_gather_profile {all | none} */ + string Qos =43; /* Quality of Service */ + string Resp_host=44; /* NOTE: Set by slurmctld */ + string Req_nodes=45; /* comma separated list of required nodes + * default NONE */ + uint32 Requeue=46; /* enable or disable job requeue option */ + string Reservation=47; /* name of reservation to use */ + string Script=48; /* the actual job script, default NONE */ + uint32 Shared =49; /* 1 if job can share nodes with other jobs, + * 0 if job needs exclusive access to the node, + * or NO_VAL to accept the system default. + * SHARED_FORCE to eliminate user control. 
*/ + //char **spank_job_env; environment variables for job prolog/epilog + // * scripts as set by SPANK plugins + uint32 Spank_job_env_size=50; /* element count in spank_env */ + uint32 Task_dist =51; /* see enum task_dist_state */ + uint32 Time_limit =52; /* maximum run time in minutes, default is + * partition limit */ + uint32 Time_min =53; /* minimum run time in minutes, default is + * time_limit */ + uint32 User_id=54; /* set only if different from current UID, + * can only be explicitly set by user root */ + uint32 Wait_all_nodes=55; /* 0 to start job immediately after allocation + * 1 to start job after all nodes booted + * or NO_VAL to use system default */ + uint32 Warn_signal=56; /* signal to send when approaching end time */ + uint32 Warn_time=57; /* time before end to send signal (seconds) */ + string Work_dir =58; /* pathname of working directory */ + + /* job constraints: */ + uint32 Cpus_per_task=59; /* number of processors required for + * each task */ + uint32 Min_cpus =60; /* minimum number of processors required, + * default=0 */ + uint32 Max_cpus=61; /* maximum number of processors required, + * default=0 */ + uint32 Min_nodes=62; /* minimum number of nodes required by job, + * default=0 */ + uint32 Max_nodes=63; /* maximum number of nodes usable by job, + * default=0 */ + uint32 Boards_per_node =64; /* boards per node required by job */ + uint32 Sockets_per_board=65; /* sockets per board required by job */ + uint32 Sockets_per_node =66; /* sockets per node required by job */ + uint32 Cores_per_socket=67; /* cores per socket required by job */ + uint32 Threads_per_core=68; /* threads per core required by job */ + uint32 Ntasks_per_node =69; /* number of tasks to invoke on each node */ + uint32 Ntasks_per_socket=70; /* number of tasks to invoke on + * each socket */ + uint32 Ntasks_per_core =71; /* number of tasks to invoke on each core */ + uint32 Ntasks_per_board=72; /* number of tasks to invoke on each board */ + uint32 Pn_min_cpus =73; /* minimum # CPUs per node, default=0 */ + uint32 Pn_min_memory=74; /* minimum real memory per node OR + * real memory per CPU | MEM_PER_CPU, + * default=0 (no limit) */ + uint32 Pn_min_tmp_disk =75; /* minimum tmp disk per node, + * default=0 */ + + /* + * The following parameters are only meaningful on a Blue Gene + * system at present. Some will be of value on other system. Don't remove these + * they are needed for LCRM and others that can't talk to the opaque data type + * select_jobinfo. + */ + //uint16_t geometry[HIGHEST_DIMENSIONS]; node count in various + // * dimensions, e.g. 
X, Y, and Z + //uint16_t conn_type[HIGHEST_DIMENSIONS]; see enum connection_type + uint32 Reboot=76; /* force node reboot before startup */ + uint32 Rotate=77; /* permit geometry rotation if set */ + //char *blrtsimage; /* BlrtsImage for block */ + //char *linuximage; /* LinuxImage for block */ + //char *mloaderimage; /* MloaderImage for block */ + //char *ramdiskimage; /* RamDiskImage for block */ + + /* End of Blue Gene specific values */ + uint32 Req_switch =78; /* Minimum number of switches */ + //dynamic_plugin_data_t *select_jobinfo; /* opaque data type, + // * SLURM internal use only */ + string Std_err=79; /* pathname of stderr */ + string Std_in =80; /* pathname of stdin */ + string Std_out=81; /* pathname of stdout */ + uint32 Wait4switch=82; /* Maximum time to wait for minimum switches */ + string Wckey =83; /* wckey for job */ +} + +message Submit_response_msg{ + uint32 Job_id = 1; + uint32 Step_id =2; + uint32 Error_code=3; +} + +message JobInfoMsgReq{ + uint32 JobId = 1; + SlurmVersion slurmVersion = 2; +} + +message JobInfoMsgResp { + Job_info_msg jobInfoMsg =1; +} + +message Job_info_msg{ + int64 Last_update = 1; + uint32 Record_count = 2; + repeated Job_info Job_list = 3; +} + +message Job_info{ + string account = 1; /* charge to specified account */ + string alloc_node = 2; /* local node making resource alloc */ + uint32 alloc_sid =3; /* local sid making resource alloc */ + uint32 array_job_id =4; /* job_id of a job array or 0 if N/A */ + uint32 array_task_id =5; /* task_id of a job array */ + uint32 assoc_id =6; /* association id for job */ + uint32 batch_flag =7; /* 1 if batch: queued job with script */ + string batch_host =8; /* name of host running batch script */ + string batch_script=9; /* contents of batch script */ + string command =10; /* command to be executed, built from submitted + * job's argv and NULL for salloc command */ + string comment =11; /* arbitrary comment (used by Moab scheduler) */ + uint32 contiguous =12; /* 1 if job requires contiguous nodes */ + uint32 cpus_per_task=13; /* number of processors required for + * each task */ + string dependency =14; /* synchronize job execution with other jobs */ + uint32 derived_ec =15; /* highest exit code of all job steps */ + int64 eligible_time =16; /* time job is eligible for running */ + int64 end_time =17; /* time of termination, actual or expected */ + string exc_nodes =18; /* comma separated list of excluded nodes */ + int32 exc_node_inx =19; /* excluded list index pairs into node_table: + * start_range_1, end_range_1, + * start_range_2, .., -1 */ + uint32 exit_code =20; /* exit code for job (status from wait call) */ + string features =21; /* comma separated list of required features */ + string gres =22; /* comma separated list of generic resources */ + uint32 group_id =23; /* group job sumitted as */ + uint32 job_id =24; /* job ID */ + uint32 job_state =25; /* state of the job, see enum job_states */ + string licenses =26; /* licenses required by the job */ + uint32 max_cpus =27; /* maximum number of cpus usable by job */ + uint32 max_nodes =28; /* maximum number of nodes usable by job */ + uint32 boards_per_node =29; /* boards per node required by job */ + uint32 sockets_per_board=30; /* sockets per board required by job */ + uint32 sockets_per_node=31; /* sockets per node required by job */ + uint32 cores_per_socket=32; /* cores per socket required by job */ + uint32 threads_per_core=33; /* threads per core required by job */ + string name =34; /* name of the job */ + string network =35; /* network 
specification */ + string nodes =36; /* list of nodes allocated to job */ + uint32 nice =37; /* requested priority change */ + int32 node_inx =38; /* list index pairs into node_table for *nodes: + * start_range_1, end_range_1, + * start_range_2, .., -1 */ + uint32 ntasks_per_core =39; /* number of tasks to invoke on each core */ + uint32 ntasks_per_node =40; /* number of tasks to invoke on each node */ + uint32 ntasks_per_socket =41; /* number of tasks to invoke on each socket*/ + uint32 ntasks_per_board =42; /* number of tasks to invoke on each board */ + + uint32 num_nodes =43; /* minimum number of nodes required by job */ + uint32 num_cpus =44; /* minimum number of cpus required by job */ + string partition =45; /* name of assigned partition */ + uint32 pn_min_memory =46; /* minimum real memory per node, default=0 */ + uint32 pn_min_cpus =47; /* minimum # CPUs per node, default=0 */ + uint32 pn_min_tmp_disk =48; /* minimum tmp disk per node, default=0 */ + int64 pre_sus_time =49; /* time job ran prior to last suspend */ + uint32 priority =50; /* relative priority of the job, + * 0=held, 1=required nodes DOWN/DRAINED */ + uint32 profile =51; /* Level of acct_gather_profile {all | none} */ + string qos =52; /* Quality of Service */ + string req_nodes =53; /* comma separated list of required nodes */ + int32 req_node_inx =54; /* required list index pairs into node_table: + * start_range_1, end_range_1, + * start_range_2, .., -1 */ + uint32 req_switch =55; /* Minimum number of switches */ + uint32 requeue =56; /* enable or disable job requeue option */ + int64 resize_time =57; /* time of latest size change */ + uint32 restart_cnt =58;/* count of job restarts */ + string resv_name =59; /* reservation name */ + /*dynamic_plugin_data_t *select_jobinfo;*/ /* opaque data type, + * process using + * slurm_get_select_jobinfo() + */ + /*job_resources_t *job_resrcs;*/ /* opaque data type, job resources */ + uint32 shared =60; /* 1 if job can share nodes with other jobs */ + uint32 show_flags =61; /* conveys level of details requested */ + int64 start_time =62; /* time execution begins, actual or expected */ + string state_desc =63; /* optional details for state_reason */ + uint32 state_reason =64; /* reason job still pending or failed, see + * slurm.h:enum job_state_reason */ + int64 submit_time =65; /* time of job submission */ + int64 suspend_time =66; /* time job last suspended or resumed */ + uint32 time_limit =67; /* maximum run time in minutes or INFINITE */ + uint32 time_min =68; /* minimum run time in minutes or INFINITE */ + uint32 user_id =69; /* user the job runs as */ + int64 preempt_time =70; /* preemption signal time */ + uint32 wait4switch =71;/* Maximum time to wait for minimum switches */ + string wckey =72; /* wckey for job */ + string work_dir =73; /* pathname of working directory */ +} + +message Update_job_options { + string Partition =1; + string Qos =2; + uint32 Num_tasks =3; + uint32 Ntasks_per_node =4; + uint32 Ntasks_per_socket =5; + uint32 Ntasks_per_core =6; + uint32 Min_nodes =7; + uint32 Max_nodes =8; +} \ No newline at end of file diff --git a/adaptor/pcm_slurm/idl/slurmdb_assoc.proto b/adaptor/pcm_slurm/idl/slurmdb_assoc.proto index f9b7b053..24ebae18 100644 --- a/adaptor/pcm_slurm/idl/slurmdb_assoc.proto +++ b/adaptor/pcm_slurm/idl/slurmdb_assoc.proto @@ -38,7 +38,7 @@ message AssociationInfo{ string parent_acct = 23 ; int32 parent_id = 24 ; string partition = 25 ; - repeated QosInfo qos_list = 26 ; + repeated Slurmdb_qos_rec qos_list = 26 ; int32 rgt = 27 ; int32 
shares_raw = 28 ; int32 uid = 29 ; diff --git a/adaptor/pcm_slurm/idl/slurmdb_job.proto b/adaptor/pcm_slurm/idl/slurmdb_job.proto new file mode 100644 index 00000000..f492d2f6 --- /dev/null +++ b/adaptor/pcm_slurm/idl/slurmdb_job.proto @@ -0,0 +1,92 @@ +syntax = "proto3"; +package slurm; + +option go_package = "/slurmpb"; +import "idl/static.proto"; + +message SlurmDbJobReq{ + SlurmVersion slurmVersion = 1; + uint32 JobId = 2; +} + +message SlurmDbJobResp{ + repeated Slurmdb_job_rec slurmdb_job_rec = 1; +} + +message Slurmdb_job_rec { + uint32 alloc_cpu = 1; + uint32 alloc_nodes =2; + string account= 3; + uint32 associd=4; + string blockid=5; + string cluster=6; + uint32 derived_ec=7; + string derived_es=8; + uint32 elapsed=9; + int64 eligible=10; + int64 end=11; + uint32 exitcode=12; + // void *first_step_ptr; + uint32 gid=13; + uint32 jobid=14; + string jobname=15; + uint32 lft=16; + string partition=17; + string nodes=18; + uint32 priority=19; + uint32 qosid=20; + uint32 req_cpus=21; + uint32 req_mem=22; + uint32 requid=23; + uint32 resvid=24; + uint32 show_full=25; + int64 start=26; + uint32 state=27; + // repeated slurmdb_stats_t stats=28; + // List steps; + int64 submit=28; + uint32 suspended=29; + uint32 sys_cpu_sec=30; + uint32 sys_cpu_usec=31; + uint32 timelimit=32; + uint32 tot_cpu_sec=33; + uint32 tot_cpu_usec=34; + uint32 track_steps=35; + uint32 uid=36; + string user=37; + uint32 user_cpu_sec=38; + uint32 user_cpu_usec=39; + string wckey=40; + uint32 wckeyid=41; +} + +message slurmdb_stats_t{ + double act_cpufreq =1; + double cpu_ave=2; + double consumed_energy=3; + uint32 cpu_min=4; + uint32 cpu_min_nodeid=5; + uint32 cpu_min_taskid=6; + double disk_read_ave=7; + double disk_read_max=8; + uint32 disk_read_max_nodeid=9; + uint32 disk_read_max_taskid=10; + double disk_write_ave=11; + double disk_write_max=12; + uint32 disk_write_max_nodeid=13; + uint32 disk_write_max_taskid=14; + double pages_ave=15; + uint32 pages_max=16; + uint32 pages_max_nodeid=17; + uint32 pages_max_taskid=18; + double rss_ave=19; + uint32 rss_max=20; + uint32 rss_max_nodeid=21; + uint32 rss_max_taskid=22; + double vsize_ave=23; + uint32 vsize_max=24; + uint32 vsize_max_nodeid=25; + uint32 vsize_max_taskid=26; +} + + diff --git a/adaptor/pcm_slurm/idl/slurmdb_qos.proto b/adaptor/pcm_slurm/idl/slurmdb_qos.proto index 1229c31e..f1670075 100644 --- a/adaptor/pcm_slurm/idl/slurmdb_qos.proto +++ b/adaptor/pcm_slurm/idl/slurmdb_qos.proto @@ -2,48 +2,95 @@ syntax = "proto3"; package slurm; option go_package = "/slurmpb"; -import "google/protobuf/timestamp.proto"; import "idl/static.proto"; -message QosUsageInfo{ - +message QosReq{ + SlurmVersion slurmVersion = 1; + string name=2; } -message QosInfo{ - string description = 1 ; - int32 id = 2 ; - int32 flags = 3 ; - int32 grace_time = 4 ; - int64 grp_cpu_mins = 5; - int64 grp_cpu_run_mins = 6; - int32 grp_cpus = 7 ; - int32 grp_jobs = 8 ; - int32 grp_mem = 9 ; - int32 grp_nodes = 10 ; - int32 grp_submit_jobs = 11 ; - int32 grp_wall = 12 ; - int64 max_cpu_mins_pj = 13 ; - int64 max_cpu_run_mins_pu = 14 ; - int32 max_cpus_pj = 15 ; - int32 max_cpus_pu = 16 ; - int32 max_jobs_pu = 17 ; - int32 max_nodes_pj = 18 ; - int32 max_submit_jobs_pu = 19 ; - int32 max_wall_pj = 20 ; - string name = 21 ; - int32 preempt_bitstr = 22 ; - repeated string preempt_list = 23 ; - int32 preempt_mode = 24 ; - int32 priority = 25 ; - QosUsageInfo usage = 26 ; - double usageFactor = 27 ; - double usage_thres = 28 ; +message QosResp{ + repeated Slurmdb_qos_rec slurmdb_qos_rec = 1; } 
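To illustrate how the reworked QoS messages above are intended to be consumed, a minimal client-side sketch follows. Only the RPC and message names (GetQos, AddQos, QosReq, QosResp, AddQosReq, SlurmdbQosRec) come from this patch; the generated constructor pbslurm.NewSlurmServiceClient, the dial address, and the omitted SlurmVersion value are assumptions based on standard protoc-gen-go/grpc output and are not part of the change itself.

// Hypothetical usage sketch (not part of this patch): list QoS records and
// add a new one through the SlurmService RPCs introduced above.
package main

import (
	"context"
	"fmt"
	"log"

	pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl"
	"google.golang.org/grpc"
)

func main() {
	// Address is illustrative; use the adaptor's actual gRPC endpoint.
	conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	client := pbslurm.NewSlurmServiceClient(conn)

	// A real request must also set SlurmVersion (defined in static.proto);
	// it is omitted here because its shape is outside this patch.
	qosResp, err := client.GetQos(context.Background(), &pbslurm.QosReq{})
	if err != nil {
		log.Fatalf("GetQos: %v", err)
	}
	for _, rec := range qosResp.SlurmdbQosRec {
		fmt.Println(rec.Name, rec.Priority)
	}

	// Create a new QoS entry; Name and Description are the fields the
	// server-side AddQos handler reads in this patch.
	_, err = client.AddQos(context.Background(), &pbslurm.AddQosReq{
		SlurmdbQosRec: &pbslurm.SlurmdbQosRec{Name: "debug", Description: "debug qos"},
	})
	if err != nil {
		log.Fatalf("AddQos: %v", err)
	}
}

On the server side these calls land in GetQos/AddQos in adaptor/pcm_slurm/service/common/qos_db.go further down in this patch, which read QoS data via the tianhe cgo layer or shell out to sacctmgr.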
-message ListQossReq{ - SlurmVersion slurm_version = 1; +message DeleteQosReq{ + SlurmVersion slurmVersion = 1; + string names=2; } -message ListQossResp { - repeated QosInfo qos_infos =1; +message DeleteQosResp { + string result =1; } + +message AddQosReq { + SlurmVersion slurmVersion = 1; + Slurmdb_qos_rec slurmdb_qos_rec=2; +} + +message AddQosResp { + string result =1; +} + +message Qos_info_msg { + uint32 Record_count = 1; + repeated Slurmdb_qos_rec slurmdb_qos_rec = 2; +} + +message Slurmdb_qos_rec{ + string Description=1; + uint32 Id=2; + uint32 Flags =3; /* flags for various things to enforce or + override other limits */ + uint32 Grace_time =4; /* preemption grace time */ + uint64 Grp_cpu_mins =5; /* max number of cpu minutes all jobs + * running under this qos can run for */ + uint64 Grp_cpu_run_mins=6; /* max number of cpu minutes all jobs + * running under this qos can + * having running at one time */ + uint32 Grp_cpus =7; /* max number of cpus this qos + can allocate at one time */ + uint32 Grp_jobs=8; /* max number of jobs this qos can run + * at one time */ + uint32 Grp_mem =9; /* max amount of memory this qos + can allocate at one time */ + uint32 Grp_nodes=10; /* max number of nodes this qos + can allocate at once */ + uint32 Grp_submit_jobs=11; /* max number of jobs this qos can submit at + * one time */ + uint32 Grp_wall =12; /* total time in hours this qos can run for */ + + uint64 Max_cpu_mins_pj=13; /* max number of cpu mins a job can + * use with this qos */ + uint64 Max_cpu_run_mins_pu=14; /* max number of cpu mins a user can + * allocate at a given time when + * using this qos (Not yet valid option) */ + uint32 Max_cpus_pj =15; /* max number of cpus a job can + * allocate with this qos */ + uint32 Max_cpus_pu=16; /* max number of cpus a user can + * allocate with this qos at one time */ + uint32 Max_jobs_pu =17; /* max number of jobs a user can + * run with this qos at one time */ + uint32 Max_nodes_pj =18; /* max number of nodes a job can + * allocate with this qos at one time */ + uint32 Max_nodes_pu=19; /* max number of nodes a user can + * allocate with this qos at one time */ + uint32 Max_submit_jobs_pu=20; /* max number of jobs a user can + submit with this qos at once */ + uint32 Max_wall_pj =21; /* longest time this + * qos can run a job */ + + string Name =22; + //bitstr_t *preempt_bitstr; /* other qos' this qos can preempt */ + + repeated string preempt_list = 23; /* List preempt_list; list of char *'s only used to add or + * change the other qos' this can preempt, + * when doing a get use the preempt_bitstr */ + uint32 preempt_mode=24; /* See PREEMPT_MODE_* in slurm/slurm.h */ + uint32 priority=25; /* ranged int needs to be a unint for + * heterogeneous systems */ + //assoc_mgr_qos_usage_t *usage; /* For internal use only, DON'T PACK */ + double usage_factor=26; /* double, factor to apply to usage in this qos */ + double usage_thres=27; /* double, percent of effective usage of an + association when breached will deny + pending and new jobs */ +} \ No newline at end of file diff --git a/adaptor/pcm_slurm/server/slurm.go b/adaptor/pcm_slurm/server/slurm.go index 5d108b76..364101ae 100644 --- a/adaptor/pcm_slurm/server/slurm.go +++ b/adaptor/pcm_slurm/server/slurm.go @@ -113,8 +113,8 @@ func (s *Server) DeleteAccount(ctx context.Context, req *slurmpb.DeleteAccountRe } // ListQoss return all slurm qos -func (s *Server) ListQoss(ctx context.Context, req *slurmpb.ListQossReq) (*slurmpb.ListQossResp, error) { - resp, err := ListQoss(ctx, req) +func (s *Server) 
GetQos(ctx context.Context, req *slurmpb.QosReq) (*slurmpb.QosResp, error) { + resp, err := GetQos(ctx, req) if err != nil { glog.Errorf("ListSlurmQoss error %+v", err) return nil, status.Errorf(codes.Internal, err.Error()) @@ -241,3 +241,102 @@ func (s *Server) ListReservations(ctx context.Context, req *slurmpb.ListReservat } return resp, nil } + +func (s *Server) GetAllJobs(ctx context.Context, req *slurmpb.JobInfoMsgReq) (*slurmpb.JobInfoMsgResp, error) { + resp, err := GetAllJobs(ctx, req) + if err != nil { + glog.Errorf("GetAllJobs error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) GetJob(ctx context.Context, req *slurmpb.JobInfoMsgReq) (*slurmpb.JobInfoMsgResp, error) { + resp, err := GetJob(ctx, req) + if err != nil { + glog.Errorf("GetJob error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) SubmitJob(ctx context.Context, req *slurmpb.SubmitJobReq) (*slurmpb.SubmitJobResp, error) { + resp, err := SubmitJob(ctx, req) + if err != nil { + glog.Errorf("SubmitJob error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) DeleteJob(ctx context.Context, req *slurmpb.DeleteJobReq) (*slurmpb.DeleteJobResp, error) { + resp, err := DeleteJob(ctx, req) + if err != nil { + glog.Errorf("DeleteJob error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) UpdateJob(ctx context.Context, req *slurmpb.UpdateJobReq) (*slurmpb.UpdateJobResp, error) { + resp, err := UpdateJob(ctx, req) + if err != nil { + glog.Errorf("UpdateJob error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) GetDiag(ctx context.Context, req *slurmpb.DiagReq) (*slurmpb.DiagResp, error) { + resp, err := GetDiag(ctx, req) + if err != nil { + glog.Errorf("GetDiag error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) GetSlurmdbJobs(ctx context.Context, req *slurmpb.SlurmDbJobReq) (*slurmpb.SlurmDbJobResp, error) { + resp, err := GetSlurmdbJobs(ctx, req) + if err != nil { + glog.Errorf("GetSlurmdbJobs error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) GetSlurmdbJobById(ctx context.Context, req *slurmpb.SlurmDbJobReq) (*slurmpb.SlurmDbJobResp, error) { + resp, err := GetSlurmdbJobById(ctx, req) + if err != nil { + glog.Errorf("GetSlurmdbJobById error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) DeleteQos(ctx context.Context, req *slurmpb.DeleteQosReq) (*slurmpb.DeleteQosResp, error) { + resp, err := DeleteQos(ctx, req) + if err != nil { + glog.Errorf("DeleteQos error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) AddQos(ctx context.Context, req *slurmpb.AddQosReq) (*slurmpb.AddQosResp, error) { + resp, err := AddQos(ctx, req) + if err != nil { + glog.Errorf("AddQos error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} + +func (s *Server) GetQosByName(ctx context.Context, req *slurmpb.QosReq) (*slurmpb.QosResp, error) { + resp, err := GetQosByName(ctx, req) + if err != nil { + glog.Errorf("GetQosByName error %+v", err) + return nil, status.Errorf(codes.Internal, err.Error()) + } + return resp, nil +} diff 
--git a/adaptor/pcm_slurm/server/slurmImpl.go b/adaptor/pcm_slurm/server/slurmImpl.go index 4ec55d87..4385598d 100644 --- a/adaptor/pcm_slurm/server/slurmImpl.go +++ b/adaptor/pcm_slurm/server/slurmImpl.go @@ -96,12 +96,12 @@ func DeleteAccount(ctx context.Context, req *pbslurm.DeleteAccountReq) (*pbslurm return resp, nil } -func ListQoss(ctx context.Context, req *pbslurm.ListQossReq) (*pbslurm.ListQossResp, error) { +func GetQos(ctx context.Context, req *pbslurm.QosReq) (*pbslurm.QosResp, error) { slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) if slurm == nil { return nil, nil } - resp, _ := slurm.ListQoss(ctx, req) + resp, _ := slurm.GetQos(ctx, req) return resp, nil } @@ -212,3 +212,102 @@ func ListReservations(ctx context.Context, req *pbslurm.ListReservationsReq) (*p resp, _ := slurm.ListReservations(ctx, req) return resp, nil } + +func GetAllJobs(ctx context.Context, req *pbslurm.JobInfoMsgReq) (*pbslurm.JobInfoMsgResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.GetAllJobs(ctx, req) + return resp, nil +} + +func GetJob(ctx context.Context, req *pbslurm.JobInfoMsgReq) (*pbslurm.JobInfoMsgResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.GetJob(ctx, req) + return resp, nil +} + +func SubmitJob(ctx context.Context, req *pbslurm.SubmitJobReq) (*pbslurm.SubmitJobResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.SubmitJob(ctx, req) + return resp, nil +} + +func DeleteJob(ctx context.Context, req *pbslurm.DeleteJobReq) (*pbslurm.DeleteJobResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.DeleteJob(ctx, req) + return resp, nil +} + +func UpdateJob(ctx context.Context, req *pbslurm.UpdateJobReq) (*pbslurm.UpdateJobResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.UpdateJob(ctx, req) + return resp, nil +} + +func GetDiag(ctx context.Context, req *pbslurm.DiagReq) (*pbslurm.DiagResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.GetDiag(ctx, req) + return resp, nil +} + +func GetSlurmdbJobs(ctx context.Context, req *pbslurm.SlurmDbJobReq) (*pbslurm.SlurmDbJobResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.GetSlurmdbJobs(ctx, req) + return resp, nil +} + +func GetSlurmdbJobById(ctx context.Context, req *pbslurm.SlurmDbJobReq) (*pbslurm.SlurmDbJobResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.GetSlurmdbJobById(ctx, req) + return resp, nil +} + +func DeleteQos(ctx context.Context, req *pbslurm.DeleteQosReq) (*pbslurm.DeleteQosResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.DeleteQos(ctx, req) + return resp, nil +} + +func AddQos(ctx context.Context, req *pbslurm.AddQosReq) (*pbslurm.AddQosResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.AddQos(ctx, req) + return resp, nil +} + +func GetQosByName(ctx context.Context, req *pbslurm.QosReq) 
(*pbslurm.QosResp, error) { + slurm, _ := slurmer.SelectSlurmVersion(req.SlurmVersion) + if slurm == nil { + return nil, nil + } + resp, _ := slurm.GetQosByName(ctx, req) + return resp, nil +} diff --git a/adaptor/pcm_slurm/service/common/diag.go b/adaptor/pcm_slurm/service/common/diag.go new file mode 100644 index 00000000..fd7bdb32 --- /dev/null +++ b/adaptor/pcm_slurm/service/common/diag.go @@ -0,0 +1,16 @@ +package slurmer + +import ( + pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/service/tianhe" + "context" +) + +func (slurmStruct SlurmStruct) GetDiag(ctx context.Context, req *pbslurm.DiagReq) (*pbslurm.DiagResp, error) { + stats := tianhe.Get_diag() + + var resp = pbslurm.DiagResp{} + resp.StatsInfoResponseMsg = &stats + + return &resp, nil +} diff --git a/adaptor/pcm_slurm/service/common/job.go b/adaptor/pcm_slurm/service/common/job.go new file mode 100644 index 00000000..168a2025 --- /dev/null +++ b/adaptor/pcm_slurm/service/common/job.go @@ -0,0 +1,30 @@ +package slurmer + +import ( + pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/service/tianhe" + "context" +) + +func (slurmStruct SlurmStruct) GetAllJobs(ctx context.Context, req *pbslurm.JobInfoMsgReq) (*pbslurm.JobInfoMsgResp, error) { + job_info_msg := tianhe.Get_all_jobs() + var resp = pbslurm.JobInfoMsgResp{} + resp.JobInfoMsg = &job_info_msg + + return &resp, nil +} + +func (slurmStruct SlurmStruct) GetJob(ctx context.Context, req *pbslurm.JobInfoMsgReq) (*pbslurm.JobInfoMsgResp, error) { + job_info_msg := tianhe.Get_job(req.JobId) + var resp = pbslurm.JobInfoMsgResp{} + resp.JobInfoMsg = &job_info_msg + + return &resp, nil +} + +func (slurmStruct SlurmStruct) DeleteJob(ctx context.Context, req *pbslurm.DeleteJobReq) (*pbslurm.DeleteJobResp, error) { + errorCode := tianhe.Delete_job(req.JobId) + var resp = pbslurm.DeleteJobResp{} + resp.ErrorCode = errorCode + return &resp, nil +} diff --git a/adaptor/pcm_slurm/service/common/job_db.go b/adaptor/pcm_slurm/service/common/job_db.go new file mode 100644 index 00000000..642c1fd0 --- /dev/null +++ b/adaptor/pcm_slurm/service/common/job_db.go @@ -0,0 +1,23 @@ +package slurmer + +import ( + pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/service/tianhe" + "context" +) + +func (slurmStruct SlurmStruct) GetSlurmdbJobs(ctx context.Context, req *pbslurm.SlurmDbJobReq) (*pbslurm.SlurmDbJobResp, error) { + slurmdbJobs := tianhe.GetAllSlurmdbJobs() + var resp = pbslurm.SlurmDbJobResp{} + resp.SlurmdbJobRec = slurmdbJobs + + return &resp, nil +} + +func (slurmStruct SlurmStruct) GetSlurmdbJobById(ctx context.Context, req *pbslurm.SlurmDbJobReq) (*pbslurm.SlurmDbJobResp, error) { + slurmdbjob := tianhe.GetSlurmdbJobById(int(req.JobId)) + var resp = pbslurm.SlurmDbJobResp{} + resp.SlurmdbJobRec = slurmdbjob + + return &resp, nil +} diff --git a/adaptor/pcm_slurm/service/common/job_submit.go b/adaptor/pcm_slurm/service/common/job_submit.go new file mode 100644 index 00000000..da91ad52 --- /dev/null +++ b/adaptor/pcm_slurm/service/common/job_submit.go @@ -0,0 +1,27 @@ +package slurmer + +import ( + pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/service/tianhe" + "context" +) + +func (slurmStruct SlurmStruct) SubmitJob(ctx context.Context, req *pbslurm.SubmitJobReq) (*pbslurm.SubmitJobResp, 
error) { + submit_response_msg := tianhe.Submit_job(req.Data) + var resp = pbslurm.SubmitJobResp{} + submitResponseMsg := pbslurm.SubmitResponseMsg{} + submitResponseMsg.JobId = submit_response_msg.JobId + submitResponseMsg.StepId = submit_response_msg.StepId + submitResponseMsg.ErrorCode = submit_response_msg.ErrorCode + + resp.SubmitResponseMsg = append(resp.SubmitResponseMsg, &submitResponseMsg) + + return &resp, nil +} + +func (slurmStruct SlurmStruct) UpdateJob(ctx context.Context, req *pbslurm.UpdateJobReq) (*pbslurm.UpdateJobResp, error) { + errorCode := tianhe.Update_job(req.Data, req.JobId) + var resp = pbslurm.UpdateJobResp{} + resp.ErrorCode = errorCode + return &resp, nil +} diff --git a/adaptor/pcm_slurm/service/common/qos_db.go b/adaptor/pcm_slurm/service/common/qos_db.go index a0333264..2b7c84c2 100644 --- a/adaptor/pcm_slurm/service/common/qos_db.go +++ b/adaptor/pcm_slurm/service/common/qos_db.go @@ -3,20 +3,58 @@ package slurmer import ( pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/service/tianhe" + "code.gitlink.org.cn/JCCE/PCM.git/common/ssh" "context" ) -func (slurmStruct SlurmStruct) ListQoss(ctx context.Context, req *pbslurm.ListQossReq) (*pbslurm.ListQossResp, error) { +func (slurmStruct SlurmStruct) GetQos(ctx context.Context, req *pbslurm.QosReq) (*pbslurm.QosResp, error) { + qoslist := tianhe.Get_Qos() + var resp = pbslurm.QosResp{} + resp.SlurmdbQosRec = qoslist + return &resp, nil +} - qosList := tianhe.GetQosInfo() +func (slurmStruct SlurmStruct) DeleteQos(ctx context.Context, req *pbslurm.DeleteQosReq) (*pbslurm.DeleteQosResp, error) { - resp := pbslurm.ListQossResp{} - for _, qos := range qosList.QosList { - qosInfoResult := qos - //userInfoResult.Name = user.Name + cmd := "/usr/local/bin/sacctmgr delete qos " + cmd = cmd + req.Names + cmd = cmd + " -i" - resp.QosInfos = append(resp.QosInfos, &qosInfoResult) - } + result := ssh.ExecCommand(cmd) + + resp := pbslurm.DeleteQosResp{} + resp.Result = result return &resp, nil } + +func (slurmStruct SlurmStruct) GetQosByName(ctx context.Context, req *pbslurm.QosReq) (*pbslurm.QosResp, error) { + qos := tianhe.GetQosByName(req.Name) + var resp = pbslurm.QosResp{} + slurmdbRosRec := []*pbslurm.SlurmdbQosRec{} + slurmdbRosRec = append(slurmdbRosRec, &qos) + resp.SlurmdbQosRec = slurmdbRosRec + + return &resp, nil +} + +func (slurmStruct SlurmStruct) AddQos(ctx context.Context, req *pbslurm.AddQosReq) (*pbslurm.AddQosResp, error) { + cmd := "/usr/local/bin/sacctmgr add qos " + cmd = cmd + req.SlurmdbQosRec.Name + + if len(req.SlurmdbQosRec.Description) != 0 { + cmd = cmd + " Description=" + req.SlurmdbQosRec.Description + } + + if len(req.SlurmdbQosRec.Name) != 0 { + cmd = cmd + " Name=" + req.SlurmdbQosRec.Name + } + + cmd = cmd + " -i" + + result := ssh.ExecCommand(cmd) + resp := pbslurm.AddQosResp{} + + resp.Result = result + return &resp, nil +} diff --git a/adaptor/pcm_slurm/service/common/slurmer.go b/adaptor/pcm_slurm/service/common/slurmer.go index 18105b53..ec7310a7 100644 --- a/adaptor/pcm_slurm/service/common/slurmer.go +++ b/adaptor/pcm_slurm/service/common/slurmer.go @@ -16,7 +16,7 @@ type Slurmer interface { GetAccount(ctx context.Context, req *pbslurm.GetAccountReq) (resp *pbslurm.GetAccountResp, err error) //get specific slurm account AddAccount(ctx context.Context, req *pbslurm.AddAccountReq) (resp *pbslurm.AddAccountResp, err error) //add slurm account DeleteAccount(ctx context.Context, req *pbslurm.DeleteAccountReq) (resp 
*pbslurm.DeleteAccountResp, err error) //delete slurm account - ListQoss(ctx context.Context, req *pbslurm.ListQossReq) (resp *pbslurm.ListQossResp, err error) //list slurm qoss + GetQos(ctx context.Context, req *pbslurm.QosReq) (resp *pbslurm.QosResp, err error) //list slurm qoss ListWckeys(ctx context.Context, req *pbslurm.ListWckeysReq) (resp *pbslurm.ListWckeysResp, err error) //list slurm wckeys GetWckey(ctx context.Context, req *pbslurm.GetWckeyReq) (resp *pbslurm.GetWckeyResp, err error) //list slurm wckeys ListClusters(ctx context.Context, req *pbslurm.ListClustersReq) (resp *pbslurm.ListClustersResp, err error) @@ -29,6 +29,17 @@ type Slurmer interface { GetPartition(ctx context.Context, req *pbslurm.GetPartitionReq) (resp *pbslurm.GetPartitionResp, err error) ListReservations(ctx context.Context, req *pbslurm.ListReservationsReq) (resp *pbslurm.ListReservationsResp, err error) GetReservation(ctx context.Context, req *pbslurm.GetReservationReq) (resp *pbslurm.GetReservationResp, err error) + GetAllJobs(ctx context.Context, req *pbslurm.JobInfoMsgReq) (resp *pbslurm.JobInfoMsgResp, err error) + GetJob(ctx context.Context, req *pbslurm.JobInfoMsgReq) (resp *pbslurm.JobInfoMsgResp, err error) + SubmitJob(ctx context.Context, req *pbslurm.SubmitJobReq) (resp *pbslurm.SubmitJobResp, err error) + DeleteJob(ctx context.Context, req *pbslurm.DeleteJobReq) (resp *pbslurm.DeleteJobResp, err error) + UpdateJob(ctx context.Context, req *pbslurm.UpdateJobReq) (resp *pbslurm.UpdateJobResp, err error) + GetDiag(ctx context.Context, req *pbslurm.DiagReq) (resp *pbslurm.DiagResp, err error) + GetSlurmdbJobs(ctx context.Context, req *pbslurm.SlurmDbJobReq) (resp *pbslurm.SlurmDbJobResp, err error) + GetSlurmdbJobById(ctx context.Context, req *pbslurm.SlurmDbJobReq) (resp *pbslurm.SlurmDbJobResp, err error) + DeleteQos(ctx context.Context, req *pbslurm.DeleteQosReq) (resp *pbslurm.DeleteQosResp, err error) + GetQosByName(ctx context.Context, req *pbslurm.QosReq) (resp *pbslurm.QosResp, err error) + AddQos(ctx context.Context, req *pbslurm.AddQosReq) (resp *pbslurm.AddQosResp, err error) } func SelectSlurmVersion(slurmVersion pbslurm.SlurmVersion) (slurmer Slurmer, err error) { diff --git a/adaptor/pcm_slurm/service/tianhe/converter.go b/adaptor/pcm_slurm/service/tianhe/converter.go new file mode 100644 index 00000000..e5722bf5 --- /dev/null +++ b/adaptor/pcm_slurm/service/tianhe/converter.go @@ -0,0 +1,134 @@ +package tianhe + +/* +#cgo LDFLAGS: -lslurm +#include +#include +#include +#include +#include +#include + +inline uint8_t uint8_ptr(uint8_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline int8_t int8_ptr(int8_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline uint16_t uint16_ptr(uint16_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline int16_t int16_ptr(int16_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline uint32_t uint32_ptr(uint32_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline int32_t int32_ptr(int32_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline uint64_t uint64_ptr(uint64_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +inline int64_t int64_ptr(int16_t* pointer) { + if (NULL == pointer) { + return -1;} + return *pointer; +} +*/ +import "C" +import pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + +func Job_info_convert_c_to_go(c_struct 
*C.struct_job_info) pbslurm.JobInfo { + var go_struct pbslurm.JobInfo + + go_struct.Account = C.GoString(c_struct.account) + go_struct.AllocNode = C.GoString(c_struct.alloc_node) + go_struct.AllocSid = uint32(c_struct.alloc_sid) + go_struct.ArrayJobId = uint32(c_struct.array_job_id) + go_struct.ArrayTaskId = uint32(c_struct.array_task_id) + go_struct.AssocId = uint32(c_struct.assoc_id) + go_struct.BatchFlag = uint32(c_struct.batch_flag) + go_struct.BatchHost = C.GoString(c_struct.batch_host) + go_struct.BoardsPerNode = uint32(c_struct.boards_per_node) + go_struct.BatchScript = C.GoString(c_struct.batch_script) + go_struct.Command = C.GoString(c_struct.command) + go_struct.Comment = C.GoString(c_struct.comment) + go_struct.Contiguous = uint32(c_struct.contiguous) + go_struct.CoresPerSocket = uint32(c_struct.cores_per_socket) + go_struct.CpusPerTask = uint32(c_struct.cpus_per_task) + go_struct.Dependency = C.GoString(c_struct.dependency) + go_struct.DerivedEc = uint32(c_struct.derived_ec) + go_struct.EligibleTime = int64(c_struct.eligible_time) + go_struct.EndTime = int64(c_struct.end_time) + go_struct.ExcNodes = C.GoString(c_struct.exc_nodes) + go_struct.ExcNodeInx = int32(C.int32_ptr(c_struct.exc_node_inx)) + go_struct.ExitCode = uint32(c_struct.exit_code) + go_struct.Features = C.GoString(c_struct.features) + go_struct.GroupId = uint32(c_struct.group_id) + go_struct.Gres = C.GoString(c_struct.gres) + go_struct.JobId = uint32(c_struct.job_id) + go_struct.JobState = uint32(c_struct.job_state) + go_struct.Licenses = C.GoString(c_struct.licenses) + go_struct.MaxCpus = uint32(c_struct.max_cpus) + go_struct.MaxNodes = uint32(c_struct.max_nodes) + go_struct.Name = C.GoString(c_struct.name) + go_struct.Network = C.GoString(c_struct.network) + go_struct.Nodes = C.GoString(c_struct.nodes) + go_struct.Nice = uint32(c_struct.nice) + go_struct.NodeInx = int32(C.int32_ptr(c_struct.node_inx)) + go_struct.NtasksPerCore = uint32(c_struct.ntasks_per_core) + go_struct.NtasksPerNode = uint32(c_struct.ntasks_per_node) + go_struct.NtasksPerSocket = uint32(c_struct.ntasks_per_socket) + go_struct.NtasksPerBoard = uint32(c_struct.ntasks_per_board) + go_struct.NumCpus = uint32(c_struct.num_cpus) + go_struct.NumNodes = uint32(c_struct.num_nodes) + go_struct.Partition = C.GoString(c_struct.partition) + go_struct.PnMinMemory = uint32(c_struct.pn_min_memory) + go_struct.PnMinCpus = uint32(c_struct.pn_min_cpus) + go_struct.PnMinTmpDisk = uint32(c_struct.pn_min_tmp_disk) + go_struct.PreemptTime = int64(c_struct.preempt_time) + go_struct.PreSusTime = int64(c_struct.pre_sus_time) + go_struct.Priority = uint32(c_struct.priority) + go_struct.Profile = uint32(c_struct.profile) + go_struct.Qos = C.GoString(c_struct.qos) + go_struct.ReqNodes = C.GoString(c_struct.req_nodes) + go_struct.ReqNodeInx = int32(C.int32_ptr(c_struct.req_node_inx)) + go_struct.ReqSwitch = uint32(c_struct.req_switch) + go_struct.Requeue = uint32(c_struct.requeue) + go_struct.ResizeTime = int64(c_struct.resize_time) + go_struct.RestartCnt = uint32(c_struct.restart_cnt) + go_struct.ResvName = C.GoString(c_struct.resv_name) + go_struct.Shared = uint32(c_struct.shared) + go_struct.ShowFlags = uint32(c_struct.show_flags) + go_struct.SocketsPerBoard = uint32(c_struct.sockets_per_board) + go_struct.SocketsPerNode = uint32(c_struct.sockets_per_node) + go_struct.StartTime = int64(c_struct.start_time) + go_struct.StateDesc = C.GoString(c_struct.state_desc) + go_struct.StateReason = uint32(c_struct.state_reason) + go_struct.SubmitTime = int64(c_struct.submit_time) 
+ go_struct.SuspendTime = int64(c_struct.suspend_time) + go_struct.TimeLimit = uint32(c_struct.time_limit) + go_struct.TimeMin = uint32(c_struct.time_min) + go_struct.ThreadsPerCore = uint32(c_struct.threads_per_core) + go_struct.UserId = uint32(c_struct.user_id) + go_struct.Wait4Switch = uint32(c_struct.wait4switch) + go_struct.Wckey = C.GoString(c_struct.wckey) + go_struct.WorkDir = C.GoString(c_struct.work_dir) + + return go_struct +} diff --git a/adaptor/pcm_slurm/service/tianhe/diag.go b/adaptor/pcm_slurm/service/tianhe/diag.go new file mode 100644 index 00000000..2c308307 --- /dev/null +++ b/adaptor/pcm_slurm/service/tianhe/diag.go @@ -0,0 +1,76 @@ +package tianhe + +/* +#include +#include +#include +#include +#include +#include + +stats_info_response_msg_t *getStats(){ + stats_info_response_msg_t *buf; + stats_info_request_msg_t req; + + int rc; + req.command_id = STAT_COMMAND_GET; + rc = slurm_get_statistics(&buf, (stats_info_request_msg_t *)&req); + if (rc != SLURM_SUCCESS) { + slurm_perror("slurm_get_statistics"); + return NULL; + } + //slurm_free_stats_info_request_msg(&req); + return buf; +} +*/ +import "C" +import pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + +func Get_diag() pbslurm.StatsInfoResponseMsg { + var go_stats_buffer pbslurm.StatsInfoResponseMsg + c_stats_buffer := C.getStats() + if c_stats_buffer == nil { + return go_stats_buffer + } + + go_stats_buffer = Stats_convert_c_to_go(c_stats_buffer) + //C.slurm_free_stats_response_msg(c_stats_buffer) + return go_stats_buffer +} + +func Stats_convert_c_to_go(c_struct *C.stats_info_response_msg_t) pbslurm.StatsInfoResponseMsg { + var go_struct pbslurm.StatsInfoResponseMsg + + go_struct.AgentQueueSize = uint32(c_struct.agent_queue_size) + go_struct.BfActive = uint32(c_struct.bf_active) + go_struct.BfBackfilledJobs = uint32(c_struct.bf_backfilled_jobs) + go_struct.BfCycleCounter = uint32(c_struct.bf_cycle_counter) + go_struct.BfCycleLast = uint32(c_struct.bf_cycle_last) + go_struct.BfCycleMax = uint32(c_struct.bf_cycle_max) + go_struct.BfCycleSum = uint32(c_struct.bf_cycle_sum) + go_struct.BfDepthSum = uint32(c_struct.bf_depth_sum) + go_struct.BfDepthTrySum = uint32(c_struct.bf_depth_try_sum) + go_struct.BfLastBackfilledJobs = uint32(c_struct.bf_last_backfilled_jobs) + go_struct.BfLastDepth = uint32(c_struct.bf_last_depth) + go_struct.BfLastDepthTry = uint32(c_struct.bf_last_depth_try) + go_struct.BfQueueLen = uint32(c_struct.bf_queue_len) + go_struct.BfQueueLenSum = uint32(c_struct.bf_queue_len_sum) + go_struct.BfWhenLastCycle = int64(c_struct.bf_when_last_cycle) + go_struct.JobsCompleted = uint32(c_struct.jobs_completed) + go_struct.JobsCanceled = uint32(c_struct.jobs_canceled) + go_struct.JobsFailed = uint32(c_struct.jobs_failed) + go_struct.JobsStarted = uint32(c_struct.jobs_started) + go_struct.JobsSubmitted = uint32(c_struct.jobs_submitted) + go_struct.PartsPacked = uint32(c_struct.parts_packed) + go_struct.ReqTime = int64(c_struct.req_time) + go_struct.ReqTimeStart = int64(c_struct.req_time_start) + go_struct.ScheduleCycleDepth = uint32(c_struct.schedule_cycle_depth) + go_struct.ScheduleCycleCounter = uint32(c_struct.schedule_cycle_counter) + go_struct.ScheduleCycleLast = uint32(c_struct.schedule_cycle_last) + go_struct.ScheduleCycleMax = uint32(c_struct.schedule_cycle_max) + go_struct.ScheduleCycleSum = uint32(c_struct.schedule_cycle_sum) + go_struct.ScheduleQueueLen = uint32(c_struct.schedule_queue_len) + go_struct.ServerThreadCount = uint32(c_struct.server_thread_count) + + return go_struct 
+} diff --git a/adaptor/pcm_slurm/service/tianhe/job.go b/adaptor/pcm_slurm/service/tianhe/job.go new file mode 100644 index 00000000..d1ebe7ef --- /dev/null +++ b/adaptor/pcm_slurm/service/tianhe/job.go @@ -0,0 +1,116 @@ +package tianhe + +/* +#include +#include +#include +#include +#include +#include + + struct job_info_msg *get_job_info(){ + + struct job_info_msg* job_buffer; + if(slurm_load_jobs ((time_t) NULL, + &job_buffer, SHOW_ALL)) { + return NULL; + } + + return job_buffer; + } + +struct job_info* job_from_list(struct job_info_msg *list, int i){ + return &list->job_array[i]; +} + + struct job_info_msg *get_single_job_info(uint32_t id){ + struct job_info_msg* job_buffer; + if( slurm_load_job (&job_buffer, id, SHOW_DETAIL)) { + return NULL; + } + return job_buffer; + } + +int delete_job(uint32_t id) { + int error_code = 0; + error_code = slurm_kill_job (id, SIGKILL, 0); + if (error_code) { + char msg[64]; + sprintf(msg, "slurm_kill_job(%.12s)",id); + slurm_perror (msg); + return error_code; + } + return (error_code); +} + + //static time_t last_update_time = (time_t) NULL; + //int error_code; + //job_info_msg_t * job_info_msg_ptr = NULL; + // + //error_code = slurm_load_jobs (last_update_time, &job_info_msg_ptr, 1); + //if (error_code) { + // slurm_perror ("slurm_load_jobs"); + // return (error_code); + //} + // + //slurm_print_job_info_msg ( stdout, job_info_msg_ptr, 1 ) ; + // + //slurm_free_job_info_msg ( job_info_msg_ptr ) ; + +*/ +import "C" +import ( + pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" +) + +func Get_all_jobs() pbslurm.JobInfoMsg { + var go_job_buffer pbslurm.JobInfoMsg + c_job_buffer := C.get_job_info() + if c_job_buffer == nil { + go_job_buffer.LastUpdate = int64(0) + go_job_buffer.RecordCount = uint32(0) + go_job_buffer.JobList = nil + return go_job_buffer + } + + go_job_buffer.LastUpdate = int64(c_job_buffer.last_update) + go_job_buffer.RecordCount = uint32(c_job_buffer.record_count) + for i := uint32(0); i < go_job_buffer.RecordCount; i++ { + job := C.job_from_list(c_job_buffer, C.int(i)) + go_job := Job_info_convert_c_to_go(job) + go_job_buffer.JobList = append(go_job_buffer.JobList, &go_job) + //fmt.Println(Reason_to_string(go_job.Job_state)) + //fmt.Println(state_to_string(uint32(go_job.Job_state))) + } + C.slurm_free_job_info_msg(c_job_buffer) + + return go_job_buffer +} + +func Delete_job(id uint32) int32 { + error_code := C.slurm_kill_job(C.uint32_t(id), C.SIGKILL, C.uint16_t(0)) + return int32(error_code) +} + +func Get_job(id uint32) pbslurm.JobInfoMsg { + var go_job_buffer pbslurm.JobInfoMsg + c_job_buffer := C.get_single_job_info(C.uint32_t(id)) + if c_job_buffer == nil { + go_job_buffer.LastUpdate = int64(0) + go_job_buffer.RecordCount = uint32(0) + go_job_buffer.JobList = nil + return go_job_buffer + } + go_job_buffer.LastUpdate = int64(c_job_buffer.last_update) + go_job_buffer.RecordCount = uint32(c_job_buffer.record_count) + for i := uint32(0); i < go_job_buffer.RecordCount; i++ { + job := C.job_from_list(c_job_buffer, C.int(i)) + go_job := Job_info_convert_c_to_go(job) + go_job_buffer.JobList = append(go_job_buffer.JobList, &go_job) + //fmt.Println(Reason_to_string(go_job.Job_state)) + //fmt.Println(state_to_string(uint32(go_job.Job_state))) + } + C.slurm_free_job_info_msg(c_job_buffer) + + return go_job_buffer +} diff --git a/adaptor/pcm_slurm/service/tianhe/job_db.go b/adaptor/pcm_slurm/service/tianhe/job_db.go new file mode 100644 index 00000000..344dd17b --- /dev/null +++ b/adaptor/pcm_slurm/service/tianhe/job_db.go 
@@ -0,0 +1,139 @@ +package tianhe + +/* +#cgo LDFLAGS: -lslurmdb + +#include +#include +#include +#include +#include +#include +#include +#include + +slurmdb_job_rec_t *get_all_slurmdb_job() { + + slurmdb_job_cond_t *job_cond = NULL; + void *conn = slurmdb_connection_get(); + + List joblist = slurmdb_jobs_get(conn, job_cond); + + uint16_t listsize = slurm_list_count(joblist); + //qosinfo.record_count = size; + slurmdb_job_rec_t *jobarray = malloc(listsize * sizeof(slurmdb_job_rec_t)); + + ListIterator itr = slurm_list_iterator_create(joblist); + + slurmdb_job_rec_t *rec = NULL; + int i = 0; + while ((rec = slurm_list_next(itr))) { + jobarray[i] = *rec; + i++; + } + slurmdb_connection_close(&conn); + slurm_list_destroy(joblist); + //int arraysize = sizeof(jobarray); + //printf("%d\n", arraysize); + return jobarray; +} + +slurmdb_job_rec_t *job_from_array(slurmdb_job_rec_t *job_rec_array, int i) { + return (slurmdb_job_rec_t *) &(job_rec_array[i]); +} + +slurmdb_job_rec_t *job_from_array_by_id(slurmdb_job_rec_t *job_rec_array, int job_id) { + int i; + int arraysize = sizeof(job_rec_array); + for (i=0; i +#include +#include +#include +#include + +struct submit_response_msg *submit_job(struct job_descriptor *desc) +{ + + struct submit_response_msg *resp_msg; + if (slurm_submit_batch_job(desc, + &resp_msg)) { + return NULL; + } + return resp_msg; + +} + +void free_submit_response_msg(struct submit_response_msg *msg) +{ + slurm_free_submit_response_response_msg(msg); +} + +int update_job (struct job_descriptor *msg) { + + return slurm_update_job (msg); +} +*/ +import "C" +import ( + pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" + "fmt" + "unsafe" +) + +func Submit_job(go_struct *pbslurm.JobDescriptor) pbslurm.SubmitResponseMsg { + + var c_struct C.struct_job_descriptor + + C.slurm_init_job_desc_msg(&c_struct) + if go_struct.Account != "" { + account_s := C.CString(go_struct.Account) + defer C.free(unsafe.Pointer(account_s)) + c_struct.account = account_s + } + if go_struct.AcctgFreq != "" { + acctg_freq_s := C.CString(go_struct.AcctgFreq) + defer C.free(unsafe.Pointer(acctg_freq_s)) + c_struct.acctg_freq = acctg_freq_s + } + if go_struct.AllocNode != "" { + alloc_node_s := C.CString(go_struct.AllocNode) + defer C.free(unsafe.Pointer(alloc_node_s)) + c_struct.alloc_node = alloc_node_s + } + if go_struct.AllocRespPort != 0 { + c_struct.alloc_resp_port = C.uint16_t(go_struct.AllocRespPort) + } + if go_struct.AllocSid != 0 { + c_struct.alloc_sid = C.uint32_t(go_struct.AllocSid) + } + if len(go_struct.Argv) > 0 { + c_struct.argc = C.uint32_t(len(go_struct.Argv)) + cArray := C.malloc(C.size_t(C.size_t(len(go_struct.Argv)) * C.size_t(unsafe.Sizeof(uintptr(0))))) + a := (*[1<<30 - 1]*C.char)(cArray) + for i := 0; i < len(go_struct.Argv); i++ { + a[i] = C.CString(go_struct.Argv[i].String()) + } + c_struct.argv = (**C.char)(cArray) + fmt.Printf("test\n") + } + + if go_struct.ArrayInx != "" { + array_inx_s := C.CString(go_struct.ArrayInx) + defer C.free(unsafe.Pointer(array_inx_s)) + c_struct.array_inx = array_inx_s + } + if go_struct.BeginTime != 0 { + c_struct.begin_time = C.int64_t(go_struct.BeginTime) + } + if go_struct.CkptInterval != 0 { + c_struct.ckpt_interval = C.uint16_t(go_struct.CkptInterval) + } + if go_struct.CkptDir != "" { + ckpt_dir_s := C.CString(go_struct.CkptDir) + defer C.free(unsafe.Pointer(ckpt_dir_s)) + c_struct.ckpt_dir = ckpt_dir_s + } + if go_struct.Comment != "" { + comment_s := C.CString(go_struct.Comment) + defer C.free(unsafe.Pointer(comment_s)) + 
c_struct.comment = comment_s + } + if go_struct.Contiguous != 0 { + c_struct.contiguous = C.uint16_t(go_struct.Contiguous) + } + if go_struct.CpuBind != "" { + cpu_bind_s := C.CString(go_struct.CpuBind) + defer C.free(unsafe.Pointer(cpu_bind_s)) + c_struct.cpu_bind = cpu_bind_s + } + if go_struct.CpuBindType != 0 { + c_struct.cpu_bind_type = C.uint16_t(go_struct.CpuBindType) + } + if go_struct.Dependency != "" { + dependency_s := C.CString(go_struct.Dependency) + defer C.free(unsafe.Pointer(dependency_s)) + c_struct.dependency = dependency_s + } + if go_struct.EndTime != 0 { + c_struct.end_time = C.int64_t(go_struct.EndTime) + } + if len(go_struct.Environment) > 0 { + c_struct.env_size = C.uint32_t(len(go_struct.Environment)) + cArray := C.malloc(C.size_t(C.size_t(len(go_struct.Environment)) * C.size_t(unsafe.Sizeof(uintptr(0))))) + a := (*[1<<30 - 1]*C.char)(cArray) + for i := 0; i < len(go_struct.Environment); i++ { + a[i] = C.CString(go_struct.Environment[i].String()) + defer C.free(unsafe.Pointer(a[i])) + } + c_struct.environment = (**C.char)(cArray) + } else { + c_struct.env_size = 1 + cArray := C.malloc(C.size_t(C.size_t(1) * C.size_t(unsafe.Sizeof(uintptr(0))))) + a := (*[1<<30 - 1]*C.char)(cArray) + a[0] = C.CString("SLURM_GO_JOB=TRUE") + defer C.free(unsafe.Pointer(a[0])) + c_struct.environment = (**C.char)(cArray) + + } + if go_struct.ExcNodes != "" { + exc_nodes_s := C.CString(go_struct.ExcNodes) + defer C.free(unsafe.Pointer(exc_nodes_s)) + c_struct.exc_nodes = exc_nodes_s + } + if go_struct.Features != "" { + features_s := C.CString(go_struct.Features) + defer C.free(unsafe.Pointer(features_s)) + c_struct.features = features_s + } + if go_struct.GroupId != 0 { + c_struct.group_id = C.uint32_t(go_struct.GroupId) + } + if go_struct.Immediate != 0 { + c_struct.immediate = C.uint16_t(go_struct.Immediate) + } + if go_struct.JobId != 0 { + c_struct.job_id = C.uint32_t(go_struct.JobId) + } + if go_struct.KillOnNodeFail != 0 { + c_struct.kill_on_node_fail = C.uint16_t(go_struct.KillOnNodeFail) + } + if go_struct.Licenses != "" { + licenses_s := C.CString(go_struct.Licenses) + defer C.free(unsafe.Pointer(licenses_s)) + c_struct.licenses = licenses_s + } + if go_struct.MailType != 0 { + c_struct.mail_type = C.uint16_t(go_struct.MailType) + } + if go_struct.MailUser != "" { + mail_user_s := C.CString(go_struct.MailUser) + defer C.free(unsafe.Pointer(mail_user_s)) + c_struct.mail_user = mail_user_s + } + if go_struct.MemBind != "" { + mem_bind_s := C.CString(go_struct.MemBind) + defer C.free(unsafe.Pointer(mem_bind_s)) + c_struct.mem_bind = mem_bind_s + } + if go_struct.MemBindType != 0 { + c_struct.mem_bind_type = C.uint16_t(go_struct.MemBindType) + } + if go_struct.Name != "" { + name_s := C.CString(go_struct.Name) + defer C.free(unsafe.Pointer(name_s)) + c_struct.name = name_s + } + if go_struct.Network != "" { + network_s := C.CString(go_struct.Network) + defer C.free(unsafe.Pointer(network_s)) + c_struct.network = network_s + } + if go_struct.Nice != 0 { + c_struct.nice = C.uint16_t(go_struct.Nice) + } + if go_struct.NumTasks != 0 { + c_struct.num_tasks = C.uint32_t(go_struct.NumTasks) + } + if go_struct.OpenMode != 0 { + c_struct.open_mode = C.uint8_t(go_struct.OpenMode) + } + if go_struct.OtherPort != 0 { + c_struct.other_port = C.uint16_t(go_struct.OtherPort) + } + if go_struct.Overcommit != 0 { + c_struct.overcommit = C.uint8_t(go_struct.Overcommit) + } + if go_struct.Partition != "" { + partition_s := C.CString(go_struct.Partition) + defer C.free(unsafe.Pointer(partition_s)) + 
c_struct.partition = partition_s + } + if go_struct.PlaneSize != 0 { + c_struct.plane_size = C.uint16_t(go_struct.PlaneSize) + } + if go_struct.Priority != 0 { + c_struct.priority = C.uint32_t(go_struct.Priority) + } + if go_struct.Profile != 0 { + c_struct.profile = C.uint32_t(go_struct.Profile) + } + if go_struct.Qos != "" { + qos_s := C.CString(go_struct.Qos) + defer C.free(unsafe.Pointer(qos_s)) + c_struct.qos = qos_s + } + if go_struct.Reboot != 0 { + c_struct.reboot = C.uint16_t(go_struct.Reboot) + } + if go_struct.RespHost != "" { + resp_host_s := C.CString(go_struct.RespHost) + defer C.free(unsafe.Pointer(resp_host_s)) + c_struct.resp_host = resp_host_s + } + if go_struct.ReqNodes != "" { + req_nodes_s := C.CString(go_struct.ReqNodes) + defer C.free(unsafe.Pointer(req_nodes_s)) + c_struct.req_nodes = req_nodes_s + } + if go_struct.Requeue != 0 { + c_struct.requeue = C.uint16_t(go_struct.Requeue) + } + if go_struct.Reservation != "" { + reservation_s := C.CString(go_struct.Reservation) + defer C.free(unsafe.Pointer(reservation_s)) + c_struct.reservation = reservation_s + } + if go_struct.Script != "" { + script_s := C.CString(go_struct.Script) + defer C.free(unsafe.Pointer(script_s)) + c_struct.script = script_s + } + if go_struct.Shared != 0 { + c_struct.shared = C.uint16_t(go_struct.Shared) + } + if go_struct.SpankJobEnvSize != 0 { + c_struct.spank_job_env_size = C.uint32_t(go_struct.SpankJobEnvSize) + } + if go_struct.TaskDist != 0 { + c_struct.task_dist = C.uint16_t(go_struct.TaskDist) + } + if go_struct.TimeLimit != 0 { + c_struct.time_limit = C.uint32_t(go_struct.TimeLimit) + } + if go_struct.TimeMin != 0 { + c_struct.time_min = C.uint32_t(go_struct.TimeMin) + } + //if go_struct.User_id != 0 { + // c_struct.user_id = C.uint32_t(go_struct.User_id) + //} + c_struct.user_id = C.uint32_t(go_struct.UserId) + + if go_struct.WaitAllNodes != 0 { + c_struct.wait_all_nodes = C.uint16_t(go_struct.WaitAllNodes) + } + if go_struct.WarnSignal != 0 { + c_struct.warn_signal = C.uint16_t(go_struct.WarnSignal) + } + if go_struct.WarnTime != 0 { + c_struct.warn_time = C.uint16_t(go_struct.WarnTime) + } + if go_struct.WorkDir != "" { + work_dir_s := C.CString(go_struct.WorkDir) + defer C.free(unsafe.Pointer(work_dir_s)) + c_struct.work_dir = work_dir_s + } + if go_struct.CpusPerTask != 0 { + c_struct.cpus_per_task = C.uint16_t(go_struct.CpusPerTask) + } + if go_struct.MinCpus != 0 { + c_struct.min_cpus = C.uint32_t(go_struct.MinCpus) + } + if go_struct.MaxCpus != 0 { + c_struct.max_cpus = C.uint32_t(go_struct.MaxCpus) + } + if go_struct.MinNodes != 0 { + c_struct.min_nodes = C.uint32_t(go_struct.MinNodes) + } + if go_struct.MaxNodes != 0 { + c_struct.max_nodes = C.uint32_t(go_struct.MaxNodes) + } + if go_struct.BoardsPerNode != 0 { + c_struct.boards_per_node = C.uint16_t(go_struct.BoardsPerNode) + } + if go_struct.SocketsPerBoard != 0 { + c_struct.sockets_per_board = C.uint16_t(go_struct.SocketsPerBoard) + } + if go_struct.SocketsPerNode != 0 { + c_struct.sockets_per_node = C.uint16_t(go_struct.SocketsPerNode) + } + if go_struct.CoresPerSocket != 0 { + c_struct.cores_per_socket = C.uint16_t(go_struct.CoresPerSocket) + } + if go_struct.ThreadsPerCore != 0 { + c_struct.threads_per_core = C.uint16_t(go_struct.ThreadsPerCore) + } + if go_struct.NtasksPerNode != 0 { + c_struct.ntasks_per_node = C.uint16_t(go_struct.NtasksPerNode) + } + if go_struct.NtasksPerSocket != 0 { + c_struct.ntasks_per_socket = C.uint16_t(go_struct.NtasksPerSocket) + } + if go_struct.NtasksPerCore != 0 { + 
c_struct.ntasks_per_core = C.uint16_t(go_struct.NtasksPerCore) + } + if go_struct.NtasksPerBoard != 0 { + c_struct.ntasks_per_board = C.uint16_t(go_struct.NtasksPerBoard) + } + if go_struct.PnMinCpus != 0 { + c_struct.pn_min_cpus = C.uint16_t(go_struct.PnMinCpus) + } + if go_struct.PnMinMemory != 0 { + c_struct.pn_min_memory = C.uint32_t(go_struct.PnMinMemory) + } + if go_struct.PnMinTmpDisk != 0 { + c_struct.pn_min_tmp_disk = C.uint32_t(go_struct.PnMinTmpDisk) + } + if go_struct.ReqSwitch != 0 { + c_struct.req_switch = C.uint32_t(go_struct.ReqSwitch) + } + if go_struct.StdErr != "" { + std_err_s := C.CString(go_struct.StdErr) + defer C.free(unsafe.Pointer(std_err_s)) + c_struct.std_err = std_err_s + } + if go_struct.StdIn != "" { + std_in_s := C.CString(go_struct.StdIn) + defer C.free(unsafe.Pointer(std_in_s)) + c_struct.std_in = std_in_s + } + if go_struct.StdOut != "" { + std_out_s := C.CString(go_struct.StdOut) + defer C.free(unsafe.Pointer(std_out_s)) + c_struct.std_out = std_out_s + } + + if go_struct.Wait4Switch != 0 { + c_struct.wait4switch = C.uint32_t(go_struct.Wait4Switch) + } + if go_struct.Wckey != "" { + wckey_s := C.CString(go_struct.Wckey) + defer C.free(unsafe.Pointer(wckey_s)) + c_struct.wckey = wckey_s + } + + c_msg := C.submit_job(&c_struct) + + defer C.free_submit_response_msg(c_msg) + if c_msg == nil { + go_msg := pbslurm.SubmitResponseMsg{} + go_msg.JobId = 1<<31 - 1 + go_msg.ErrorCode = uint32(C.slurm_get_errno()) + return go_msg + } + go_msg := submit_response_msg_convert_c_to_go(c_msg) + + return go_msg + +} + +func Update_job(update_info *pbslurm.UpdateJobOptions, JobId uint32) int32 { + + var c_struct C.struct_job_descriptor + C.slurm_init_job_desc_msg(&c_struct) + if update_info.Partition != "" { + partition_s := C.CString(update_info.Partition) + defer C.free(unsafe.Pointer(partition_s)) + c_struct.partition = partition_s + } + if update_info.Qos != "" { + qos_s := C.CString(update_info.Qos) + defer C.free(unsafe.Pointer(qos_s)) + c_struct.qos = qos_s + } + if update_info.NumTasks != 0 { + c_struct.num_tasks = C.uint32_t(update_info.NumTasks) + } + if update_info.NtasksPerCore != 0 { + c_struct.ntasks_per_core = C.uint16_t(update_info.NtasksPerCore) + } + + if update_info.NtasksPerNode != 0 { + c_struct.ntasks_per_node = C.uint16_t(update_info.NtasksPerNode) + } + if update_info.NtasksPerSocket != 0 { + c_struct.ntasks_per_socket = C.uint16_t(update_info.NtasksPerSocket) + } + + if update_info.MaxNodes != 0 { + c_struct.max_nodes = C.uint32_t(update_info.MaxNodes) + } + if update_info.MinNodes != 0 { + c_struct.min_nodes = C.uint32_t(update_info.MinNodes) + } + + job_list := Get_job(JobId) + + if job_list.JobList == nil { + return -1 + } + + job := job_list.JobList[0] + if job.JobState != C.JOB_PENDING { + return int32(C.ESLURM_JOB_NOT_PENDING) + } + c_struct.job_id = C.uint32_t(JobId) + + err := C.update_job(&c_struct) + + return int32(err) + +} + +func submit_response_msg_convert_c_to_go(c_struct *C.struct_submit_response_msg) pbslurm.SubmitResponseMsg { + var go_struct pbslurm.SubmitResponseMsg + + go_struct.JobId = uint32(c_struct.job_id) + go_struct.StepId = uint32(c_struct.step_id) + go_struct.ErrorCode = uint32(c_struct.error_code) + return go_struct +} diff --git a/adaptor/pcm_slurm/service/tianhe/node.go b/adaptor/pcm_slurm/service/tianhe/node.go index 1e03c6a3..6325a1f4 100644 --- a/adaptor/pcm_slurm/service/tianhe/node.go +++ b/adaptor/pcm_slurm/service/tianhe/node.go @@ -1,50 +1,10 @@ package tianhe /* -//#cgo LDFLAGS: -lslurm #include #include 
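Submit_job and Update_job in job_submit.go above translate only the non-zero fields of the protobuf JobDescriptor into a C job_descriptor before calling slurm_submit_batch_job and slurm_update_job, and Update_job refuses to modify a job that is no longer pending (ESLURM_JOB_NOT_PENDING). A hedged usage sketch, again assuming the generated pbslurm package path used in this patch; the partition name, script body, and user ID are illustrative values only:

package main

import (
	"fmt"

	pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl"
	"code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/service/tianhe"
)

func main() {
	desc := &pbslurm.JobDescriptor{
		Name:      "pcm-test",
		Partition: "debug",                        // illustrative partition
		Script:    "#!/bin/bash\nsrun hostname\n", // batch script body
		WorkDir:   "/tmp",
		MinNodes:  1,
		TimeLimit: 10,   // minutes
		UserId:    1000, // copied unconditionally, even when zero
	}

	resp := tianhe.Submit_job(desc)
	if resp.ErrorCode != 0 {
		// On a nil C response, JobId is set to 1<<31-1 and ErrorCode to slurm_get_errno().
		fmt.Printf("submit failed, slurm errno %d\n", resp.ErrorCode)
		return
	}
	fmt.Printf("submitted job %d\n", resp.JobId)

	// Only pending jobs can be updated; otherwise ESLURM_JOB_NOT_PENDING comes back.
	opts := &pbslurm.UpdateJobOptions{Qos: "normal", MinNodes: 2}
	if rc := tianhe.Update_job(opts, resp.JobId); rc != 0 {
		fmt.Printf("update returned %d\n", rc)
	}
}
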
#include -inline uint8_t uint8_ptr(uint8_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -inline int8_t int8_ptr(int8_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -uint16_t uint16_ptr(uint16_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -inline int16_t int16_ptr(int16_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -inline uint32_t uint32_ptr(uint32_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -inline int32_t int32_ptr(int32_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -inline uint64_t uint64_ptr(uint64_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} -inline int64_t int64_ptr(int16_t* pointer) { - if (NULL == pointer) { - return -1;} - return *pointer; -} + struct node_info_msg *get_node_info(){ struct node_info_msg* node_buffer; if(slurm_load_node ((time_t) NULL, diff --git a/adaptor/pcm_slurm/service/tianhe/qos.go b/adaptor/pcm_slurm/service/tianhe/qos.go index c8fefa42..667cfa07 100644 --- a/adaptor/pcm_slurm/service/tianhe/qos.go +++ b/adaptor/pcm_slurm/service/tianhe/qos.go @@ -1,10 +1,5 @@ package tianhe -import "C" -import ( - pbslurm "code.gitlink.org.cn/JCCE/PCM.git/adaptor/pcm_slurm/gen/idl" -) - /* #cgo LDFLAGS: -lslurmdb @@ -13,40 +8,40 @@ import ( #include #include #include -#include +#include #include #include typedef struct slurmdb_qos_rec { - char *description; - uint32_t id; - uint32_t flags; - uint32_t grace_time; - uint64_t grp_cpu_mins; - uint64_t grp_cpu_run_mins; - uint32_t grp_cpus; - uint32_t grp_jobs; - uint32_t grp_mem; - uint32_t grp_nodes; - uint32_t grp_submit_jobs; - uint32_t grp_wall; - uint64_t max_cpu_mins_pj; - uint64_t max_cpu_run_mins_pu; - uint32_t max_cpus_pj; - uint32_t max_cpus_pu; - uint32_t max_jobs_pu; - uint32_t max_nodes_pj; - uint32_t max_nodes_pu; - uint32_t max_submit_jobs_pu; - uint32_t max_wall_pj; - char *name; - bitstr_t *preempt_bitstr; - List preempt_list; - uint16_t preempt_mode; - uint32_t priority; - assoc_mgr_qos_usage_t *usage; - double usage_factor; - double usage_thres; + char *description; + uint32_t id; + uint32_t flags; + uint32_t grace_time; + uint64_t grp_cpu_mins; + uint64_t grp_cpu_run_mins; + uint32_t grp_cpus; + uint32_t grp_jobs; + uint32_t grp_mem; + uint32_t grp_nodes; + uint32_t grp_submit_jobs; + uint32_t grp_wall; + uint64_t max_cpu_mins_pj; + uint64_t max_cpu_run_mins_pu; + uint32_t max_cpus_pj; + uint32_t max_cpus_pu; + uint32_t max_jobs_pu; + uint32_t max_nodes_pj; + uint32_t max_nodes_pu; + uint32_t max_submit_jobs_pu; + uint32_t max_wall_pj; + char *name; + bitstr_t *preempt_bitstr; + List preempt_list; + uint16_t preempt_mode; + uint32_t priority; + assoc_mgr_qos_usage_t *usage; + double usage_factor; + double usage_thres; } slurmdb_qos_rec_a; typedef struct qos_info { @@ -54,89 +49,111 @@ typedef struct qos_info { slurmdb_qos_rec_t *array; } qos_info_t; -struct qos_info get_qos_list() { - struct qos_info qosinfo; +struct qos_info Get_qos_list() { + struct qos_info qosinfo; - slurmdb_qos_cond_t *qos_cond = NULL; - void *conn = slurmdb_connection_get(); - List qoslist = slurmdb_qos_get(conn, qos_cond); + slurmdb_qos_cond_t *qos_cond = NULL; + void *conn = slurmdb_connection_get(); + List qoslist = slurmdb_qos_get(conn, qos_cond); + uint16_t size = slurm_list_count(qoslist); + qosinfo.record_count = size; + qosinfo.array = malloc(size * sizeof(slurmdb_qos_rec_t)); - uint16_t size = slurm_list_count(qoslist); - 
qosinfo.record_count = size; - qosinfo.array = malloc(size * sizeof(slurmdb_qos_rec_t)); - //slurmdb_qos_rec_t qosArray[size]; + slurmdb_qos_rec_t *rec = NULL; - slurmdb_qos_rec_t *rec = NULL; - ListIterator itr = slurm_list_iterator_create(qoslist); + //slurmdb_init_qos_rec() - int i = 0; - while ((rec = slurm_list_next(itr))) { - qosinfo.array[i] = *rec; - i++; - } - slurmdb_connection_close(&conn); - slurm_list_destroy(qoslist); + ListIterator itr = slurm_list_iterator_create(qoslist); - return qosinfo; + int i = 0; + while ((rec = slurm_list_next(itr))) { + qosinfo.array[i] = *rec; + i++; + } + slurmdb_connection_close(&conn); + slurm_list_destroy(qoslist); + + return qosinfo; } -struct slurmdb_qos_rec *qos_from_list(struct qos_info *qos_rec_t, int i) { - return (struct slurmdb_qos_rec *) &qos_rec_t->array[i]; + slurmdb_qos_rec_t *qos_from_list(struct qos_info *qos_rec_t, int i) { + return (slurmdb_qos_rec_t *) &qos_rec_t->array[i]; +} + +slurmdb_qos_rec_t *qos_from_list_by_name(slurmdb_qos_rec_t *qos_rec_array, char* qos_name) { + int i; + int arraysize = sizeof(qos_rec_array); + for (i=0; i
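The rewritten qos.go above, like job_db.go earlier in this patch, keeps the slurmdbd access on the C side: Get_qos_list opens a slurmdb connection, copies every slurmdb_qos_rec_t into a malloc'ed array, and qos_from_list / qos_from_list_by_name look records up in that array. The Go half of the file is not visible in the portion of the hunk shown here; the sketch below shows how a wrapper following the same convert-C-to-Go pattern as diag.go and job.go could look. The function name Print_all_qos is an assumption for illustration, not an identifier taken from the patch:

// Hypothetical Go code inside qos.go; it relies on the cgo preamble above for
// C.Get_qos_list, C.qos_from_list and the qos_info struct, plus `import "C"`,
// "fmt" and "unsafe" at the top of the file.
func Print_all_qos() {
	qosInfo := C.Get_qos_list()        // full QoS list fetched from slurmdbd
	count := int(qosInfo.record_count) // number of records copied into the array

	for i := 0; i < count; i++ {
		rec := C.qos_from_list(&qosInfo, C.int(i))
		// name, description and priority are fields of slurmdb_qos_rec_t
		fmt.Printf("qos %s (priority %d): %s\n",
			C.GoString(rec.name), uint32(rec.priority), C.GoString(rec.description))
	}

	// Get_qos_list malloc's the backing array; release it once converted.
	C.free(unsafe.Pointer(qosInfo.array))
}
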