Oracle数据库有其公司开发的配套rac来实现负载均衡,目前已知的最大节点数能到128个,但是其带来的维护成本无疑是很高的,并且rac的稳定性也并不是特别理想,尤其是节点很多的时候。
但是,相对mysql来说,rac的实用性要比mysql的配套集群软件mysql-cluster要高很多。因为从网上了解到情况来看,很少公司在使用mysql-cluster,大多数企业都会选择第三方代理软件,例如MySQL Proxy、Mycat、haproxy等,但是这会引起另外一个问题:单点故障(包括mysql-cluster:管理节点)。如果要解决这个问题,就需要给代理软件搭建集群,在访问量很大的情况下,代理软件的双机或三机集群会成为访问瓶颈,继续增加其节点数,无疑会带来各方面的成本。
那么,如何可以解决这个问题呢?
解决上述问题,最好的方式个人认为应该是在程序中实现。通过和其他mysql DBA的沟通,也证实了这个想法。但是由此带来的疑问也就产生了:会不会增加开发成本?对现有的应用系统做修改会不会改动很大?会不会增加后期版本升级的难度?等等。
对于一个架构设计良好的应用系统可以很肯定的回答:不会。
那么怎么算一个架构设计良好的应用系统呢?
简单来说,就是分层合理、功能模块之间耦合性底。以本人的经验来说,系统设计基本上可以划分为以下四层:
1. 实体层:主要定义一些实体类
2. 数据层:也可以叫SQL处理层。主要负责跟数据库交互取得数据
3. 业务处:主要是根据业务流程及功能区分模块(或者说定义不同的业务类)
4. 表现层:呈现最终结果给用户
实现上述功能(mysql的读写分离及负载均衡),在这四个层次中,仅仅涉及到数据层。
严格来说,对于设计良好的系统,只涉及到一个类的一个函数:在数据层中,一般都会单独划分出一个连接类,并且这个连接类中会有一个连接函数,需要改动的就是这个函数:在读取连接字符串之前加一个功能函数返回需要的主机、ip、端口号等信息(没有开发经历的同学可能理解这段话有点费劲)。
流程图如下:
代码如下:
import mmap import json import random import mysql.connector import time ##公有变量 #dbinfos={ # "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1}, # "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1} # } dbinfos={} mmap_file = None mmap_time=None ##这个函数返回json格式的字符串,也是实现初始化数据库信息的地方 ##使用json格式是为了方便数据转换,从字符串---》二进制--》字符串---》字典 ##如果采用其它方式共享dbinfos的方法,可以不用此方式 ##配置库的地址 def get_json_str1(): return json.dumps(dbinfos) ##读取配置库中的内容 def get_json_str(): try: global dbinfos cnx = mysql.connector.connect(user='root', password='Abcd.1234', host='192.168.42.60', database='rwlb') cursor = cnx.cursor() cmdString="select * from rwlb" cnt=-1 cursor.execute(cmdString) for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor: cnt=cnt+1 dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status} dbinfos["db"+str(cnt)]=dict_db cursor.close() cnx.close() return json.dumps(dbinfos) except: cursor.close() cnx.close() return "" ##判断是否能正常连接到数据库 def check_conn_host(): try: cnx = mysql.connector.connect(user='root', password='Abcd.1234', host='192.168.42.60', database='rwlb') cursor = cnx.cursor() cmdString="select user()" cnt=-1 cursor.execute(cmdString) for user in cursor: cnt=len(user) cursor.close() cnx.close() return cnt except : return -1; ##select 属于读操作,其他属于写操作-----这里可以划分的更详细,比如执行存储过程等 def analyze_sql_state(sql): if "select" in sql: return "R" else: return "W" ##读取时间信息 def read_mmap_time(): global mmap_time,mmap_file mmap_time.seek(0) ##初始时间 inittime=int(mmap_time.read().translate(None, b'\x00').decode()) ##当前时间 endtime=int(time.time()) ##时间差 dis_time=endtime-inittime print("dis_time:"+str(dis_time)) #重新读取数据 if dis_time>10: ##当配置库正常的情况下才重新读取数据 print(str(check_conn_host())) if check_conn_host()>0: print("read data again") mmap_time.seek(0) mmap_file.seek(0) mmap_time.write(b'\x00') mmap_file.write(b'\x00') get_mmap_time() get_mmap_info() else: print("can not connect to host") #不重新读取数据 else: print("do not read data again") ##从内存中读取信息, def read_mmap_info(sql): read_mmap_time() print("The data is in memory") global mmap_file,dict_db mmap_file.seek(0) ##把二进制转换为字符串 info_str=mmap_file.read().translate(None, b'\x00').decode() #3把字符串转成json格式,方便后面转换为字典使用 infos=json.loads(info_str) host_count=len(infos) ##权重列表 listw=[] ##总的权重数量 wtotal=0 ##数据库角色 dbrole=analyze_sql_state(sql) ##根据权重初始化一个列表。这个是比较简单的算法,所以权重和控制在100以内比较好----这里可以选择其他比较好的算法 for i in range(host_count): db="db"+str(i) if dbrole in infos[db]["role"]: if int(infos[db]["status"])==1: w=infos[db]["weight"] wtotal=wtotal+w for j in range(w): listw.append(i) if wtotal >0: ##产生一个随机数 rad=random.randint(0,wtotal-1) ##读取随机数所在的列表位置的数据 dbindex=listw[rad] ##确定选择的是哪个db db="db"+str(dbindex) ##为dict_db赋值,即选取的db的信息 dict_db=infos[db] return dict_db else : return {} ##如果内存中没有时间信息,则向内存红写入时间信息 def get_mmap_time(): global mmap_time ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点 mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time') ##读取有效比特数,不包括空比特 cnt=mmap_time.read_byte() if cnt==0: print("Load time to memory") mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time') inittime=str(int(time.time())) mmap_time.write(inittime.encode()) ##如果内存中没有对应信息,则向内存中写信息以供下次调用使用 def get_mmap_info(): global mmap_file ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点 mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') ##读取有效比特数,不包括空比特 cnt=mmap_file.read_byte() if cnt==0: print("Load data to memory") mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') mmap_file.write(get_json_str().encode()) ##测试函数 def test1(): get_mmap_time() get_mmap_info() for i in range(10): sql="select * from db" #sql="update t set col1=a where b=2" dbrole=analyze_sql_state(sql) dict_db=read_mmap_info(sql) print(dict_db["host"]) def test2(): sql="select * from db" res=analyze_sql_state(sql) print("select:"+res) sql="update t set col1=a where b=2" res=analyze_sql_state(sql) print("update:"+res) sql="insert into t values(1,2)" res=analyze_sql_state(sql) print("insert:"+res) sql="delete from t where b=2" res=analyze_sql_state(sql) print("delete:"+res) ##类似主函数 if __name__=="__main__": test2()
测试结果:
从结果可以看出,只有第一次向内存加载数据,并且按照权重实现了负载均衡。
因为测试函数test1()写的是固定语句,所以读写分离的结果没有显示出来。
另外:测试使用的数据库表结构及数据:
desc rwlb; +---------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +---------+-------------+------+-----+---------+-------+ | host | varchar(50) | YES | | NULL | | | user | varchar(50) | YES | | NULL | | | pwd | varchar(50) | YES | | NULL | | | my_user | varchar(50) | YES | | NULL | | | my_pwd | varchar(50) | YES | | NULL | | | role | varchar(10) | YES | | NULL | | | weight | int(11) | YES | | NULL | | | status | int(11) | YES | | NULL | | | port | int(11) | YES | | NULL | | | db | varchar(50) | YES | | NULL | | +---------+-------------+------+-----+---------+-------+ select * from rwlb; +---------------+------+----------+---------+-----------+------+--------+--------+------+------+ | host | user | pwd | my_user | my_pwd | role | weight | status | port | db | +---------------+------+----------+---------+-----------+------+--------+--------+------+------+ | 192.168.42.60 | root | Abcd1234 | root | Abcd.1234 | RW | 10 | 1 | NULL | NULL | | 192.168.42.61 | root | Abcd1234 | root | Abcd.1234 | R | 20 | 1 | NULL | NULL | +---------------+------+----------+---------+-----------+------+--------+--------+------+------+
总结
以上所述是小编给大家介绍的python实现mysql的读写分离及负载均衡,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]