206 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			206 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
package index_management
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	log "github.com/cihub/seelog"
 | 
						|
	"infini.sh/framework/core/global"
 | 
						|
	"infini.sh/framework/core/orm"
 | 
						|
	"net/http"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"infini.sh/console/model"
 | 
						|
	httprouter "infini.sh/framework/core/api/router"
 | 
						|
	"infini.sh/framework/core/elastic"
 | 
						|
	"infini.sh/framework/core/util"
 | 
						|
)
 | 
						|
 | 
						|
func (handler APIHandler) HandleReindexAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | 
						|
	reindexItem := &model.Reindex{}
 | 
						|
	id := ps.ByName("id")
 | 
						|
	if strings.Trim(id, "/") != "" {
 | 
						|
		reindexItem.ID = id
 | 
						|
	}
 | 
						|
	resResult := newResponseBody()
 | 
						|
 | 
						|
	err := handler.DecodeJSON(req, reindexItem)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		resResult["error"] = err
 | 
						|
		handler.WriteJSON(w, resResult, http.StatusOK)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	//fmt.Println(reindexItem)
 | 
						|
	typ := handler.GetParameter(req, "_type")
 | 
						|
	ID, err := reindex(global.MustLookupString(elastic.GlobalSystemElasticsearchID), reindexItem, typ)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		resResult["error"] = err
 | 
						|
		handler.WriteJSON(w, resResult, http.StatusOK)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	resResult["payload"] = ID
 | 
						|
	handler.WriteJSON(w, resResult, http.StatusOK)
 | 
						|
}
 | 
						|
 | 
						|
func reindex(esName string, body *model.Reindex, typ string) (string, error) {
 | 
						|
	client := elastic.GetClient(esName)
 | 
						|
	source := map[string]interface{}{
 | 
						|
		"index": body.Source.Index,
 | 
						|
	}
 | 
						|
	if body.Source.Query != nil {
 | 
						|
		source["query"] = body.Source.Query
 | 
						|
	}
 | 
						|
	if len(body.Source.Source) > 0 {
 | 
						|
		source["_source"] = body.Source.Source
 | 
						|
	}
 | 
						|
	dest := map[string]string{
 | 
						|
		"index": body.Dest.Index,
 | 
						|
	}
 | 
						|
	if body.Dest.Pipeline != "" {
 | 
						|
		dest["pipeline"] = body.Dest.Pipeline
 | 
						|
	}
 | 
						|
	esBody := map[string]interface{}{
 | 
						|
		"source": source,
 | 
						|
		"dest":   dest,
 | 
						|
	}
 | 
						|
	buf, _ := json.Marshal(esBody)
 | 
						|
	//fmt.Println(string(buf))
 | 
						|
	reindexResp, err := client.Reindex(buf)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	if body.ID == "" {
 | 
						|
		body.ID = util.GetUUID()
 | 
						|
	}
 | 
						|
	body.TaskId = reindexResp.Task
 | 
						|
	body.Status = model.ReindexStatusRunning
 | 
						|
	body.CreatedAt = time.Now()
 | 
						|
 | 
						|
	_, err = client.Index(orm.GetIndexName(body), typ, body.ID, body, "wait_for")
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	return body.ID, nil
 | 
						|
}
 | 
						|
 | 
						|
// newResponseBody returns a fresh, empty map that handlers fill with the
// keys of their JSON response.
func newResponseBody() map[string]interface{} {
	body := make(map[string]interface{})
	return body
}
 | 
						|
 | 
						|
func (handler APIHandler) HandleDeleteRebuildAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | 
						|
	id := ps.ByName("id")
 | 
						|
	var ids = []string{id}
 | 
						|
	resBody := newResponseBody()
 | 
						|
	err := deleteTasksByIds(global.MustLookupString(elastic.GlobalSystemElasticsearchID), ids)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		resBody["error"] = err
 | 
						|
		handler.WriteJSON(w, resBody, http.StatusOK)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	resBody["payload"] = true
 | 
						|
	handler.WriteJSON(w, resBody, http.StatusOK)
 | 
						|
}
 | 
						|
 | 
						|
func (handler APIHandler) HandleGetRebuildListAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | 
						|
	var (
 | 
						|
		from    = handler.GetIntOrDefault(req, "from", 0)
 | 
						|
		size    = handler.GetIntOrDefault(req, "size", 10)
 | 
						|
		name    = handler.GetParameter(req, "name")
 | 
						|
		resBody = newResponseBody()
 | 
						|
		esName  = global.MustLookupString(elastic.GlobalSystemElasticsearchID)
 | 
						|
	)
 | 
						|
	esResp, err := model.GetRebuildList(esName, from, size, name)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		resBody["error"] = err.Error()
 | 
						|
		handler.WriteJSON(w, resBody, http.StatusOK)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = SyncRebuildResult(esName)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		resBody["error"] = err
 | 
						|
		handler.WriteJSON(w, resBody, http.StatusOK)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	resBody["payload"] = esResp
 | 
						|
	handler.WriteJSON(w, resBody, http.StatusOK)
 | 
						|
}
 | 
						|
 | 
						|
func SyncRebuildResult(esName string) error {
 | 
						|
	client := elastic.GetClient(esName)
 | 
						|
	esBody := fmt.Sprintf(`{"query":{"match":{"status": "%s"}}}`, model.ReindexStatusRunning)
 | 
						|
	esRes, err := client.SearchWithRawQueryDSL(orm.GetIndexName(model.Reindex{}), []byte(esBody))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	var ids = []string{}
 | 
						|
	idMap := map[string]int{}
 | 
						|
	for idx, doc := range esRes.Hits.Hits {
 | 
						|
		taskId := doc.Source["task_id"].(string)
 | 
						|
		ids = append(ids, taskId)
 | 
						|
		idMap[taskId] = idx
 | 
						|
	}
 | 
						|
	if len(ids) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	taskResp, err := client.SearchTasksByIds(ids)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	var (
 | 
						|
		status model.ReindexStatus
 | 
						|
	)
 | 
						|
	for _, doc := range taskResp.Hits.Hits {
 | 
						|
		status = model.ReindexStatusRunning
 | 
						|
		source := esRes.Hits.Hits[idMap[doc.ID]].Source
 | 
						|
		if _, ok := doc.Source["error"]; ok {
 | 
						|
			status = model.ReindexStatusFailed
 | 
						|
		} else {
 | 
						|
			status = model.ReindexStatusSuccess
 | 
						|
		}
 | 
						|
		source["status"] = status
 | 
						|
		source["task_source"] = doc.Source
 | 
						|
		_, err := client.Index(orm.GetIndexName(model.Reindex{}), "", esRes.Hits.Hits[idMap[doc.ID]].ID, source, "")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// buildTermsQuery renders a query body of the form
//
//	{"query": {"terms": {<fieldName>: [<terms...>]}}}
//
// as a JSON string. fieldName and every term are JSON-encoded, so quotes,
// backslashes and control characters in the inputs cannot break or alter the
// query. An empty or nil terms slice produces an empty array instead of
// panicking.
func buildTermsQuery(fieldName string, terms []string) string {
	const template = `{
  "query":{
    "terms": {
      %s: [
	  %s
      ]
    }
  }
}`
	// quote returns s as a JSON string literal, including the surrounding
	// double quotes.
	quote := func(s string) string {
		// Marshalling a plain string cannot fail (invalid UTF-8 is replaced),
		// so the error is intentionally ignored.
		encoded, _ := json.Marshal(s)
		return string(encoded)
	}

	quoted := make([]string, 0, len(terms))
	for _, term := range terms {
		quoted = append(quoted, quote(term))
	}
	return fmt.Sprintf(template, quote(fieldName), strings.Join(quoted, ","))
}
 | 
						|
 | 
						|
func deleteTasksByIds(esName string, terms []string) error {
 | 
						|
	client := elastic.GetClient(esName)
 | 
						|
	esBody := buildTermsQuery("_id", terms)
 | 
						|
	deleteRes, err := client.DeleteByQuery(orm.GetIndexName(model.Reindex{}), []byte(esBody))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if deleteRes.Deleted != deleteRes.Total {
 | 
						|
		return fmt.Errorf("total: %d, deleted: %d", deleteRes.Total, deleteRes.Deleted)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |