ElasticSearch 时序数据流(TSDS)

By | 2024年1月29日

ElasticSearch 8.7 开始支持时序数据(TSDS)和降采样(Downsampling)。对时间序列数据流(TSDS)进行降采样的推荐方法是使用索引生命周期管理(ILM),也可以手动调用 降采样API 来触发降采样。

TSDS与常规数据流的区别

  • 索引模板要设置 index.mode: time_series
  • 除了要有 @timestamp,还要至少一个 keyword 类型的维度(dimension)字段,以及至少一个指标(metric)字段
  • Elastic 会为 TSDS 中的每个文档生成一个隐藏的 _tsid 元数据字段,_tsid 是一个包含 dimension 的对象,同一 TSDS 中具有相同 _tsid 的文档是同一时间序列的一部分。
  • TSDS 使用 “time-bound backing indices” 将同一时间段的数据存储在同一后备索引中。
  • (TSDS 的索引模板必须包含 index.routing_path 设置,TSDS 使用此设置来执行基于维度的路由(dimension-based routing)。
  • TSDS 使用内部的 “index sorting”按 _tsid 和 @timestamp 对分片段进行排序。
  • TSDS document 仅支持自动生成的文档 _id 值,不支持自定义文档 _id 值。对于 TSDS 文档,文档 _id 是文档维度和 @timestamp 的哈希值。
  • TSDS 使用 “synthetic _source”,因此受到许多限制,合成的 _source 限制请参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html#synthetic-source-restrictions
  • TSDS 可以包含维度或指标以外的字段

官方参考(重要):https://www.elastic.co/guide/en/elasticsearch/reference/current/tsds.html#differences-from-regular-data-stream
官方案例:https://www.elastic.co/guide/en/elasticsearch/reference/current/use-elasticsearch-for-time-series-data.html

TSDS 的定义

创建组件模板

需要定义维度(dimension)、指标(metric)字段:

{
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "date_optional_time||epoch_millis"
        },
        "ruleUuid": {
          "type": "keyword",
          "time_series_dimension": true
        },
        "hostIP": {
          "type": "keyword",
          "time_series_dimension": true
        },
        "hitCount": {
          "type": "long",
          "time_series_metric": "gauge"
        }
      }
    }
  }
}

注意:text 类型不支持定义维度字段,可以定义维度字段的类型参考:Time series data stream (TSDS)

创建索引模板

专门针对时序数据定义的索引模板,这里使用生命周期自动管理时序数据:

{
  "index_patterns": [
    "my-policy-log*"
  ],
  "data_stream": {},
  "template": {
    "settings": {
      "index": {
        "mode": "time_series",
        "look_ahead_time": "1m",
        "number_of_replicas": 0,
        "number_of_shards": 2
      },
      "index.lifecycle.name": "my-policy-log"
    }
  },
  "composed_of": [
    "my-policy-log"
  ],
  "priority": 101
}

创建生命周期策略

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "5m"
          },
          "downsample": {
            "fixed_interval": "1h"
          }
        }
      }
    }
  }
}

上面的配置,每隔5分钟进行索引翻转,fixed_interval 表示将1小时内的数据汇聚起来以减少数据量,不是每隔1小时触发一次。

降采样

DownSample操作是对一个原始索引执行DownSample,生成DownSample索引。DownSample操作是在索引rollover后产生了一个新索引,然后旧索引过了一段时间,不再写入数据时进行的。目前默认是当前时间比旧索引的end_time大两小时才开始进行DownSample。为了模拟这个效果,创建索引时可以手动指定start_timeend_time参考阿里文档

重要提醒:最新索引的end_time会被Elasticsearch修改为最新时间,影响DownSample演示,默认是5分钟修改一次。DownSample演示操作要确保end_time不被修改,end_time值可通过GET {index}/_settings命令查看。

手动降采样 – 调API

当时是使用 ILM 迟迟没有自动触发降采样,于是测试了下通过调用API来降采样,直接通过 Kibana 来调用:

POST /.ds-my-policy-log-2024.01.29-000001/_downsample/my-downsample-1m
{
    "fixed_interval": "1m"
}

测试报错:Downsample requires setting [index.blocks.write = true] for index [.ds-my-policy-log-2024.01.29-000001]"

于是再调用接口设置 index.blocks.write = true:

PUT /.ds-my-policy-log-2024.01.29-000001/_block/write

Index blocks 参考

接着再调用降采样API就成功了,生成了一个 my-downsample-1m 的新索引,里面文档数也少了。

自动降采样 – ILM

手动降采样时,给滚动索引设置了 index.blocks.write = true(用ILM不需要设置),此时索引的当前 action 变成 downsample 了,且有告警:

似乎看到希望了,时间来到 index.time_series.end_time 还是没有自动触发降采样,直到再过5分钟左右终于看到原来的滚动索引 .ds-my-policy-log-2024.01.29-000001 变成了 downsample-1h-.ds-my-policy-log-2024.01.29-000001,名字多了前缀 downsample-1h-,文档数也少了2条:

降采样后,数据也查到了:

看来配置是没有问题的,只是等的太久了,具体要等多久,目前心理也没数何时会触发。索引生命周期管理检查符合策略标准的索引的频率默认是10分钟检查一次:indices.lifecycle.poll_interval

downsampling 官方参考

自动降采样 – 索引区间

Elasticsearch 在索引创建和翻转过程中自动配置 index.time_series.start_time 和 index.time_series.end_time值,但通过设置 index.look_ahead_time 可以控制 index.time_series.end_time 值的生成,默认是按: now + index.look_ahead_time

在时间序列轮询间隔(通过 time_series.poll_interval 设置控制),Elasticsearch 检查 write index 是否满足其索引生命周期策略中的滚动条件,如果不满足,Elasticsearch 会刷新当前值并将 write index 的 index.time_series.end_time 更新为:now + index.look_ahead_time + time_series.poll_interval。此过程将持续到 write index 翻转为止。当索引滚动时,Elasticsearch 为索引设置最终的 index.time_series.end_time 值。

只有当插入数据的 @timestamp 时间字段值落在 start_time 和 end_time 之间才能被索引。

与降采样的关系,我的理解:到达 end_time 后才会触发降采样,降采样过程中会经历多个 step,要登上好多分钟。参考 time-bound-indices
Some ILM actions mark the source index as read-only, or expect the index to not be actively written anymore in order to provide good performance. These actions are: – Delete – Downsample – Force merge – Read only – Searchable snapshot – Shrink Index lifecycle management will not proceed with executing these actions until the upper time-bound for accepting writes, represented by the index.time_series.end_time index setting, has lapsed.

look_ahead_time 参数默认是 2h(2小时),但 end_time 的计算似乎有个固定的 2h(有点困惑,有时候又没有,未理解透),因此默认生成的索引 start_time和end_time间隔了4小时,如果设置 look_ahead_time 为1m(1分钟),start_time和end_time就间隔2小时1分钟:

此时索引的 action 和 step:

降采样阶段先看到了这个 wait-for-shard-history-leases:

然后变为 check-ts-end-time-passed:

最后触发了降采样:

降采样后 gauge 类型的指标字段(使用 counter 计数器类型汇聚后无法进行 sum),不是一个数字了,变成了一个对象。
java上处理的时候要特别小心,对于还没汇聚的 hitCount值是一个整数,对于已汇聚的是一个 LinkedHashMap类型:

降采样后新的时间戳问题

接收早于指定时间的 @timestamp 文档

控制索引2小时以前的数据可以试用 index.look_back_time 参数,此参数默认就是2小时,参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/tsds.html#tsds-look-back-time

快照生命周期管理 (SLM)

参考

对时间序列数据流进行降采样(downsampling)
使用 ILM 示例运行降采样 (downsampling)

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注