Logstash: Syncing MySQL Data to Elasticsearch

痛定思痛 2021-11-05 23:22

1. Unzip Logstash.

2. Create a `custom` folder under the Logstash root directory to hold your own configuration files, as shown below:

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70][]

The configuration files under `custom`:

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 1][]

Copy the MySQL JDBC driver jar into `custom`.

First, create the index. The request below only works on Elasticsearch 7.x and later. (For versions before 7.x you must add a type name between `mappings` and `properties`; from 7.x on, the type name defaults to `_doc` and cannot be changed.)

```
PUT /area
{
  "settings": {
    "refresh_interval": "5s",
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "analysis": {
      "filter": {
        "pinyin_full_filter": {
          "keep_joined_full_pinyin": "true",
          "lowercase": "true",
          "keep_original": "false",
          "keep_first_letter": "false",
          "keep_separate_first_letter": "false",
          "type": "pinyin",
          "keep_none_chinese": "false",
          "limit_first_letter_length": "50",
          "keep_full_pinyin": "true"
        },
        "pinyin_simple_filter": {
          "type": "pinyin",
          "keep_joined_full_pinyin": "true",
          "lowercase": "true",
          "none_chinese_pinyin_tokenize": "false",
          "padding_char": " ",
          "keep_original": "true",
          "keep_first_letter": "true",
          "keep_separate_first_letter": "false",
          "keep_full_pinyin": "false"
        }
      },
      "analyzer": {
        "pinyinFullIndexAnalyzer": {
          "filter": ["asciifolding", "lowercase", "pinyin_full_filter"],
          "type": "custom",
          "tokenizer": "ik_max_word"
        },
        "ik_pinyin_analyzer": {
          "filter": ["asciifolding", "lowercase", "pinyin_full_filter", "word_delimiter"],
          "type": "custom",
          "tokenizer": "ik_smart"
        },
        "ikIndexAnalyzer": {
          "filter": ["asciifolding", "lowercase"],
          "type": "custom",
          "tokenizer": "ik_max_word"
        },
        "pinyiSimpleIndexAnalyzer": {
          "type": "custom",
          "tokenizer": "ik_max_word",
          "filter": ["pinyin_simple_filter", "lowercase"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "area": {
        "type": "text",
        "analyzer": "ikIndexAnalyzer",
        "fields": {
          "ik": {
            "type": "text",
            "analyzer": "ikIndexAnalyzer"
          },
          "spy": {
            "type": "text",
            "analyzer": "pinyiSimpleIndexAnalyzer"
          },
          "fpy": {
            "type": "text",
            "analyzer": "pinyinFullIndexAnalyzer"
          }
        }
      }
    }
  }
}
```

Write area.sql. The name column is aliased to `area` so the value is indexed into the analyzed `area` field defined in the mapping above:

```sql
SELECT a.id AS id, a.area_name AS area FROM area a
```

Write shipper.conf (adjust the driver jar, database URL, username/password, and SQL file path for your own environment):

```
input {
  jdbc {
    #type => "archive_files_index"
    jdbc_connection_string => "jdbc:mysql://192.168.0.146:3306/face"
    jdbc_driver_library => "F:/software/logstash-6.2.4/logstash-6.2.4/custom/mysql-connector-java-5.1.44.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"

    jdbc_user => "root"
    jdbc_password => "root"

    #last_run_metadata_path => "./logstash_jdbc_last_run"

    # run the SQL inline instead of from a file:
    #statement => "select * from sys_user where name = :name"
    statement_filepath => "F:/software/ELK7.2/logstash-7.2.0/logstash-7.2.0/custom/area.sql"

    # enable paging
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50"

    # sync once per minute
    schedule => "* * * * *"
  }
}

output {
  elasticsearch {
    # Elasticsearch address and port
    hosts => "localhost:9200"
    index => "area"
    #document_type => "student"
    document_id => "%{id}"
    #doc_as_upsert => true
    #action => "update"
  }

  stdout {
    # also print each event as JSON lines
    codec => json_lines
  }
}
```
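As written, the job re-runs the full query every minute and relies on `document_id` to overwrite documents that already exist. If the table keeps growing, the jdbc input can instead remember how far it got and fetch only new rows. A minimal sketch using the plugin's tracking-column options (the metadata path below is an assumption; point it anywhere writable):

```
input {
  jdbc {
    # ...same connection and driver settings as in shipper.conf above...

    # remember the largest id seen so far between runs
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # assumption: any writable path works here
    last_run_metadata_path => "F:/software/logstash/custom/logstash_jdbc_last_run"

    # :sql_last_value is replaced with the stored value on each run
    statement => "SELECT a.id AS id, a.area_name AS area FROM area a WHERE a.id > :sql_last_value"
    schedule => "* * * * *"
  }
}
```

Note that this only picks up rows with ever-increasing ids; to catch updates to existing rows you would track an updated-at timestamp column instead.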
The table has quite a lot of rows; I crawled all of China's provinces, prefecture-level cities, and districts. Here are 10 rows as test data:

```sql
CREATE TABLE `area` (
  `id` int(4) NOT NULL,
  `area_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'region name',
  `parent_id` int(11) NULL DEFAULT NULL COMMENT 'parent level: 0 = province, 1 = city, 2 = district',
  INDEX `id`(`id`) USING BTREE,
  INDEX `fk_parent_id`(`parent_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

INSERT INTO `area` VALUES (1, '北京市', 0);
INSERT INTO `area` VALUES (2, '市辖区', 1);
INSERT INTO `area` VALUES (3, '东城区', 2);
INSERT INTO `area` VALUES (4, '西城区', 2);
INSERT INTO `area` VALUES (5, '朝阳区', 2);
INSERT INTO `area` VALUES (6, '丰台区', 2);
INSERT INTO `area` VALUES (7, '石景山区', 2);
INSERT INTO `area` VALUES (8, '海淀区', 2);
INSERT INTO `area` VALUES (9, '门头沟区', 2);
INSERT INTO `area` VALUES (10, '房山区', 2);
```

Everything is ready. Open a Windows command prompt, change into Logstash's `bin` directory, and run:

```
logstash -f ../custom/shipper.conf
```

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 2][]

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 3][]

Because of the cron schedule, the data is synced once per minute.

Check the data in Kibana (a sample pinyin query is also included at the end of this post):

```
GET area/_search
```

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 4][]

OK, done.

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70]: /images/20211104/29686a4606c2498693953af8049ad57c.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 1]: /images/20211104/f4f5549f66be4bdb96eda98557b9b86c.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 2]: /images/20211104/9b0007f742054345b4159e0cdf48e910.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 3]: /images/20211104/3fcadc7c72174482bd4a65c9ad2933c2.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwOTcxNzUy_size_16_color_FFFFFF_t_70 4]: /images/20211104/c44db686baf94ac2873c5008f315b0d9.png
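To double-check that the pinyin analysis works end to end, a match query against the subfields defined in the mapping should find 北京市 by its pinyin. A quick sketch (the search term is just an example):

```
GET area/_search
{
  "query": {
    "match": {
      "area.spy": "beijing"
    }
  }
}
```

Whether first-letter input such as "bjs" or joined full pinyin such as "beijingshi" also matches depends on the `keep_*` options configured in the filters above.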