大数据项目任务:mysql与es集群数据实时同步更新

启动数据库net start mysql,

输入mysql -u root -p ,密码:root

出现mysql>配置完成

输入(alter user user() identified by "密码";)

mysql退出 mysql>quit;

net start mysql打开数据库

mysql退出 mysql>quit;

输入net stop mysql关闭数据库

任务一、对数据库mysql进行操作的时候,es也要跟着实时动态更新。

1.mysql数据库与elasticsearch全文检索的同步,通过binlog的设置对mysql数据库操作的日志进行记录,利用python的模块对日志进行操作,再利用kafka的生产者消费者模式进行订阅,最终实现elasticsearch的同步。

binlog技术实现MySQL和elasticsearch进行同步

binlog是数据库里的二进制日志文件,存储关于数据库增删改查的操作,只要有增删改查的操作就可以在binlog中体现出来,

用这个日志可以体现用户的增删改查。

解决两个问题

1.binlog如何设置

binlog与sql数据库有关系,可以在mysql配置文件my.ini,当配置文件修改时,重启数据库

打开命令行,输入mysql -u root -p ,密码:root;进入数据库

显示与binlog有关的变量: show global variables like "%binlog%";

修改binlog_format变量值为行级row:set global binlog_format="ROW";

再用show binlog events;查看数据库增删改的操作

2.python如何读取binlog数据的

python是通过mysql-replication实现binlog日志的读取的

然后打开pycharm去导入binlog监控数据库的操作

我到最后一步老报错那一行代码,for row in binlogstream.rows:

说什么编码错误:LookupError: unknown encoding: utf8mb3,在这里卡了很久,一直找不到解决方法


这里解决了这个问题,原因是我的数据库在创建的时候可能出了点问题,是使用命令行创建的,但之后试了一下使用SQLyog创建数据库选中基字符集类型之后,有一个uft8mb3的字符集,这个之前在网上查找的是数据库改成8.0版本之后没有utf8了,只有utf8mb3代替了,但这里还是会乱码,这之后改掉了创建表的时候列的数据类型,然后可以正常同步数据库的内容,成功输出


之后进行了下一步

下一步就是通过这个日志 把读出来的信息同步到elasticsearch中就更好了

因为数据量很大,所以使用Kafka生产者消费者模式对数据进行处理,消息来消息去

Kafka消息系统

采用页面缓存技术+磁盘顺序读写技术

Page cache基于页面缓存技术+磁盘顺序读写

Kafka实现需要zookeeper调度,对服务器集群进行管理的一个服务

启动zookeeper命令行:

进入d盘

cd d: 敲命令:zkServer

启动kafka服务命令行:先进入windows

kafka-server-start

再设置config中的server.properties

启动kafka

D:\kafka\kafka_2.13-3.4.0\bin\windows>kafka-server-start ..\..\config\server.properties

启动kafka完之后,启动kafka生产者消费者模式,主要利用的是kafka生产者消费者模式

打开命令行,进入d盘,

d:

cd kafka

cd kafka_2.13-3.4.0

cd bin

cd windows

dir

kafka是基于topic消息订阅模式的,利用topic由生产者发送到消费者,所以要先关联topic

启动生产者

kafka-console-producer --broker-list localhost:9092 --topic test

启动消费者(基于topic消息订阅的)

kafka-console-consumer --bootstrap-server localhost:9092 --topic test

在生产者输入this a book

如下图消费者会传过去数据

上述是命令行的启动和设置方式

现在用python模块来实现kafka的生产者消费者模式,安装的是kafka-python模块,实现了python操作kafka的服务

生产者端使用KafkaProducer

消费者端使用KafkaConsumer

注意:在消费者一端的参数要有生产者一端的变量名

现在要直接结合之前的reader,将mysql数据库的数据同步到kafka生产者消费者中,最后运行,再随便改几个数据库的数据,最后可以看到kafka消费者也同步了数据

这样就成功的将reader加入了kafka_producer中


接下来进行elasticsearch数据的同步

  1. MySQL数据首先存储在elasticsearch中

(1)python读取mysql数据,用pysql的模块,建一个reader_data读取pysql数据

(2)把读取的数据存储到elasticsearch当中,可以使用Elasticsearch.index方法进行存储

先启动es


笔记做到这儿的时候,我不明白之前为什么不这样读取数据,还要用kafka生产者,消费者模式来处理,然后代码跑不通,构建不了es索引

调试很多次 调试不通,所以先去找找利用Java写SQL与es数据同步

  1. 当MySQL中的数据发生变化时,同时elasticsearch中的数据也要发生变化




1.mysql数据库与elasticsearch全文检索的同步,通过binlog的设置对mysql数据库操作的日志进行记录,利用python的模块对日志进行操作,再利用kafka的生产者消费者模式进行订阅,最终实现elasticsearch的同步。

2.mysql数据库与elasticsearch全文检索的同步,通过binlog的设置对mysql数据库操作的日志进行记录,利用python的模块对日志进行操作,再利用kafka的生产者消费者模式进行订阅,最终实现elasticsearch的同步。本讲主要针对elasticsearch的增删改同步数据库。

create database readerBinlog default charset=utf8;

use readerBinlog;

create table mytable(id int(11),name varchar(20));

insert into mytable values(1,"孙大圣");

select * from mytable;

show binlog events;

update mytable set name ="黑熊怪" where id=5;

delete from mytable where id=5;

L-JankinLee
关注 关注
  • 0
    点赞
  • 1
    收藏
    觉得还不错? 一键收藏
  • 3
    评论
ELasticSearch(五)ES集群原理与搭建
GBEzmz1993的博客
01-03 827
一、ES集群原理   查看集群健康状况:URL+ /GET _cat/health   (1)、ES基本概念名词     Cluster   代表一个集群集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与...
特别有用的MySQL数据实时同步ES轻松配置手册
10-12
特别有用的MySQL数据实时同步ES轻松配置手册 特别有用的MySQL数据实时同步ES轻松配置手册
Elasticsearch集群同步
weixin_34026484的博客
06-04 5018
高可用架构 ZSearch是目前公司内最大的Elasticsearch服务平台,随着业务的深入,越来越多的关键链路用户对数据的可用性和容灾能力提出更高的需求,而在这块领域 社区一直没有完整的解决策略,原生的 Snapshot And Restore 只能做快照的恢复,不能做到实时同步;业内主流的队列分发模式(通过消息队列缓存请求数据,多个集群消费数据实现...
MySQLElasticsearch 实时数据同步实操分享,腾讯T2手把手教你
03-22 796
【连 接 名 称】:设置连接的名称,多个连接的名称不能重复【数据库地址】:数据库 IP / Host【端 口】:数据库端口【数据库名称】:tapdata 数据库连接是以一个 db 为一个数据源。这里的 db 是指一个数据库实例中的 database,而不是一个 mysql 实例。【账 号】:可以访问数据库的账号【密 码】:数据库账号对应的密码【时 间 时 区】:默认使用该数据库的时区;若指定时区,则使用指定后的时区设置测试连接,提示测试通过测试通过后保存连接即可。
ES数据同步&集群
Eumenides_Suki的博客
04-09 1582
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearchmysql之间的。集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。当node3当选后,集群继续对外提供服务,node2和node3自成集群,node1自成集群,两个集群数据同步,出现数据差异。:索引可以被拆分为不同的部分进行存储,称为分片。
数据库➡ES数据更新同步
LIWENDIAN1的博客
05-30 1428
mysql数据数据同步elasticsearch(es)
ES实时刷新与n秒刷新,压测性能耗损情况
热门推荐
王卫东 博客
12-07 1万+
ES压测 机器配置: CPU:4核     内存:8G 一、10万数据 1、50个线程(实时刷新,设置方法参考:http://blog.csdn.net/wwd0501/article/details/78316581) ①创建索引后,默认1s更新 耗时:131437ms(2分钟多)  丢索引情况:无丢失 CPU情况:100%左右 内存使用情况:30%左右 异常情况:无 ②
基于Canal的MySQLElasticsearch实时同步方案设计源码
最新发布
04-09
MySQLElasticsearch实时同步方案 - ...该项目为用户提供了一个基于Canal的MySQLElasticsearch实时同步方案,支持增量同步和全量同步,通过界面交互和功能模块,为用户提供了一个高效、易用的实时数据同步解决方案。
大数据-flink-mysql-connector:用于flinkcdc实时同步
02-04
flinkc-sql-connector-mysql,用于mysql数据实时同步
canal实现mysqlES数据实时同步
07-11
canal实现mysqlES数据实时同步
Binlog2Hive:MySQL增量数据实时同步到HDFSHive
04-29
项目背景RDS的数据实时同步到HDFS下,并映射到Hive原理通过解析RDS的binlog将RDS的增量数据同步到HDFS下,并映射加载到Hive外部分区表由于RDS表中的第二个字段都为datetime字段,所以刚才以该字段作为Hive的分区字段...
es-index-db:一种从数据库创建Elasticsearch索引的简便方法,并在数据更新实时更新索引
05-01
db-river-es 背景   ,深受欢迎的开源分布式搜索引擎,很多场景下,我们需要将数据库的数据导入到ES,提供快速稳定的搜索服务。然而,从绑定了很多业务逻辑的关系型数据库中创建规范的,能够搜索的索引并不那么容易,我们必须根据业务,编写较多代码,关联多个表,才能很好的创建索引,而且很多时候这些代码是重复的。   另外索引创建之后,数据数据如有改动,索引无法联动修改,ElasticSearch没有相关功能,我们只能根据修改频率重建索引,基本上没有实时性可言。   db-river-es正是为了解决这些问题而生。 项目介绍 名称:es-index-db 语言:java,基于jdk1.7 定位:从数据库创建ElasticSearch全量索引,索引与数据数据联动,实时更新 关键词:ElasticSearch index / mysql, sqlserver... / real time i
用python简单实现mysql数据同步ElasticSearch的教程
09-09
今天小编就为大家分享一篇用python简单实现mysql数据同步ElasticSearch的教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
Python-同步mysql数据elasticsearch的工具
08-10
同步mysql数据elasticsearch的工具,功能丰富,用法简单,配置灵活,扩展性强;
数列的前m项的和。
Handsomest_的博客
10-24 447
数列的定义如下: 数列的第一项为n,以后各项为前一项的平方根,求数列的前m项的和。 输入数据有多组,每组占一行,由两个整数n(n<10000)和m(m<1000)组成,n和m的含义如前所述。 对于每组输入数据,输出该数列的和,每个测试实例占一行,要求精度保留2位小数。 import java.util.Scanner; public class IxF { public static void main(String[] args) { // TODO Auto-generate
Elasticsearch集群实时同步数据数据(mysql)
ZhanBiaoChina的博客
04-10 1622
帮助文档: jdbc输入插件 https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html 实时同步 https://www.elastic.co/cn/blog/logstash-jdbc-input-plugin# https://segmentfault.com/a/1190000014387486 输出插...
es不是实时更新
荣山的博客
09-03 866
es不是实时更新
自建es集群数据迁移同步到腾讯云
随风而来的温柔
10-13 794
用户自建ES集群如何通过snapshot+cos的方式实现数据到腾讯云ES的迁移恢复
MysqlES数据同步方案汇总
荷逸的博客
02-05 3231
mysql同步es的4中方法
大数据学习一:大数据(离线分析)-spark写入hbase、mysql过程
03-16
大数据学习一:大数据(离线分析)-spark写入hbase、mysql过程 Spark是一个快速、通用、可扩展的大数据处理引擎,可以用于离线批处理、交互式查询和流处理等多种场景。在Spark中,我们可以使用Spark SQL、DataFrame和Dataset等API来进行数据处理和分析。 在Spark中,我们可以将数据写入到HBase和MySQL数据库中。下面是写入HBase和MySQL的过程: 1. 写入HBase (1)创建HBase表 在HBase中,我们需要先创建表,然后才能将数据写入到表中。可以使用HBase Shell或Java API来创建表。下面是使用HBase Shell创建表的示例: create 'mytable', 'cf' 其中,mytable是表名,cf是列族名。 (2)编写Spark程序 在Spark程序中,我们需要使用HBase API来将数据写入到HBase表中。下面是一个简单的示例: val conf = HBaseConfiguration.create() val table = new HTable(conf, "mytable") val put = new Put(Bytes.toBytes("rowkey")) put.add(Bytes.toBytes("cf"), Bytes.toBytes("column"), Bytes.toBytes("value")) table.put(put) 其中,conf是HBase配置对象,table是HBase表对象,put是HBase数据对象。我们可以使用put.add方法来添加数据,然后使用table.put方法将数据写入到HBase表中。 (3)运行Spark程序 在运行Spark程序之前,我们需要将HBase的相关jar包添加到Spark的classpath中。可以使用--jars参数来指定jar包的路径。下面是一个示例: spark-submit --class com.example.MyApp --jars /path/to/hbase.jar /path/to/myapp.jar 其中,MyApp是Spark程序的入口类,/path/to/hbase.jar是HBase的jar包路径,/path/to/myapp.jar是Spark程序的jar包路径。 2. 写入MySQL (1)创建MySQL表 在MySQL中,我们需要先创建表,然后才能将数据写入到表中。可以使用MySQL命令行或GUI工具来创建表。下面是使用MySQL命令行创建表的示例: CREATE TABLE mytable ( id INT PRIMARY KEY, name VARCHAR(50), age INT ); 其中,mytable是表名,id、name和age是列名。 (2)编写Spark程序 在Spark程序中,我们需要使用JDBC API来将数据写入到MySQL表中。下面是一个简单的示例: val url = "jdbc:mysql://localhost:3306/mydb" val props = new Properties() props.setProperty("user", "root") props.setProperty("password", "password") val df = spark.read.format("csv").load("/path/to/data.csv") df.write.mode("append").jdbc(url, "mytable", props) 其中,url是MySQL连接字符串,props是连接属性对象,df是数据集对象。我们可以使用spark.read方法来读取数据,然后使用df.write方法将数据写入到MySQL表中。 (3)运行Spark程序 在运行Spark程序之前,我们需要将MySQL的相关jar包添加到Spark的classpath中。可以使用--jars参数来指定jar包的路径。下面是一个示例: spark-submit --class com.example.MyApp --jars /path/to/mysql.jar /path/to/myapp.jar 其中,MyApp是Spark程序的入口类,/path/to/mysql.jar是MySQL的jar包路径,/path/to/myapp.jar是Spark程序的jar包路径。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
写文章

热门文章

  • 大数据开发面试经验总结1(慢慢学习补充) 1724
  • 大数据项目任务:mysql与es集群数据实时同步更新 938
  • 刷题day7 344反转字符串、541反转字符串II、剑指Offer 05替换空格、151翻转字符串里的单词、右旋转字符串 912
  • 刷题day8 28实现strStr()、459重复的子字符串、字符串总结、双指针回顾 869
  • 刷题day9 栈与队列理论基础、232用栈实现队列、225用队列实现栈 861

最新评论

  • 刷题day12 二叉树的理论基础、递归遍历、迭代遍历、统一迭代

    CSDN-Ada助手: 推荐 算法 技能树:https://edu.csdn.net/skill/algorithm?utm_source=AI_act_algorithm

  • 刷题day7 344反转字符串、541反转字符串II、剑指Offer 05替换空格、151翻转字符串里的单词、右旋转字符串

    普通网友: 优质好文,博主的文章细节很到位,兼顾实用性和可操作性,期待博主持续带来更多好文【我也写了一些相关领域的文章,希望能够得到博主的指导,共同进步!】

  • 大数据项目任务:mysql与es集群数据实时同步更新

    yudian0427: 写代码一个sql查出来的数据就可以同步了

  • 大数据项目任务:mysql与es集群数据实时同步更新

    尔卿: 其实我也在想这个问题,这个怎么操作呢

  • 刷题day7 344反转字符串、541反转字符串II、剑指Offer 05替换空格、151翻转字符串里的单词、右旋转字符串

    CSDN-Ada助手: 不知道 Java 技能树是否可以帮到你:https://edu.csdn.net/skill/java?utm_source=AI_act_java

大家在看

  • RabbitMQ实践——利用一致性Hash交换器做负载均衡 243
  • ELK日志分析系统 293
  • JS-逆向 特x电(X-Token、WTS、WVER、logininfo)参数
  • OpenStack入门体验及一键部署
  • 《梦醒蝶飞:释放Excel函数与公式的力量》1.2.1Excel如何保存工作簿

最新文章

  • 刷题day12 二叉树的理论基础、递归遍历、迭代遍历、统一迭代
  • 刷题day11 239滑动窗口最大值、347前K个高频元素、总结
  • 刷题day10 20有效的括号、1047删除字符串中的所有相邻重复项、150逆波兰表达式求值
2024年6篇
2023年17篇
2022年2篇

目录

目录

评论 3
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

深圳SEO优化公司爱联网页制作公司晋城建设网站沙井建站南京关键词排名包年推广多少钱安阳网站优化软件太原网站搭建抚州网站优化中卫外贸网站设计哪家好淮北SEO按天收费哪家好沈阳百度竞价包年推广报价思茅网站优化软件报价宜宾网页设计大庆企业网站制作推荐宜宾百度竞价价格湖州网站seo优化价格观澜网站推广系统哪家好汉中网站优化按天收费多少钱淄博百姓网标王哪家好邯郸网站搭建推荐霍邱关键词按天计费公司阳泉外贸网站建设多少钱鹤岗网站优化公司坑梓百姓网标王推广随州SEO按效果付费白银百度标王多少钱鞍山seo优化推荐岳阳品牌网站设计报价三亚网站排名优化哪家好白银网络推广多少钱临沧seo报价歼20紧急升空逼退外机英媒称团队夜以继日筹划王妃复出草木蔓发 春山在望成都发生巨响 当地回应60岁老人炒菠菜未焯水致肾病恶化男子涉嫌走私被判11年却一天牢没坐劳斯莱斯右转逼停直行车网传落水者说“没让你救”系谣言广东通报13岁男孩性侵女童不予立案贵州小伙回应在美国卖三蹦子火了淀粉肠小王子日销售额涨超10倍有个姐真把千机伞做出来了近3万元金手镯仅含足金十克呼北高速交通事故已致14人死亡杨洋拄拐现身医院国产伟哥去年销售近13亿男子给前妻转账 现任妻子起诉要回新基金只募集到26元还是员工自购男孩疑遭霸凌 家长讨说法被踢出群充个话费竟沦为间接洗钱工具新的一天从800个哈欠开始单亲妈妈陷入热恋 14岁儿子报警#春分立蛋大挑战#中国投资客涌入日本东京买房两大学生合买彩票中奖一人不认账新加坡主帅:唯一目标击败中国队月嫂回应掌掴婴儿是在赶虫子19岁小伙救下5人后溺亡 多方发声清明节放假3天调休1天张家界的山上“长”满了韩国人?开封王婆为何火了主播靠辱骂母亲走红被批捕封号代拍被何赛飞拿着魔杖追着打阿根廷将发行1万与2万面值的纸币库克现身上海为江西彩礼“减负”的“试婚人”因自嘲式简历走红的教授更新简介殡仪馆花卉高于市场价3倍还重复用网友称在豆瓣酱里吃出老鼠头315晚会后胖东来又人满为患了网友建议重庆地铁不准乘客携带菜筐特朗普谈“凯特王妃P图照”罗斯否认插足凯特王妃婚姻青海通报栏杆断裂小学生跌落住进ICU恒大被罚41.75亿到底怎么缴湖南一县政协主席疑涉刑案被控制茶百道就改标签日期致歉王树国3次鞠躬告别西交大师生张立群任西安交通大学校长杨倩无缘巴黎奥运

深圳SEO优化公司 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化