elasticsearch 中使用 pipeline 对数据执行转换,丰富数据


elasticsearch 中使用 pipeline 对数据执行转换,丰富数据

pipeline 是什么

在 Elasticsearch 中,管道(Pipeline)是一种用于在文档索引时执行一系列转换和处理操作的机制。管道能够对输入文档进行多步骤的处理,以生成最终的索引文档。管道的主要目的是对文档进行预处理,以便更有效地索引和检索数据。

我们主要为了给日志数据增加ip地理位置说明以及ip公私有和ip是否可信

创建管道

  1. 参考例子
  2. 在Kibana打开主菜单
  3. 导航到对战管理>引入管道
    • 查看 ElasticSearch Ingest 管道的列表,并深入了解详细信息
    • 编辑或克隆现有的ElasticSearch收录的管道
  4. 创建管道 > 新建管道
  5. 输入 管道的名称和适当描述
  6. 单击”添加处理器”选项,然后选择”Grok”作为处理器类型
  7. 将字段配置为消息,将模式配置为Grok 模式
  8. 最后,单机”添加”选项以保存处理器。还可以将处理器描述配置为”消息”中提取字段
  9. 现在,为时间戳、IP 地址和用户代理字段设置处理器
  10. 单击“添加文档”,并在“文档”选项卡中提供用于测试的示例文档。然后,单击“运行管道”选项
  11. 测试成功后,可以关闭面板,然后单击“创建管道”选项

数据策略

  1. api doc
  2. 策略删除
    • DELETE /_enrich/policy/{policy name}
    • 示例 DELETE /_enrich/policy/misp-enrich-block-ip
  3. 策略创建

    •   PUT /_enrich/policy/{policy name}
        {
            "match": {
                "indices": "{indexes}",
                "match_field": "{match field}",
                "enrich_fields": ["{field1}", "{field2}", ...]
            }
        }
      
    •   PUT /_enrich/policy/misp-enrich-block-ip
        {
            "match": {
                "indices": "fixed-misp-7.10.2",
                "match_field": "destination.ip",
                "enrich_fields": ["rule", "misp.threat_indicator"]
            }
        }
      
  4. 策略执行
    • PUT /_enrich/policy/{policy name}/_execute
    • 示例: PUT /_enrich/policy/misp-enrich-block-ip/_execute
  5. 策略状态查看
    • GET /_enrich/_stats
  6. 策略查看
    • GET /_enrich/policy/misp-enrich-block-ip

例子 丰富IP结果

  1. 管道创建
  2. 创建脚本处理器
    1. 语言: painless
    2. 源:

           Pattern pattern = /^(127\.0\.0\.1)|(localhost)|(10\.\d{1,3}\.\d{1,3}\.\d{1,3})|(172\.((1[6-9])|(2\d)|(3[01]))\.\d{1,3}\.\d{1,3})|(192\.168\.\d{1,3}\.\d{1,3})$/;
           boolean flag = pattern.matcher(ctx.data.dest_ip).matches();
           if(flag) {
               ctx.data.dest_ip_flag = "private";
           } else {
               ctx.data.dest_ip_flag = "public";
           }
      
    3. 条件: ctx?.data?.dest_ip != null
    4. painless 断点测试 Debug.explain(ctx._source)
  3. 创建数据丰富处理器
    1. 字段: data.dest_ip
    2. 策略名称: misp-enrich-block-ip
    3. 目标字段: data.dest_enrich
    4. 条件: ctx?.data?.dest_ip != null && ctx?.data?.dest_ip_flag==”public”
  4. 测试数据

     [
         {
             "_index": "index",
             "_id": "id",
             "_source": {
             "data": {
                 "dest_ip": "192.168.1.1"
             }
             }
         },
         {
             "_index": "index",
             "_id": "id",
             "_source": {
             "data": {
                 "dest_ip": "66.66.66.66"
             }
             }
         },
         {
             "_index": "index",
             "_id": "id",
             "_source": {
             "data": {
                 "dest_ip": "101.43.209.65"
             }
             }
         }
     ]
    

脚本

  1. create pattern

         Pattern pattern = /^(127\.0\.0\.1)|(localhost)|(10\.\d{1,3}\.\d{1,3}\.\d{1,3})|(172\.((1[6-9])|(2\d)|(3[01]))\.\d{1,3}\.\d{1,3})|(192\.168\.\d{1,3}\.\d{1,3})$/;
         boolean flag = pattern.matcher(ctx.data.dest_ip).matches();
         if(flag) {
             ctx.data.dest_ip_flag = "private";
         } else {
             ctx.data.dest_ip_flag = "public";
         }
    
  2. test data

         [
             {
                 "_index": "index",
                 "_id": "id",
                 "_source": {
                 "data": {
                     "dest_ip": "192.168.1.1"
                 }
                 }
             },
             {
                 "_index": "index",
                 "_id": "id",
                 "_source": {
                 "data": {
                     "dest_ip": "66.66.66.66"
                 }
                 }
             },
             {
                 "_index": "index",
                 "_id": "id",
                 "_source": {
                 "data": {
                     "dest_ip": "101.43.209.65"
                 }
                 }
             }
         ]
    
  3. output

         {
             "doc": {
                 "_index": "index",
                 "_type": "_doc",
                 "_id": "id",
                 "_source": {
                 "data": {
                     "dest_enrich": {
                         "destination": {
                             "ip": "101.43.209.65"
                         },
                         "rule": {
                             "description": "blockrules of rules.emergingthreats.net feed",
                             "id": "1",
                             "category": "Network activity",
                             "uuid": "61201a4f-5971-4f71-b287-0e3ba5603ffe"
                         },
                         "misp": {
                             "threat_indicator": {
                             "feed": "misp",
                             "attack_pattern": "[destination:ip = '101.43.209.65']",
                             "description": "blockrules of rules.emergingthreats.net feed",
                             "id": "61201a4f-5971-4f71-b287-0e3ba5603ffe",
                             "type": "ip-dst",
                             "attack_pattern_kql": "destination.ip: \"101.43.209.65\""
                             }
                         }
                     },
                     "dest_ip_flag": "public",
                     "dest_ip": "101.43.209.65"
                 }
                 },
                 "_ingest": {
                 "timestamp": "2022-09-08T09:18:38.568617863Z"
                 }
             }
         }