feat: add api to clean unavailable index, node metadata (#35)

This commit is contained in:
silenceqi 2024-12-14 19:18:45 +08:00 committed by GitHub
parent 1a62e5a1c4
commit 5c783288ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 130 additions and 53 deletions

View File

@ -151,13 +151,71 @@ func (h *APIHandler) SearchIndexMetadata(w http.ResponseWriter, req *http.Reques
must := []interface{}{
}
hasAllPrivilege, indexPrivilege := h.GetCurrentUserIndex(req)
if !hasAllPrivilege && len(indexPrivilege) == 0 {
if indexFilter, hasIndexPri := h.getAllowedIndexFilter(req); hasIndexPri {
if indexFilter != nil{
must = append(must, indexFilter)
}
}else{
h.WriteJSON(w, elastic.SearchResponse{
}, http.StatusOK)
return
}
boolQuery := util.MapStr{
"must_not": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.index_status": "deleted",
},
},
},
"filter": filter,
"must": must,
}
if len(should) > 0 {
boolQuery["should"] = should
boolQuery["minimum_should_match"] = 1
}
query := util.MapStr{
"aggs": aggs,
"size": reqBody.Size,
"from": reqBody.From,
"highlight": elastic.BuildSearchHighlight(&reqBody.Highlight),
"query": util.MapStr{
"bool": boolQuery,
},
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
}
if len(reqBody.Sort) > 1 {
query["sort"] = []util.MapStr{
{
reqBody.Sort[0]: util.MapStr{
"order": reqBody.Sort[1],
},
},
}
}
dsl := util.MustToJSONBytes(query)
response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(orm.GetIndexName(elastic.IndexConfig{}), dsl)
if err != nil {
resBody["error"] = err.Error()
h.WriteJSON(w,resBody, http.StatusInternalServerError )
return
}
w.Write(util.MustToJSONBytes(response))
}
func (h *APIHandler) getAllowedIndexFilter(req *http.Request) (util.MapStr, bool){
hasAllPrivilege, indexPrivilege := h.GetCurrentUserIndex(req)
if !hasAllPrivilege && len(indexPrivilege) == 0 {
return nil, false
}
if !hasAllPrivilege {
indexShould := make([]interface{}, 0, len(indexPrivilege))
for clusterID, indices := range indexPrivilege {
@ -215,57 +273,9 @@ func (h *APIHandler) SearchIndexMetadata(w http.ResponseWriter, req *http.Reques
"should": indexShould,
},
}
must = append(must, indexFilter)
return indexFilter, true
}
boolQuery := util.MapStr{
"must_not": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.index_status": "deleted",
},
},
},
"filter": filter,
"must": must,
}
if len(should) > 0 {
boolQuery["should"] = should
boolQuery["minimum_should_match"] = 1
}
query := util.MapStr{
"aggs": aggs,
"size": reqBody.Size,
"from": reqBody.From,
"highlight": elastic.BuildSearchHighlight(&reqBody.Highlight),
"query": util.MapStr{
"bool": boolQuery,
},
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "desc",
},
},
},
}
if len(reqBody.Sort) > 1 {
query["sort"] = []util.MapStr{
{
reqBody.Sort[0]: util.MapStr{
"order": reqBody.Sort[1],
},
},
}
}
dsl := util.MustToJSONBytes(query)
response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(orm.GetIndexName(elastic.IndexConfig{}), dsl)
if err != nil {
resBody["error"] = err.Error()
h.WriteJSON(w,resBody, http.StatusInternalServerError )
return
}
w.Write(util.MustToJSONBytes(response))
return nil, true
}
func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var indexIDs []interface{}
@ -1265,3 +1275,37 @@ func (h APIHandler) ListIndex(w http.ResponseWriter, req *http.Request, ps httpr
return
}
//deleteIndexMetadata used to delete index metadata after index is deleted from cluster
func (h APIHandler) deleteIndexMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
indexName := orm.GetIndexName(elastic.IndexConfig{})
must := []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.state": "delete",
},
},
}
if indexFilter, hasIndexPri := h.getAllowedIndexFilter(req); hasIndexPri {
if indexFilter != nil {
must = append(must, indexFilter)
}
}else{
//has no any index permission, just return
h.WriteAckOKJSON(w)
return
}
dsl := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": must,
},
},
}
_, err := esClient.DeleteByQuery(indexName, util.MustToJSONBytes(dsl))
if err != nil {
h.WriteError(w, err, http.StatusInternalServerError)
}
h.WriteAckOKJSON(w)
}

View File

@ -131,5 +131,7 @@ func init() {
api.HandleAPIMethod(api.GET, "/elasticsearch/metadata", clusterAPI.RequireLogin(clusterAPI.GetMetadata))
api.HandleAPIMethod(api.GET, "/elasticsearch/hosts", clusterAPI.RequireLogin(clusterAPI.GetHosts))
api.HandleAPIMethod(api.DELETE, "/elasticsearch/metadata/index", clusterAPI.RequireLogin(clusterAPI.deleteIndexMetadata))
api.HandleAPIMethod(api.DELETE, "/elasticsearch/metadata/node", clusterAPI.RequireLogin(clusterAPI.deleteNodeMetadata))
}

View File

@ -1364,4 +1364,35 @@ func (h *APIHandler) GetNodeShards(w http.ResponseWriter, req *http.Request, ps
}
h.WriteJSON(w, shards, http.StatusOK)
}
//deleteNodeMetadata used to clean node metadata after node is offline and not active within 7 days
func (h APIHandler) deleteNodeMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
esClient := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID))
indexName := orm.GetIndexName(elastic.NodeConfig{})
dsl := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.status": "unavailable",
},
},
{
"range": util.MapStr{
"timestamp": util.MapStr{
"lt": "now-7d",
},
},
},
},
},
},
}
_, err := esClient.DeleteByQuery(indexName, util.MustToJSONBytes(dsl))
if err != nil {
h.WriteError(w, err, http.StatusInternalServerError)
}
h.WriteAckOKJSON(w)
}