From 2eaf5e6e1bd9bfdfead55022da9e8ee43bf2bdd2 Mon Sep 17 00:00:00 2001 From: silenceqi Date: Sun, 3 Jan 2021 22:48:35 +0800 Subject: [PATCH] modify rebuild --- api/index_management/document.go | 2 +- api/index_management/indices.go | 42 ++++++ api/index_management/rebuild.go | 108 +++++++++++++- api/init.go | 2 + model/reindex.go | 41 +++++- web/config/router.config.js | 5 + web/mock/datamanagement/document.js | 66 ++++----- web/src/pages/DataManagement/Rebuild.js | 138 +++++++++++------- web/src/pages/DataManagement/RebuildList.js | 87 +++++++++++ .../pages/DataManagement/models/rebuild.js | 42 +++++- .../DataManagement/models/rebuildlist.js | 25 ++++ web/src/services/indices.js | 11 ++ web/src/services/rebuild.js | 11 ++ 13 files changed, 478 insertions(+), 102 deletions(-) create mode 100644 api/index_management/indices.go create mode 100644 web/src/pages/DataManagement/RebuildList.js create mode 100644 web/src/pages/DataManagement/models/rebuildlist.js create mode 100644 web/src/services/indices.js diff --git a/api/index_management/document.go b/api/index_management/document.go index 3b22c0c6..8fd48e6b 100644 --- a/api/index_management/document.go +++ b/api/index_management/document.go @@ -120,7 +120,7 @@ func (handler APIHandler) HandleDocumentAction(w http.ResponseWriter, req *http. sort = fmt.Sprintf(`"%s":{"order":"%s"}`, sortField, sortDirection) } query := fmt.Sprintf(`{"from":%d, "size": %d, "query": %s, "sort": [{%s}]}`, from, pageSize, filter, sort) - fmt.Println(indexName, query) + //fmt.Println(indexName, query) var reqBytes = []byte(query) resp, err := client.SearchWithRawQueryDSL(indexName, reqBytes) if err != nil { diff --git a/api/index_management/indices.go b/api/index_management/indices.go new file mode 100644 index 00000000..f42d1135 --- /dev/null +++ b/api/index_management/indices.go @@ -0,0 +1,42 @@ +package index_management + +import ( + "net/http" + "strings" + + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" +) + +func (handler APIHandler) HandleGetMappingsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + client := elastic.GetClient(handler.Config.Elasticsearch) + indexName := ps.ByName("index") + resBody := map[string]interface{}{ + "errno": "0", + "errmsg": "", + "payload": nil, + } + var copyAll = false + if indexName == "*" { + indexName = "" + copyAll = true + } + _, _, idxs, err := client.GetMapping(copyAll, indexName) + if err != nil { + resBody["errno"] = "E30001" + resBody["errmsg"] = err.Error() + handler.WriteJSON(w, resBody, http.StatusOK) + return + } + if copyAll { + for key, _ := range *idxs { + if strings.HasPrefix(key, ".") { + delete(*idxs, key) + } + } + } + + resBody["payload"] = idxs + + handler.WriteJSON(w, resBody, http.StatusOK) +} diff --git a/api/index_management/rebuild.go b/api/index_management/rebuild.go index 82ef0dd2..ff9e0ab6 100644 --- a/api/index_management/rebuild.go +++ b/api/index_management/rebuild.go @@ -48,16 +48,10 @@ func reindex(esName string, body *model.InfiniReindex) (string, error) { source := map[string]interface{}{ "index": body.Source.Index, } - if body.Source.MaxDocs > 0 { - source["max_docs"] = body.Source.MaxDocs - } if body.Source.Query != nil { source["query"] = body.Source.Query } - if body.Source.Sort != "" { - source["sort"] = body.Source.Sort - } - if body.Source.Source != "" { + if len(body.Source.Source) > 0 { source["_source"] = body.Source.Source } dest := map[string]string{ @@ -71,7 +65,7 @@ func reindex(esName string, body *model.InfiniReindex) (string, error) { "dest": dest, } buf, _ := json.Marshal(esBody) - fmt.Println(string(buf)) + //fmt.Println(string(buf)) reindexRes, err := client.Request("POST", url, buf) if err != nil { return "", err @@ -93,3 +87,101 @@ func reindex(esName string, body *model.InfiniReindex) (string, error) { } return body.ID, nil } + +func newResponseBody() map[string]interface{} { + return map[string]interface{}{ + "errno": "0", + "errmsg": "", + "payload": nil, + } + +} + +func (handler APIHandler) HandleGetRebuildListAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + defer func() { + if err := recover(); err != nil { + fmt.Println(err) + } + }() + var ( + from = handler.GetIntOrDefault(req, "from", 0) + size = handler.GetIntOrDefault(req, "size", 10) + name = handler.GetParameter(req, "name") + resBody = newResponseBody() + esName = handler.Config.Elasticsearch + ) + esResp, err := model.GetRebuildList(esName, from, size, name) + if err != nil { + resBody["errno"] = "E20003" + resBody["errmsg"] = err.Error() + handler.WriteJSON(w, resBody, http.StatusOK) + return + } + + var ids = []string{} + idMap := map[string]int{} + for idx, doc := range esResp.Hits.Hits { + taskId := doc.Source["task_id"].(string) + ids = append(ids, taskId) + idMap[taskId] = idx + } + taskResp, err := getTasksByTerms(esName, ids) + if err != nil { + resBody["errno"] = "E20004" + resBody["errmsg"] = err.Error() + } + var ( + completed bool + status string + esErrStr string + tookTime int + ) + for _, doc := range taskResp.Hits.Hits { + status = "RUNNING" + tookTime = 0 + esErrStr = "" + completed = doc.Source["completed"].(bool) + source := esResp.Hits.Hits[idMap[doc.ID.(string)]].Source + if esErr, ok := doc.Source["error"]; ok { + status = "FAILED" + if errMap, ok := esErr.(map[string]interface{}); ok { + esErrStr = errMap["reason"].(string) + } + } else { + if resMap, ok := doc.Source["response"].(map[string]interface{}); ok { + tookTime = int(resMap["took"].(float64)) + } + status = "SUCCESS" + } + if !completed { + status = "RUNNING" + } + source["status"] = status + source["error"] = esErrStr + source["took_time"] = tookTime + } + resBody["payload"] = formatESSearchResult(esResp) + handler.WriteJSON(w, resBody, http.StatusOK) +} + +func getTasksByTerms(esName string, terms []string) (*elastic.SearchResponse, error) { + if len(terms) == 0 { + return nil, nil + } + client := elastic.GetClient(esName) + esBody := `{ + "query":{ + "terms": { + "_id": [ + %s + ] + } + } +}` + strTerms := "" + for _, term := range terms { + strTerms += fmt.Sprintf(`"%s",`, term) + } + esBody = fmt.Sprintf(esBody, strTerms[0:len(strTerms)-1]) + return client.SearchWithRawQueryDSL(".tasks", []byte(esBody)) +} diff --git a/api/init.go b/api/init.go index 581162cb..74dd0f48 100644 --- a/api/init.go +++ b/api/init.go @@ -23,4 +23,6 @@ func Init(cfg *config.AppConfig) { ui.HandleUIMethod(api.GET, pathPrefix+"indices/_cat", handler.HandleGetIndicesAction) ui.HandleUIMethod(api.POST, pathPrefix+"rebuild/_create", handler.ReindexAction) + ui.HandleUIMethod(api.GET, pathPrefix+"rebuild/list", handler.HandleGetRebuildListAction) + ui.HandleUIMethod(api.GET, pathPrefix+"indices/_mappings/:index", handler.HandleGetMappingsAction) } diff --git a/model/reindex.go b/model/reindex.go index 70d2b316..29a28d9a 100644 --- a/model/reindex.go +++ b/model/reindex.go @@ -1,6 +1,12 @@ package model -import "time" +import ( + "fmt" + "strings" + "time" + + "infini.sh/framework/core/elastic" +) type InfiniReindex struct { ID string `json:"id" elastic_meta:"_id"` @@ -8,11 +14,10 @@ type InfiniReindex struct { Desc string `json:"desc" elastic_mapping:"desc:{type:text}"` TaskId string `json:"task_id" elastic_mapping:"task_id:{type:keyword}"` Source struct { - Index string `json:"index"` - MaxDocs int `json:"max_docs"` - Query map[string]interface{} `json:"query"` - Sort string `json:"sort"` - Source string `json:"_source"` + Index string `json:"index"` + //Size int `json:"size"` + Query map[string]interface{} `json:"query"` + Source []string `json:"_source"` } `json:"source" elastic_mapping:"source:{type:object}"` Dest struct { Index string `json:"index"` @@ -22,3 +27,27 @@ type InfiniReindex struct { CreatedAt time.Time `json:"created_at" elastic_mapping:"created_at:{type:date}"` Status string `json:"status" elastic_mapping:"status:{type:keyword}"` } + +func GetRebuildList(esName string, from, size int, name string) (*elastic.SearchResponse, error) { + var ( + sort = `[{ + "created_at": { + "order": "desc" + }}]` + query = `{ + "bool": { + "must": [ + %s + ] + } + }` + must = "" + ) + if name = strings.Trim(name, " "); name != "" { + must = fmt.Sprintf(`{"match":{"name": "%s"}}`, name) + } + query = fmt.Sprintf(query, must) + rq := fmt.Sprintf(`{"from":%d, "size":%d, "sort": %s, "query": %s}`, from, size, sort, query) + client := elastic.GetClient(esName) + return client.SearchWithRawQueryDSL("infinireindex", []byte(rq)) +} diff --git a/web/config/router.config.js b/web/config/router.config.js index cb3bd601..398874d4 100644 --- a/web/config/router.config.js +++ b/web/config/router.config.js @@ -122,6 +122,11 @@ export default [ path: '/data/rebuild', name: 'rebuild', component: './DataManagement/Rebuild', + },{ + path: '/data/rebuild/list', + name: 'rebuildlist', + component: './DataManagement/RebuildList', + hideInMenu: true, }, { path: '/data/import', name: 'export', diff --git a/web/mock/datamanagement/document.js b/web/mock/datamanagement/document.js index ec8b2ab6..96beeb09 100644 --- a/web/mock/datamanagement/document.js +++ b/web/mock/datamanagement/document.js @@ -12,37 +12,37 @@ function getUUID(len){ } export default { - 'post /_search-center/doc/:index': function(req, res){ - switch(req.body.action){ - case 'SAVE': - res.send({ - errno: "0", - errmsg: "" - }); - break; - case 'ADD': - res.send({ - errno: "0", - errmsg: "", - payload: { - ...req.body.payload, - id: getUUID(), - } - }); - break; - case 'DELETE': - res.send({ - errno: "0" - }); - break; - default: - res.send(queryData) - } - }, - 'get /_search-center/indices/_cat': function(req, res){ - res.send({ - errno: "0", - payload: ["infini-test"], - }); - } + // 'post /_search-center/doc/:index': function(req, res){ + // switch(req.body.action){ + // case 'SAVE': + // res.send({ + // errno: "0", + // errmsg: "" + // }); + // break; + // case 'ADD': + // res.send({ + // errno: "0", + // errmsg: "", + // payload: { + // ...req.body.payload, + // id: getUUID(), + // } + // }); + // break; + // case 'DELETE': + // res.send({ + // errno: "0" + // }); + // break; + // default: + // res.send(queryData) + // } + // }, + // 'get /_search-center/indices/_cat': function(req, res){ + // res.send({ + // errno: "0", + // payload: ["infini-test"], + // }); + // } } \ No newline at end of file diff --git a/web/src/pages/DataManagement/Rebuild.js b/web/src/pages/DataManagement/Rebuild.js index ccde8741..79d0e041 100644 --- a/web/src/pages/DataManagement/Rebuild.js +++ b/web/src/pages/DataManagement/Rebuild.js @@ -9,16 +9,13 @@ const {Option} = Select; const {TextArea} = Input; @Form.create() -@connect(({document}) => ({ - document +@connect(({document,rebuild}) => ({ + document, + rebuild, })) class Rebuild extends Component { state = { - currentStep: 0, - configData: { - source:{}, - dest:{}, - }, + selectedSourceIndex: '' } componentDidMount(){ const {dispatch} = this.props; @@ -28,6 +25,50 @@ class Rebuild extends Component { cluster: 'sinlge-es' } }) + dispatch({ + type: 'rebuild/fetchMappings', + payload: { + index: '' + } + }) + } + getFields = (index)=>{ + if(!index){ + return []; + } + let {mappings} = this.props.rebuild; + let filterMappings = {}; + if(index.indexOf("*")>0){ + index = index.replace("*", ''); + for(let key in mappings){ + if(key.startsWith(index)){ + filterMappings['key'] = mappings[key]; + } + } + }else{ + filterMappings[index] = mappings[index] || {}; + } + + let fields = []; + for(let key in filterMappings){ + for(let fi in filterMappings[key].mappings.properties){ + fields.push(fi); + } + } + + return fields; + } + handleSourceIndexChange = (v) =>{ + const {dispatch, form} = this.props; + form.setFieldsValue({ + source__source: [], + }); + dispatch({ + type: 'rebuild/saveData', + payload: { + selectedSourceIndex: v + } + }) } renderSteps = (currentStep) => { let {clusterIndices} = this.props.document; @@ -71,13 +112,15 @@ class Rebuild extends Component { } }, }; + let {configData, selectedSourceIndex} = this.props.rebuild; + switch(currentStep){ case 0: stepDom = (
{getFieldDecorator('name', { - initialValue: this.state.configData.name, + initialValue: configData.name, rules: [{ required: true, message: 'please input a task name' }], })( @@ -85,7 +128,7 @@ class Rebuild extends Component { {getFieldDecorator('desc', { - initialValue: this.state.configData.creterial, + initialValue: configData.desc, rules: [ ], })( @@ -106,17 +149,16 @@ class Rebuild extends Component {
{getFieldDecorator('source_index', { - initialValue: this.state.configData.source.index, + initialValue: configData.source.index, rules: [{ required: true, message: '请选择要重建的索引' }], })( - + )} {getFieldDecorator('source_query', { - initialValue: this.state.configData.source.query, + initialValue: configData.source.query, rules: [ - {required: true, }, ], })(