ElasticSearch 嵌套分桶的过滤与排序

By | 2022年1月11日

工作中遇到一个复杂的需求:先根据IP字段为Syslog日志分组,再在每个组中按日志类型(思科路由器、华为防火墙、Jinper防火墙等)分组,但是却没有日志类型的字段,并且要实现多字段排序,另外 Java端还要根据IP、日志类型过滤。下面直接贴参考语法。

第1步:首先来解决嵌套分组

上面的查询语句按IP分了三个桶,分别是 192.168.1.157、192.168.1.158other_hosts,接着在每个IP桶里又嵌套了3个日志类型的桶,分别是 log_type1、log_type2、other_categories

圈出来的地方是要根据客户在页面上的选择进行过滤的,需要在 Java 查询代码里动态设置。

由于日志类型的桶在 ElasticSearch 数据库中是没有字段的,因此这里使用了 match 匹配,匹配到某一类的都归那类日志。

第2步:嵌套统计与多字段排序

有了上面的知识,我们再来看实际的代码,无非加了统计数量和排序:

请求

{
  "size": 0,
  "aggs" : {
    "a1" : {
      "filters" : {
        "other_bucket_key": "other_hosts",
        "filters" : {
          "192_168_1_157" : { "term" : { "host" : "192.168.1.247"   }},
          "192_168_1_247" : { "term" : { "host" : "192.168.1.249"   }}
        }
      },
      "aggs": {
        "doc_count2": { "value_count": { "field": "host" } },
        "a2" : {
          "filters" : {
            "other_bucket_key": "other_categories",
            "filters" : {
              "log_type1" : { "match_phrase" : { "message" : "type=conn" }},
              "log_type2" : { "match_phrase" : { "message" : "type pf" }},
              "log_type3" : { "bool": {
                                "should": [
                                  { "match_phrase": { "message":  "type pf" }},
                                  { "match_phrase": { "message": "type=conn"   }}
                                ]
                            }
              }
            }
          },
                    
          "aggs": {
            "doc_count3": { "value_count": { "field": "host" } },
            "sales_bucket_sort2": {
              "bucket_sort": {
                "sort": [
                  { "doc_count3": { "order": "asc" } } 
                ]                            
              }
            }
          }
      
        },
        "sales_bucket_sort": {
          "bucket_sort": {
            "sort": [
              { "doc_count2": { "order": "asc" } } 
            ]                            
          }
        }
      }
    }
  }
}

结果

{
  "took" : 523,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2239,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "a1" : {
      "buckets" : {
        "192_168_1_157" : {
          "doc_count" : 4,
          "a2" : {
            "buckets" : {
              "other_categories" : {
                "doc_count" : 4,
                "doc_count3" : {
                  "value" : 4
                }
              }
            }
          },
          "doc_count2" : {
            "value" : 4
          }
        },
        "other_hosts" : {
          "doc_count" : 8,
          "a2" : {
            "buckets" : {
              "other_categories" : {
                "doc_count" : 8,
                "doc_count3" : {
                  "value" : 8
                }
              }
            }
          },
          "doc_count2" : {
            "value" : 8
          }
        },
        "192_168_1_247" : {
          "doc_count" : 2227,
          "a2" : {
            "buckets" : {
              "log_type1" : {
                "doc_count" : 33,
                "doc_count3" : {
                  "value" : 33
                }
              },
              "other_categories" : {
                "doc_count" : 126,
                "doc_count3" : {
                  "value" : 126
                }
              },
              "log_type2" : {
                "doc_count" : 2068,
                "doc_count3" : {
                  "value" : 2068
                }
              },
              "log_type3" : {
                "doc_count" : 2101,
                "doc_count3" : {
                  "value" : 2101
                }
              }
            }
          },
          "doc_count2" : {
            "value" : 2227
          }
        }
      }
    }
  }
}

上面的 doc_count2 存储了每个IP桶里有多少条数据, doc_count3 则存储了每个IP桶里的每个日志类型桶里有多少条数据。 另外我在 doc_count2 上进行了 asc 排序,在 doc_count3 也进行了 asc 排序,这和关系数据库里的多字段排序是一个意思,先按 doc_count2 排,再按 doc_count3 排。

FiltersAggregationBuilder 过滤聚合类

上面嵌套分组聚合例子中,原始数据中是没有分组信息的,而是过滤出来的分组信息。在 Java 中这个过滤聚合类对应的是 FiltersAggregationBuilder

再来看个查询:类型 t-shirt 的平均价格,先根据类型过滤,然后在进行聚合计算。

{
    "aggs" : {
        "t_shirts" : {
            "filter" : { "term": { "type": "t-shirt" } },
            "aggs" : {
                "avg_price" : { "avg" : { "field" : "price" } }
            }
        }
    }
}

参考

Java Elasticsearch 聚合查询(Aggregation)详解

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注