modify rebuild

This commit is contained in:
silenceqi 2021-01-03 22:48:35 +08:00
parent e44729fcb8
commit 2eaf5e6e1b
13 changed files with 478 additions and 102 deletions

View File

@ -120,7 +120,7 @@ func (handler APIHandler) HandleDocumentAction(w http.ResponseWriter, req *http.
sort = fmt.Sprintf(`"%s":{"order":"%s"}`, sortField, sortDirection)
}
query := fmt.Sprintf(`{"from":%d, "size": %d, "query": %s, "sort": [{%s}]}`, from, pageSize, filter, sort)
fmt.Println(indexName, query)
//fmt.Println(indexName, query)
var reqBytes = []byte(query)
resp, err := client.SearchWithRawQueryDSL(indexName, reqBytes)
if err != nil {

View File

@ -0,0 +1,42 @@
package index_management
import (
"net/http"
"strings"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
)
func (handler APIHandler) HandleGetMappingsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
client := elastic.GetClient(handler.Config.Elasticsearch)
indexName := ps.ByName("index")
resBody := map[string]interface{}{
"errno": "0",
"errmsg": "",
"payload": nil,
}
var copyAll = false
if indexName == "*" {
indexName = ""
copyAll = true
}
_, _, idxs, err := client.GetMapping(copyAll, indexName)
if err != nil {
resBody["errno"] = "E30001"
resBody["errmsg"] = err.Error()
handler.WriteJSON(w, resBody, http.StatusOK)
return
}
if copyAll {
for key, _ := range *idxs {
if strings.HasPrefix(key, ".") {
delete(*idxs, key)
}
}
}
resBody["payload"] = idxs
handler.WriteJSON(w, resBody, http.StatusOK)
}

View File

@ -48,16 +48,10 @@ func reindex(esName string, body *model.InfiniReindex) (string, error) {
source := map[string]interface{}{
"index": body.Source.Index,
}
if body.Source.MaxDocs > 0 {
source["max_docs"] = body.Source.MaxDocs
}
if body.Source.Query != nil {
source["query"] = body.Source.Query
}
if body.Source.Sort != "" {
source["sort"] = body.Source.Sort
}
if body.Source.Source != "" {
if len(body.Source.Source) > 0 {
source["_source"] = body.Source.Source
}
dest := map[string]string{
@ -71,7 +65,7 @@ func reindex(esName string, body *model.InfiniReindex) (string, error) {
"dest": dest,
}
buf, _ := json.Marshal(esBody)
fmt.Println(string(buf))
//fmt.Println(string(buf))
reindexRes, err := client.Request("POST", url, buf)
if err != nil {
return "", err
@ -93,3 +87,101 @@ func reindex(esName string, body *model.InfiniReindex) (string, error) {
}
return body.ID, nil
}
func newResponseBody() map[string]interface{} {
return map[string]interface{}{
"errno": "0",
"errmsg": "",
"payload": nil,
}
}
func (handler APIHandler) HandleGetRebuildListAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
var (
from = handler.GetIntOrDefault(req, "from", 0)
size = handler.GetIntOrDefault(req, "size", 10)
name = handler.GetParameter(req, "name")
resBody = newResponseBody()
esName = handler.Config.Elasticsearch
)
esResp, err := model.GetRebuildList(esName, from, size, name)
if err != nil {
resBody["errno"] = "E20003"
resBody["errmsg"] = err.Error()
handler.WriteJSON(w, resBody, http.StatusOK)
return
}
var ids = []string{}
idMap := map[string]int{}
for idx, doc := range esResp.Hits.Hits {
taskId := doc.Source["task_id"].(string)
ids = append(ids, taskId)
idMap[taskId] = idx
}
taskResp, err := getTasksByTerms(esName, ids)
if err != nil {
resBody["errno"] = "E20004"
resBody["errmsg"] = err.Error()
}
var (
completed bool
status string
esErrStr string
tookTime int
)
for _, doc := range taskResp.Hits.Hits {
status = "RUNNING"
tookTime = 0
esErrStr = ""
completed = doc.Source["completed"].(bool)
source := esResp.Hits.Hits[idMap[doc.ID.(string)]].Source
if esErr, ok := doc.Source["error"]; ok {
status = "FAILED"
if errMap, ok := esErr.(map[string]interface{}); ok {
esErrStr = errMap["reason"].(string)
}
} else {
if resMap, ok := doc.Source["response"].(map[string]interface{}); ok {
tookTime = int(resMap["took"].(float64))
}
status = "SUCCESS"
}
if !completed {
status = "RUNNING"
}
source["status"] = status
source["error"] = esErrStr
source["took_time"] = tookTime
}
resBody["payload"] = formatESSearchResult(esResp)
handler.WriteJSON(w, resBody, http.StatusOK)
}
func getTasksByTerms(esName string, terms []string) (*elastic.SearchResponse, error) {
if len(terms) == 0 {
return nil, nil
}
client := elastic.GetClient(esName)
esBody := `{
"query":{
"terms": {
"_id": [
%s
]
}
}
}`
strTerms := ""
for _, term := range terms {
strTerms += fmt.Sprintf(`"%s",`, term)
}
esBody = fmt.Sprintf(esBody, strTerms[0:len(strTerms)-1])
return client.SearchWithRawQueryDSL(".tasks", []byte(esBody))
}

View File

@ -23,4 +23,6 @@ func Init(cfg *config.AppConfig) {
ui.HandleUIMethod(api.GET, pathPrefix+"indices/_cat", handler.HandleGetIndicesAction)
ui.HandleUIMethod(api.POST, pathPrefix+"rebuild/_create", handler.ReindexAction)
ui.HandleUIMethod(api.GET, pathPrefix+"rebuild/list", handler.HandleGetRebuildListAction)
ui.HandleUIMethod(api.GET, pathPrefix+"indices/_mappings/:index", handler.HandleGetMappingsAction)
}

View File

@ -1,6 +1,12 @@
package model
import "time"
import (
"fmt"
"strings"
"time"
"infini.sh/framework/core/elastic"
)
type InfiniReindex struct {
ID string `json:"id" elastic_meta:"_id"`
@ -8,11 +14,10 @@ type InfiniReindex struct {
Desc string `json:"desc" elastic_mapping:"desc:{type:text}"`
TaskId string `json:"task_id" elastic_mapping:"task_id:{type:keyword}"`
Source struct {
Index string `json:"index"`
MaxDocs int `json:"max_docs"`
Query map[string]interface{} `json:"query"`
Sort string `json:"sort"`
Source string `json:"_source"`
Index string `json:"index"`
//Size int `json:"size"`
Query map[string]interface{} `json:"query"`
Source []string `json:"_source"`
} `json:"source" elastic_mapping:"source:{type:object}"`
Dest struct {
Index string `json:"index"`
@ -22,3 +27,27 @@ type InfiniReindex struct {
CreatedAt time.Time `json:"created_at" elastic_mapping:"created_at:{type:date}"`
Status string `json:"status" elastic_mapping:"status:{type:keyword}"`
}
func GetRebuildList(esName string, from, size int, name string) (*elastic.SearchResponse, error) {
var (
sort = `[{
"created_at": {
"order": "desc"
}}]`
query = `{
"bool": {
"must": [
%s
]
}
}`
must = ""
)
if name = strings.Trim(name, " "); name != "" {
must = fmt.Sprintf(`{"match":{"name": "%s"}}`, name)
}
query = fmt.Sprintf(query, must)
rq := fmt.Sprintf(`{"from":%d, "size":%d, "sort": %s, "query": %s}`, from, size, sort, query)
client := elastic.GetClient(esName)
return client.SearchWithRawQueryDSL("infinireindex", []byte(rq))
}

View File

@ -122,6 +122,11 @@ export default [
path: '/data/rebuild',
name: 'rebuild',
component: './DataManagement/Rebuild',
},{
path: '/data/rebuild/list',
name: 'rebuildlist',
component: './DataManagement/RebuildList',
hideInMenu: true,
}, {
path: '/data/import',
name: 'export',

View File

@ -12,37 +12,37 @@ function getUUID(len){
}
export default {
'post /_search-center/doc/:index': function(req, res){
switch(req.body.action){
case 'SAVE':
res.send({
errno: "0",
errmsg: ""
});
break;
case 'ADD':
res.send({
errno: "0",
errmsg: "",
payload: {
...req.body.payload,
id: getUUID(),
}
});
break;
case 'DELETE':
res.send({
errno: "0"
});
break;
default:
res.send(queryData)
}
},
'get /_search-center/indices/_cat': function(req, res){
res.send({
errno: "0",
payload: ["infini-test"],
});
}
// 'post /_search-center/doc/:index': function(req, res){
// switch(req.body.action){
// case 'SAVE':
// res.send({
// errno: "0",
// errmsg: ""
// });
// break;
// case 'ADD':
// res.send({
// errno: "0",
// errmsg: "",
// payload: {
// ...req.body.payload,
// id: getUUID(),
// }
// });
// break;
// case 'DELETE':
// res.send({
// errno: "0"
// });
// break;
// default:
// res.send(queryData)
// }
// },
// 'get /_search-center/indices/_cat': function(req, res){
// res.send({
// errno: "0",
// payload: ["infini-test"],
// });
// }
}

View File

@ -9,16 +9,13 @@ const {Option} = Select;
const {TextArea} = Input;
@Form.create()
@connect(({document}) => ({
document
@connect(({document,rebuild}) => ({
document,
rebuild,
}))
class Rebuild extends Component {
state = {
currentStep: 0,
configData: {
source:{},
dest:{},
},
selectedSourceIndex: ''
}
componentDidMount(){
const {dispatch} = this.props;
@ -28,6 +25,50 @@ class Rebuild extends Component {
cluster: 'sinlge-es'
}
})
dispatch({
type: 'rebuild/fetchMappings',
payload: {
index: ''
}
})
}
getFields = (index)=>{
if(!index){
return [];
}
let {mappings} = this.props.rebuild;
let filterMappings = {};
if(index.indexOf("*")>0){
index = index.replace("*", '');
for(let key in mappings){
if(key.startsWith(index)){
filterMappings['key'] = mappings[key];
}
}
}else{
filterMappings[index] = mappings[index] || {};
}
let fields = [];
for(let key in filterMappings){
for(let fi in filterMappings[key].mappings.properties){
fields.push(fi);
}
}
return fields;
}
handleSourceIndexChange = (v) =>{
const {dispatch, form} = this.props;
form.setFieldsValue({
source__source: [],
});
dispatch({
type: 'rebuild/saveData',
payload: {
selectedSourceIndex: v
}
})
}
renderSteps = (currentStep) => {
let {clusterIndices} = this.props.document;
@ -71,13 +112,15 @@ class Rebuild extends Component {
}
},
};
let {configData, selectedSourceIndex} = this.props.rebuild;
switch(currentStep){
case 0:
stepDom = (
<div style={{marginTop:20}}>
<Form.Item {...formItemLayout} label="Task Name">
{getFieldDecorator('name', {
initialValue: this.state.configData.name,
initialValue: configData.name,
rules: [{ required: true, message: 'please input a task name' }],
})(
<Input autoComplete="off" style={{width: 200}}/>
@ -85,7 +128,7 @@ class Rebuild extends Component {
</Form.Item>
<Form.Item {...formItemLayout} label="Task Description">
{getFieldDecorator('desc', {
initialValue: this.state.configData.creterial,
initialValue: configData.desc,
rules: [
],
})(
@ -106,17 +149,16 @@ class Rebuild extends Component {
<div style={{marginTop:20}}>
<Form.Item {...formItemLayout} label="选择源索引">
{getFieldDecorator('source_index', {
initialValue: this.state.configData.source.index,
initialValue: configData.source.index,
rules: [{ required: true, message: '请选择要重建的索引' }],
})(
<InputSelect data={indices} style={{width: 200}}/>
<InputSelect onChange={this.handleSourceIndexChange} data={indices} style={{width: 200}}/>
)}
</Form.Item>
<Form.Item {...formItemLayout} label="Query">
{getFieldDecorator('source_query', {
initialValue: this.state.configData.source.query,
initialValue: configData.source.query,
rules: [
{required: true, },
],
})(
<TextArea
@ -125,31 +167,17 @@ class Rebuild extends Component {
/>
)}
</Form.Item>
<Form.Item {...formItemLayout} label="max_docs">
{getFieldDecorator('source_max_docs', {
initialValue: this.state.configData.source_max_docs,
rules: [
],
})(
<InputNumber min={1} style={{width:200}}/>
)}
</Form.Item>
<Form.Item {...formItemLayout} label="_source">
{getFieldDecorator('source__source', {
initialValue: this.state.configData.source__source,
initialValue: configData.source__source,
rules: [
],
})(
<Input style={{width:'50%'}}/>
)}
</Form.Item>
<Form.Item {...formItemLayout} label="sort">
{getFieldDecorator('source_sort', {
initialValue: this.state.configData.source_sort,
rules: [
],
})(
<Input style={{width:'50%'}}/>
<Select mode="multiple" style={{width:'80%'}}>
{ this.getFields(selectedSourceIndex).map(item=>{
return (<Select.Option key={item} value={item}>{item}</Select.Option>)
})}
</Select>
)}
</Form.Item>
<Form.Item {...tailFormItemLayout}>
@ -164,7 +192,7 @@ class Rebuild extends Component {
<span style={{height:1}}/>
<Form.Item {...formItemLayout} label="目标索引名">
{getFieldDecorator('dest_index', {
initialValue: this.state.configData.dest.index || '',
initialValue: configData.dest.index || '',
rules: [{ required: true, message: '请输入目标索引名称' }],
})(
<InputSelect data={indices} style={{width: 200}}/>
@ -172,7 +200,7 @@ class Rebuild extends Component {
</Form.Item>
<Form.Item {...formItemLayout} label="Pipeline">
{getFieldDecorator('dest_pipeline', {
initialValue: this.state.configData.dest.source,
initialValue: configData.dest.source,
rules: [
],
})(
@ -191,8 +219,7 @@ class Rebuild extends Component {
handleNext(currentStep){
const { form, dispatch } = this.props;
const { configData: oldValue } = this.state;
var formValues = {};
const { configData: oldValue } = this.props.rebuild;
form.validateFields((err, fieldsValue) => {
if (err) return;
let newValue = {};
@ -200,7 +227,7 @@ class Rebuild extends Component {
fieldsValue['source_query'] = JSON.parse(fieldsValue['source_query'])
}
for(let key in fieldsValue){
if(key.startsWith('source_')){
if(key.startsWith('source_') && fieldsValue[key]){
!newValue.source && (newValue.source ={})
newValue.source[key.slice(7)] = fieldsValue[key]
}else if(key.startsWith('dest_')){
@ -220,25 +247,31 @@ class Rebuild extends Component {
}
})
}
this.setState({
configData: {
...oldValue,
...newValue,
},
currentStep: currentStep+1
},()=>{
message.info(JSON.stringify(this.state));
});
dispatch({
type:"rebuild/saveData",
payload: {
configData: {
...oldValue,
...newValue,
},
currentStep: currentStep+1
}
})
// message.info(JSON.stringify(this.state));
});
}
backward(currentStep){
const {dispatch} = this.props;
if(currentStep > 0){
currentStep = currentStep - 1;
}
this.setState({
currentStep: currentStep,
});
dispatch({
type: 'rebuild/saveData',
payload: {
currentStep: currentStep,
}
})
}
renderFooter = currentStep => {
if (currentStep === 1) {
@ -277,16 +310,17 @@ class Rebuild extends Component {
];
};
render() {
const {currentStep} = this.props.rebuild;
return (
<PageHeaderWrapper >
<Card>
<Steps current={this.state.currentStep}>
<Steps current={currentStep}>
<Step title="基本信息" />
<Step title="源索引信息" />
<Step title="目标索引信息" />
</Steps>
<Divider/>
{this.renderSteps(this.state.currentStep)}
{this.renderSteps(currentStep)}
</Card>
</PageHeaderWrapper>
);

View File

@ -0,0 +1,87 @@
import { Card, Table, Form, Row, Input, Col, Button, Divider } from 'antd';
import React from 'react';
import {connect} from 'dva'
@connect(({rebuildlist}) => ({
rebuildlist,
}))
@Form.create()
class RebuildList extends React.Component {
componentDidMount(){
const {dispatch} = this.props;
dispatch({
type: 'rebuildlist/fetchRebuildList',
payload:{
index: 'infinireindex'
}
})
}
columns = [{
title: 'rebuild name',
dataIndex: 'name',
key: 'name',
},{
title: 'description',
dataIndex: 'desc',
key: 'desc',
},{
title: 'status',
dataIndex: 'status',
key: 'status',
},{
title: 'took_time',
dataIndex: 'took_time',
key: 'took_time',
},{
title: 'created_at',
dataIndex: 'created_at',
key: 'created_at',
},{
title: 'Operation',
render: (text, record) => (
<div>
<a onClick={() => {
this.state.selectedRows.push(record);
this.handleDeleteClick();
}}>删除</a>
{record.status=='FAILED' ? [<Divider type="vertical" />,<a onClick={() => {}}>Redo</a>,
]: ''}
</div>
),
},];
render(){
const {getFieldDecorator} = this.props.form;
const formItemLayout = {
labelCol: { span: 10 },
wrapperCol: { span: 14 },
style: {marginBottom: 0}
};
return (
<Card>
<Form>
<Row gutter={{md:16, sm:8}}>
<Col md={8} sm={10}>
<Form.Item {...formItemLayout} label="Rebuild Name">
{getFieldDecorator('name')(<Input placeholder="please input rebuild name" />)}
</Form.Item>
</Col>
<Col md={8} sm={8}>
<div style={{paddingTop:4}}>
<Button type="primary" onClick={this.handleSearch}>
Search
</Button>
</div>
</Col>
</Row>
</Form>
<Divider />
<Table columns={this.columns} dataSource={this.props.rebuildlist.data}>
</Table>
</Card>
)
}
}
export default RebuildList;

View File

@ -1,9 +1,17 @@
import {reindex} from '@/services/rebuild';
import {getMappings} from '@/services/indices';
import { message } from 'antd';
export default {
namespace: 'rebuild',
state: {},
state: {
currentStep: 0,
selectedSourceIndex:'',
configData: {
source:{},
dest:{},
},
},
effects:{
*addTask({payload}, {call, put}){
let resp = yield call(reindex, payload);
@ -12,9 +20,39 @@ export default {
message.warn("rebuild failed")
return
}
message.info("submit succeed")
yield put({
type: 'saveData',
payload: {
currentStep: 0,
configData: {
source:{},
dest:{},
},
}
})
},
*fetchMappings({payload}, {call, put}){
let resp = yield call(getMappings, payload);
console.log(resp);
if(resp.errno != "0"){
message.warn("get mappings failed")
return
}
yield put({
type: 'saveData',
payload: {
mappings: resp.payload,
}
})
}
},
reducers: {
saveData(state, {payload}){
return {
...state,
...payload,
}
}
}
}

View File

@ -0,0 +1,25 @@
import {getRebuildList} from '@/services/rebuild';
import { message } from 'antd';
export default {
namespace: 'rebuildlist',
state: {
},
effects:{
*fetchRebuildList({payload}, {call, put}){
let resp = yield call(getRebuildList, payload)
yield put({
type: 'saveData',
payload: resp.payload
})
}
},
reducers: {
saveData(state, {payload}){
return {
...state,
...payload,
}
}
}
}

View File

@ -0,0 +1,11 @@
import request from '@/utils/request';
import {pathPrefix} from './common';
export async function getMappings(payload){
let index = payload.index || '*'
let url = `${pathPrefix}/indices/_mappings/${index}`;
return request(url,{
method: 'GET',
expirys: 0,
});
}

View File

@ -8,4 +8,15 @@ export async function reindex(payload){
body: payload,
expirys: 0,
});
}
export async function getRebuildList(payload){
let url = `${pathPrefix}/rebuild/list`;
payload.name && (url+=`name=${payload.name}`)
payload.from && (url+=`name=${payload.from}`)
payload.size && (url+=`name=${payload.size}`)
return request(url,{
method: 'GET',
expirys: 0,
});
}