使用logstash同步mysql数据到Elasticsearch

1.下载

 wget https://artifacts.elastic.co/downloads/logstash/logstash-7.7.0.tar.gz

2.解压

tar -zvxf logstash-7.7.0.tar.gz 

3.修改jvm
jvm.options 默认
-Xms1g
-Xmx1g
我机器内存很小所以需要修改

/opt/logstash-7.7.0/config# vim jvm.options 

-Xms512m
-Xmx512m

4.运行

 /opt/logstash-7.7.0/bin#./logstash -e 'input { stdin { } } output { stdout {} }'

5.安装 jdbc 和 elasticsearch 插件

/opt/logstash-7.7.0# bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
/opt/logstash-7.7.0# bin/logstash-plugin install logstash-output-elasticsearch
Validating logstash-output-elasticsearch
Installing logstash-output-elasticsearch
Installation successful

6.下载mysql-connector-java
7.编写配置文件 sync_table.conf

注意:数据库中删除的数据无法同步到ES中,只能同步insert update 数据

/opt/logstash-7.7.0/config# vim sync_table.conf
input {
     
  jdbc {
     
    # mysql相关jdbc配置
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"

    # jdbc连接mysql驱动的文件  此处路径一定要正确 否则会报com.mysql.cj.jdbc.Driver could not be loaded
    jdbc_driver_library => "/opt/logstash-7.7.0/sync_config/mysql-connector-java-8.0.13.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"

    jdbc_default_timezone =>"Asia/Shanghai"

    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # 如果要使字段和实体类的驼峰命名法一致  则需要这样写sql  select d_name as dName, c_id as cId from area where update_time >= :sql_last_value order by update_time asc
    statement => "select * from area where update_time >= :sql_last_value order by update_time asc"
    # statement_filepath => "./config/jdbc.sql"

    # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
    schedule => "* * * * *"
    #type => "jdbc"
 

    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    #record_last_run => true

    # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    use_column_value => true

    # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
    tracking_column => "update_time"

    tracking_column_type => "timestamp"

    last_run_metadata_path => "area_logstash_capital_bill_last_id"

    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false

    #是否将 字段(column) 名称转小写
    #lowercase_column_names => false
  }
}

filter {
     
  date {
     
    match => [ "update_time", "yyyy-MM-dd HH:mm:ss" ]
    timezone => "Asia/Shanghai"
  }
}

output {
     
  elasticsearch {
     
    hosts => ["127.0.0.1:9200"]
    # index名 自定义 相当于数据库 对于实体类上@Document(indexName = "sys_core", type = "area")indexName
    index => "sys_core"  
    #索引的类型 相当于数据库里面的表 对于实体类上@Document(indexName = "sys_core", type = "area")type
    document_type => "area"
    #需要关联的数据库中有有一个id字段,对应索引的id号
    document_id => "%{id}"
    template_overwrite => true
  }

  # 这里输出调试,正式运行时可以注释掉
  stdout {
     
      codec => json_lines
  }
}

8.启动

/opt/logstash-7.7.0# bin/logstash -f config/sync_table.cfg

9…配置同步多张表
比如想同步tableA tableB tableC 3张表 则需要创建3个 sync_table.conf 文件 sync_tableA.conf sync_tableB.conf sync_tableC.conf
只是修改里面的sql语句和索引名
sync_table.conf 文件创建好后最后在 /opt/logstash-7.7.0/config/pipelines.yml 配置

- pipeline.id: table1
  path.config: "/opt/logstash-7.7.0/sync_config/sync_tableA.conf"
- pipeline.id: table2
  path.config: "/opt/logstash-7.7.0/sync_config/ sync_tableB.conf"
- pipeline.id: table3
  path.config: "/opt/logstash-7.7.0/sync_config/sync_tableC.conf"

10.然后启动

/opt/logstash-7.7.0# bin/logstash

你可能感兴趣的