206 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			206 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
| package index_management
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	log "github.com/cihub/seelog"
 | |
| 	"infini.sh/framework/core/global"
 | |
| 	"infini.sh/framework/core/orm"
 | |
| 	"net/http"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"infini.sh/console/model"
 | |
| 	httprouter "infini.sh/framework/core/api/router"
 | |
| 	"infini.sh/framework/core/elastic"
 | |
| 	"infini.sh/framework/core/util"
 | |
| )
 | |
| 
 | |
| func (handler APIHandler) HandleReindexAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | |
| 	reindexItem := &model.Reindex{}
 | |
| 	id := ps.ByName("id")
 | |
| 	if strings.Trim(id, "/") != "" {
 | |
| 		reindexItem.ID = id
 | |
| 	}
 | |
| 	resResult := newResponseBody()
 | |
| 
 | |
| 	err := handler.DecodeJSON(req, reindexItem)
 | |
| 	if err != nil {
 | |
| 		log.Error(err)
 | |
| 		resResult["error"] = err
 | |
| 		handler.WriteJSON(w, resResult, http.StatusOK)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	//fmt.Println(reindexItem)
 | |
| 	typ := handler.GetParameter(req, "_type")
 | |
| 	ID, err := reindex(global.MustLookupString(elastic.GlobalSystemElasticsearchID), reindexItem, typ)
 | |
| 	if err != nil {
 | |
| 		log.Error(err)
 | |
| 		resResult["error"] = err
 | |
| 		handler.WriteJSON(w, resResult, http.StatusOK)
 | |
| 		return
 | |
| 	}
 | |
| 	resResult["payload"] = ID
 | |
| 	handler.WriteJSON(w, resResult, http.StatusOK)
 | |
| }
 | |
| 
 | |
| func reindex(esName string, body *model.Reindex, typ string) (string, error) {
 | |
| 	client := elastic.GetClient(esName)
 | |
| 	source := map[string]interface{}{
 | |
| 		"index": body.Source.Index,
 | |
| 	}
 | |
| 	if body.Source.Query != nil {
 | |
| 		source["query"] = body.Source.Query
 | |
| 	}
 | |
| 	if len(body.Source.Source) > 0 {
 | |
| 		source["_source"] = body.Source.Source
 | |
| 	}
 | |
| 	dest := map[string]string{
 | |
| 		"index": body.Dest.Index,
 | |
| 	}
 | |
| 	if body.Dest.Pipeline != "" {
 | |
| 		dest["pipeline"] = body.Dest.Pipeline
 | |
| 	}
 | |
| 	esBody := map[string]interface{}{
 | |
| 		"source": source,
 | |
| 		"dest":   dest,
 | |
| 	}
 | |
| 	buf, _ := json.Marshal(esBody)
 | |
| 	//fmt.Println(string(buf))
 | |
| 	reindexResp, err := client.Reindex(buf)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	if body.ID == "" {
 | |
| 		body.ID = util.GetUUID()
 | |
| 	}
 | |
| 	body.TaskId = reindexResp.Task
 | |
| 	body.Status = model.ReindexStatusRunning
 | |
| 	body.CreatedAt = time.Now()
 | |
| 
 | |
| 	_, err = client.Index(orm.GetIndexName(body), typ, body.ID, body, "wait_for")
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	return body.ID, nil
 | |
| }
 | |
| 
 | |
| func newResponseBody() map[string]interface{} {
 | |
| 	return map[string]interface{}{
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (handler APIHandler) HandleDeleteRebuildAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | |
| 	id := ps.ByName("id")
 | |
| 	var ids = []string{id}
 | |
| 	resBody := newResponseBody()
 | |
| 	err := deleteTasksByIds(global.MustLookupString(elastic.GlobalSystemElasticsearchID), ids)
 | |
| 	if err != nil {
 | |
| 		log.Error(err)
 | |
| 		resBody["error"] = err
 | |
| 		handler.WriteJSON(w, resBody, http.StatusOK)
 | |
| 		return
 | |
| 	}
 | |
| 	resBody["payload"] = true
 | |
| 	handler.WriteJSON(w, resBody, http.StatusOK)
 | |
| }
 | |
| 
 | |
| func (handler APIHandler) HandleGetRebuildListAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | |
| 	var (
 | |
| 		from    = handler.GetIntOrDefault(req, "from", 0)
 | |
| 		size    = handler.GetIntOrDefault(req, "size", 10)
 | |
| 		name    = handler.GetParameter(req, "name")
 | |
| 		resBody = newResponseBody()
 | |
| 		esName  = global.MustLookupString(elastic.GlobalSystemElasticsearchID)
 | |
| 	)
 | |
| 	esResp, err := model.GetRebuildList(esName, from, size, name)
 | |
| 	if err != nil {
 | |
| 		log.Error(err)
 | |
| 		resBody["error"] = err.Error()
 | |
| 		handler.WriteJSON(w, resBody, http.StatusOK)
 | |
| 		return
 | |
| 	}
 | |
| 	err = SyncRebuildResult(esName)
 | |
| 	if err != nil {
 | |
| 		log.Error(err)
 | |
| 		resBody["error"] = err
 | |
| 		handler.WriteJSON(w, resBody, http.StatusOK)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	resBody["payload"] = esResp
 | |
| 	handler.WriteJSON(w, resBody, http.StatusOK)
 | |
| }
 | |
| 
 | |
| func SyncRebuildResult(esName string) error {
 | |
| 	client := elastic.GetClient(esName)
 | |
| 	esBody := fmt.Sprintf(`{"query":{"match":{"status": "%s"}}}`, model.ReindexStatusRunning)
 | |
| 	esRes, err := client.SearchWithRawQueryDSL(orm.GetIndexName(model.Reindex{}), []byte(esBody))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	var ids = []string{}
 | |
| 	idMap := map[string]int{}
 | |
| 	for idx, doc := range esRes.Hits.Hits {
 | |
| 		taskId := doc.Source["task_id"].(string)
 | |
| 		ids = append(ids, taskId)
 | |
| 		idMap[taskId] = idx
 | |
| 	}
 | |
| 	if len(ids) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	taskResp, err := client.SearchTasksByIds(ids)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	var (
 | |
| 		status model.ReindexStatus
 | |
| 	)
 | |
| 	for _, doc := range taskResp.Hits.Hits {
 | |
| 		status = model.ReindexStatusRunning
 | |
| 		source := esRes.Hits.Hits[idMap[doc.ID]].Source
 | |
| 		if _, ok := doc.Source["error"]; ok {
 | |
| 			status = model.ReindexStatusFailed
 | |
| 		} else {
 | |
| 			status = model.ReindexStatusSuccess
 | |
| 		}
 | |
| 		source["status"] = status
 | |
| 		source["task_source"] = doc.Source
 | |
| 		_, err := client.Index(orm.GetIndexName(model.Reindex{}), "", esRes.Hits.Hits[idMap[doc.ID]].ID, source, "")
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func buildTermsQuery(fieldName string, terms []string) string {
 | |
| 	esBody := `{
 | |
|   "query":{
 | |
|     "terms": {
 | |
|       "%s": [
 | |
| 	  %s
 | |
|       ]
 | |
|     }
 | |
|   }
 | |
| }`
 | |
| 	strTerms := ""
 | |
| 	for _, term := range terms {
 | |
| 		strTerms += fmt.Sprintf(`"%s",`, term)
 | |
| 	}
 | |
| 	esBody = fmt.Sprintf(esBody, fieldName, strTerms[0:len(strTerms)-1])
 | |
| 	return esBody
 | |
| }
 | |
| 
 | |
| func deleteTasksByIds(esName string, terms []string) error {
 | |
| 	client := elastic.GetClient(esName)
 | |
| 	esBody := buildTermsQuery("_id", terms)
 | |
| 	deleteRes, err := client.DeleteByQuery(orm.GetIndexName(model.Reindex{}), []byte(esBody))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if deleteRes.Deleted != deleteRes.Total {
 | |
| 		return fmt.Errorf("total: %d, deleted: %d", deleteRes.Total, deleteRes.Deleted)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |