您当前的位置:首页 > 互联网教程

redis如何与数据库数据同步

发布时间:2025-05-24 20:56:53    发布人:远客网络

redis如何与数据库数据同步

一、redis如何与数据库数据同步

1、我们大多倾向于使用这种方式,也就是将数据库中的变化同步到Redis,这种更加可靠。Redis在这里只是做缓存。

2、方案1(推荐学习:Redis视频教程)

3、做缓存,就要遵循缓存的语义规定:

4、读:读缓存redis,没有,读mysql,并将mysql的值写入到redis。

5、写:写mysql,成功后,更新或者失效掉缓存redis中的值。

6、对于一致性要求高的,从数据库中读,比如金融,交易等数据。其他的从Redis读。

7、这种方案的好处是由mysql,常规的关系型数据库来保证持久化,一致性等,不容易出错。

8、这里还可以基于binlog使用mysql_udf_redis,将数据库中的数据同步到Redis。

9、但是很明显的,这将整体的复杂性提高了,而且本来我们在系统代码中能很轻易完成的功能,现在需要依赖第三方工具,而且系统的整个边界扩大了,变得更加不稳定也不好管理了。

二、将mysql数据库中的单个库的数据同步到redis数据库中

1、(self,key,where,refvalue,value):"""在key对应的列表的某一个值前或后插入一个新值 r.linsert("list2","before","11","00")#往列表中左边第一个出现的元素"11"前插入元素"00":param key::param where: before or after:param refvalue:标杆值,即:在它前后插入数据:param value:要插入的数据:return:""" try: if str(where).lower() not in [‘before‘,‘after‘]: raise ValueError(‘where值只能是 before或 after‘) return self.rd.linsert(key,where,refvalue,value) except Exception as e: return"Func linsertkeys Error:%d%s"%(e.args[0],e.args[1]) def lsetkeys(self,key,index,value):"""对key对应的list中的某一个索引位置重新赋值:param key::param index:索引值,从0开始:param value::return:""" try: if not isinstance(index,int): raise TypeError(‘index只能是整数‘) return self.rd.lset(key,index,value) except Exception as e: return"Func lsetkeys:%d%s"%(e.args[0],e.args[1]) def lremkeys(self,key,num,value):"""在key对应的list中删除指定的值:param key::param value::param num: num=0,删除列表中所有指定值,num=2,从前到后,删除2个,num=1,从前到后,删除左边第一个,num=-2,从后到前,删除两个:return:""" return self.rd.lrem(key,num,value) def lpopkeys(self,key):"""在key对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素:param key::return:""" return self.rd.lpop(key) def rpopkeys(self,key):"""在key对应的列表的右侧获取第一个元素并在列表中移除,返回值则是第一个元素:param key::return:""" return self.rd.rpop(key) def ltrimkeys(self,key,start,end):"""在key对应的list中移除没有在start- end索引之间的值:param key::param start:索引的开始位置:param end:索引的结束位置:return:""" return self.rd.ltrim(key,start,end) def lindexkeys(self,key,index):"""在name对应的list中根据索引获取列表元素:param key::param index::return:""" return self.rd.lindex(key,index) def rpoplpushkeys(self,srckey,dstkey):"""将元素从一个表移动到另一个表中(从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边):param srckey::param detkey::return:""" return self.rd.rpoplpush(srckey,dstkey) def brpoplpushkeys(self,srckey,dstkey,timeout=0):"""从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧,可以设置超时:param srckey::param dstkey::param timeout:当seckey对应的list中没有数据时,阻塞等待其有数据的超时时间单位为秒,0表示永远阻塞:return:""" return self.rd.brpoplpush(srckey,dstkey,timeout) def blpopkeys(self,keys,timeout):"""将多个列表排序,按照从左到右去pop对应的list元素:param keys::param timeout:超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0表示永远阻塞:return:""" return self.rd.blpop(keys,timeout) def brpopkeys(self,keys,timeout):"""将多个列表排序,按照从右到左去pop对应的list元素:param keys::param timeout:超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0表示永远阻塞:return:""" return self.rd.brpop(keys,timeout) def llenkeys(self,key):"""返回key的数据长度:param key::return:""" return self.rd.llen(key) def lrangekeys(self,key,start=0,end=-1):"""获取key的全部数据值通常与llen联用:param key::param start:起始位置(包括):param end:结束位置(包括):return:""" return self.rd.lrange(key,start,end) def listiterkeys(self,key):""" list增量迭代:param key::return:返回yield生成器""" list_count= self.rd.llen(key) for index in range(list_count): yield self.rd.lindex(key,index)# set操作 def saddkeys(self,key,values):""":key对应的集合中添加元素,重复的元素将忽略 saddkeys(‘xxx‘,1,2,3,4,5):param key::param values::return:""" return self.rd.sadd(key,values) def scardkeys(self,key):"""获取元素个数,类似于len:param key::return:""" return self.rd.scard(key) def smemnerkeys(self,key):"""获取key对应的集合的所有成员:param key::return:""" return self.rd.smembers(key) def sscankeys(self, key, cursor=0, match=None, count=None):"""获取集合中所有的成员--元组形式:param key::param cursor::param match::param count::return:""" return self.rd.sscan(key,cursor,match,count) def sscaniterkeys(self,key,match=0,count=None):"""获取集合中所有的成员--迭代器方式:param key::param match::param count::return:""" return self.rd.sscan_iter(key,match,count) def sdiffkeys(self,keys,*args):"""在第一个key对应的集合中且不在其他key对应的集合的元素集合 sdiffkeys(‘a‘,‘b)在集合a中并且不再集合b中:param keys::param args::return:""" return self.rd.sdiff(keys,args) def sdiffstorekeys(self,dstkey,keys,*args):"""获取第一个key对应的集合中且不在其他name对应的结合,在将其加入到dstkey对应的集合中:param dstkey::param keys::param args::return:""" res= self.rd.sdiffstore(dstkey,keys,args) return res def sinterkeys(self,keys,*args):"""获取多个keys对应集合的交集:param keys::param args::return:""" return self.rd.sinter(keys,args) def sinterstorekeys(self,dstkey,keys,*args):"""获取多个keys对应集合的交集,再将其加入到dstkey对应的集合中:param dstkey::param keys::param args::return:""" res= self.rd.sinterstore(dstkey,keys,args) return res def sunionkeys(self,keys,*args):"""获取多个key对应的集合的并集:param keys::param args::return:""" return self.rd.sunion(keys,args) def sunionstorekeys(self,dstkey,keys,*args):"""获取多个key对应集合的并集,并将其结果保存到dstkey对应的集合中:param dstkey::param keys::param args::return:""" return self.rd.sunionstore(dstkey,keys,args) def sismemberkeys(self,key,value):"""检查value是否是key对应的结合的成员,结果返回True or False:param key::param value::return: True or False""" return self.rd.sismember(key,value) def smovekeys(self,srckey,dstkey,value):"""将某个成员从一个集合中移动到另一个集合:param srckey::param dstkey::param value::return:""" return self.rd.smove(srckey,dstkey,value) def spopkeys(self,key):"""从集合移除一个成员,并将其返回,说明一下,集合是无序的,所有都是随机删除的:param key::return:""" return self.rd.spop(key) def sremkeys(self,key,values):"""在key对应的集合中删除某些值 sremkeys(‘xx‘,‘a‘,‘b‘):param key::param values::return:""" return self.rd.srem(key,values)#有序集合 zset操作 def zaddkeys(self,key,mapping):"""在key对应的有序集合中添加元素 zaddkeys(‘xx‘,{‘k1‘:1,‘k2‘:2}):param key::param args::param kwargs::return:""" return self.rd.zadd(key,mapping) def zcardkeys(self,key):"""获取有序集合元素的个数:param key::return:""" return self.rd.zcard(key) def zrangekeys(self,key,start,stop,desc=False,withscores=False,score_cast_func=float):"""获取有序集合索引范围内的元素:param key::param start:索引起始位置:param stop:索引结束位置:param desc:排序规则,默认按照分数从小到大排序:param withscores:是否获取元素的分数,默认之火去元素的值:param score_cast_func:对分数进行数据转换的函数:return:""" return self.rd.zrange(key,start,stop,desc,withscores,score_cast_func) def zrevrangekeys(self,key,start,end,withscores=False,score_cast_func=float):"""从大到小排序:param key::param start::param end::param withscores::param score_cast_func::return:""" return self.rd.zrevrange(key,start,end,withscores,score_cast_func) def zrangebyscorekeys(self,key,min,max,start=None,num=None,withscores=False,score_cast_func=float):"""按照分数返回获取key对应的有序集合的元素 zrangebyscore("zset3", 15, 25))##在分数是15-25之间,取出符合条件的元素 zrangebyscore("zset3", 12, 22, withscores=True)#在分数是12-22之间,取出符合条件的元素(带分数):param key::param min::param max::param start::param num::param withscores::param score_cast_func::return:""" return self.rd.zrangebyscore(key,min,max,start,num,withscores,score_cast_func) def zrevrangebyscorekeys(self,key,max,min,start=None,num=None,withscores=False,score_cast_func=float):"""按照分数范围获取有序集合的元素并排序(默认按照从大到小排序):param key::param max::param min::param start::param num::param withscores::param score_cast_func::return:""" return self.rd.zrevrangebyscore(key,max,min,start,num,withscores,score_cast_func) def zscankeys(self,key,cursor=0,match=None,count=None,score_cast_func=float):"""获取所有元素--默认按照分数顺序排序:param key::param cursor::param match::param count::param score_cast_func::return:""" return self.rd.zscan(key,cursor,match,count,score_cast_func) def zscaniterkeys(self,key,match=None,count=None,score_cast_func=float):"""获取所有元素的迭代器:param key::param match::param count::param score_cast_func::return:""" return self.rd.zscan_iter(key,match,count,score_cast_func) def zcountkeys(self,key,min,max):"""获取key对应的有序集合中分数在[min,max]之间的个数:param key::param min::param max::return:""" return self.rd.zcount(key,min,max) def zincrbykey(self,key,value,amount=1):"""自增key对应的有序集合key对应的分数:param key::param value::param amount::return:""" return self.rd.zincrby(key,amount,value) def zrankeys(self,key,value):"""获取某个值在key对应的有序集合中的索引(从0开始),默认是从小到大顺序:param key::param value::return:""" return self.rd.zrank(key,value) def zrevrankey(self,key,value):"""获取某个值在key对应的有序集合中的索引,默认是从大到小顺序:param key::param value::return:""" return self.rd.zrevrank(key,value) def zremkeys(self,key,values):"""删除key对应的有序集合中值是value的成员:param key::param value::return:""" return self.rd.zrem(key,values) def zremrangebyrankeys(self,key,min,max):"""删除根据排行范围删除,按照索引删除:param key::param min::param max::return:""" return self.rd.zremrangebyrank(key,min,max) def zremrangebyscoreskeys(self,key,min,max):"""删除根据分数范围删除,按照分数范围删除:param key::param min::param max::return:""" return self.rd.zremrangebyscore(key,min,max) def zscoreskeys(self,key,value):"""获取key对应有序集合中value对应的分数:param key::param value::return:""" return self.rd.zscore(key,value)#其他常用操作 def deletekeys(self,*keys):"""根据删除redis中的任意数据类型 deletekeys(‘gender‘,‘cname‘)可以删除多个key:param keys::return:""" return self.rd.delete(*keys) def existskeys(self,key):"""检测redis的key是否存在,存在就返回True,False则不存在:param key::return:""" return self.rd.exists(key) def getkeys(self,pattern=""):"""根据模型获取redis的key getkeys*匹配数据库中所有 key。 getkeys h?llo匹配 hello, hallo和 hxllo等。 getkeys hllo匹配 hllo和 heeeeello等。 getkeys h[ae]llo匹配 hello和 hallo,但不匹配 hillo:param pattern::return:""" return self.rd.keys(pattern) def expirekeys(self,key,time):"""设置超时时间,为某个key设置超时时间:param key::param time:秒:return:""" return self.rd.expire(key,time) def renamekeys(self,srckey,dstkey):"""对key进行重命名:param srckey::param dstkey::return:""" return self.rd.rename(srckey,dstkey) def randomkeys(self):"""随即获取一个key(不删除):return:""" return self.rd.randomkey() def getkeytypes(self,key):"""获取key的类型:param key::return:""" return self.rd.type(key) def scankeys(self,cursor=0,match=None,count=None):"""查看所有key,终极大招:param cursor::param match::param count::return:""" return self.rd.scan(cursor,match,count) def scaniterkeys(self,match=None,count=None):"""查看所有key的迭代器,防止内存被撑爆:param match::param count::return:""" return self.rd.scan_iter(match,count) def dbsizekeys(self):"""当前redis包含多少条数据:return:""" return self.rd.dbsize() def savekeys(self):"""将数据刷入磁盘中,保存时阻塞:return:""" return self.rd.save() def flushdbkeys(self,asynchronous=False):"""清空当前数据库的所有数据,请谨慎使用:param asynchronous:异步:return:""" return self.rd.flushdb(asynchronous) def flushallkeys(self,asynchronous=False):"""清空redis所有库中的数据,不到最后时刻请不要使用:param asynchronous:异步:return:""" return self.rd.flushall(asynchronous)#创建管道""" redis默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline是原子性操作。管道(pipeline)是redis在提供单个请求中缓冲多条服务器命令的基类的子类。它通过减少服务器-客户端之间反复的TCP数据库包,从而大大提高了执行批量命令的功能。""" def createpipe(self):""" pipeline(transaction=False)#默认的情况下,管道里执行的命令可以保证执行的原子性,执行pipe= r.pipeline(transaction=False)可以禁用这一特性。 pipe= r.pipeline()#创建一个管道 pipe.set(‘name‘,‘jack‘) pipe.set(‘role‘,‘sb‘) pipe.sadd(‘faz‘,‘baz‘) pipe.incr(‘num‘)#如果num不存在则vaule为1,如果存在,则value自增1 pipe.execute() pipe.set(‘hello‘,‘redis‘).sadd(‘faz‘,‘baz‘).incr(‘num‘).execute():return:""" return self.rd.pipeline() def disconectkeys(self): return self.connPoor.disconnect()class QueryAllData(): def __init__(self): self.show_tables= config.mysql_show_tables self.desc_table= config.mysql_desc_table self.query_info= config.mysql_query_info def query_tables_info(self): mhelper= MySQLHelper() tables_list= mhelper.queryTableNums(self.show_tables) tables_attr={} for table in tables_list: rows= mhelper.queryAll(self.query_info%table) tables_attr[table]= rows mhelper.close() return tables_attr def mysql2redis(self,redisobj): mysql_data= self.query_tables_info() for keys, values in mysql_data.items():# keys为表名字 for val in values:# val为每一行的数据 keysorted= sorted(val.keys()) for s in keysorted:#s为数据的key item=‘%s:%s‘%(keys,str(val[‘id‘])) ischange= redisobj.hgetkey(item,s) if ischange!= val[s]: redisobj.hsetkey(item,s,val[s])if __name__=="__main__": queryalldata= QueryAllData() rhelper= RedisHelper() while True: queryalldata.mysql2redis(rhelper)

2、将mysql数据库中的单个库的数据同步到redis数据库中

3、标签:pipeline打印取数据countversion清空memberscores==

4、标签 pipeline打印取数据 count version清空 members cores==

三、php redis做mysql的缓存,怎么异步redis同步到mysql数据库

对于变化频率非常快的数据来说,如果还选择传统的静态缓存方式(Memocached、File System等)展示数据,可能在缓存的存取上会有很大的开销,并不能很好的满足需要,而Redis这样基于内存的NoSQL数据库,就非常适合担任实时数据的容器。

但是往往又有数据可靠性的需求,采用MySQL作为数据存储,不会因为内存问题而引起数据丢失,同时也可以利用关系数据库的特性实现很多功能。

所以就会很自然的想到是否可以采用MySQL作为数据存储引擎,Redis则作为Cache。而这种需求目前还没有看到有特别成熟的解决方案或工具,因此采用Gearman+PHP+MySQL UDF的组合异步实现MySQL到Redis的数据复制。

无论MySQL还是Redis,自身都带有数据同步的机制,比较常用的MySQL的Master/Slave模式,就是由Slave端分析Master的binlog来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。

那么理论上也可以用同样方式,分析MySQL的binlog文件并将数据插入Redis。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于binlog存在Statement/Row/Mixedlevel多种形式,分析binlog实现同步的工作量是非常大的。

因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的MySQL UDF,将MySQL数据首先放入Gearman中,然后通过一个自己编写的PHP Gearman Worker,将数据同步到Redis。比分析binlog的方式增加了不少流程,但是实现成本更低,更容易操作。

Gearman是一个支持分布式的任务分发框架。设计简洁,获得了非常广泛的支持。一个典型的Gearman应用包括以下这些部分:

Gearman Job Server:Gearman核心程序,需要编译安装并以守护进程形式运行在后台

Gearman Client:可以理解为任务的收件员,比如在后台执行一个发送邮件的任务,可以在程序中调用一个Gearman Client并传入邮件的信息,然后就可以将执行结果立即展示给用户,而任务本身会慢慢在后台运行。

Gearman Worker:任务的真正执行者,一般需要自己编写具体逻辑并通过守护进程方式运行,Gearman Worker接收到Gearman Client传递的任务内容后,会按顺序处理。

以前曾经介绍过类似的后台任务处理项目Resque。两者的设计其实非常接近,简单可以类比为:

Gearman Job Server:对应Resque的Redis部分

Gearman Client:对应Resque的Queue操作

Gearman Worker:对应Resque的Worker和Job

这里之所以选择Gearman而不是Resque是因为Gearman提供了比较好用的MySQL UDF,工作量更小。

apt-get install gearman gearman-server libgearman-dev

/etc/init.d/gearman-job-server status

PHP的Gearman扩展可以通过pecl直接安装

echo"extension=gearman.so">/etc/php5/conf.d/gearman.ini

但是实测发现ubuntu默认安装的gearman版本过低,直接运行pecl install gearman会报错

configure: error: libgearman version 1.1.0or later required

因此Gearman+ PHP扩展建议通过编译方式安装,这里为了简单说明,选择安装旧版本扩展:

为了更容易理解后文Gearman的运行流程,这里不妨从一个最简单的Gearman实例来说明,比如要进行一个文件处理的操作,首先编写一个Gearman Client并命名为client.php:

$client->doBackground('writeLog','Log content');

echo'文件已经在后台操作';

运行这个文件,相当于模拟用户请求一个Web页面后,将处理结束的信息返回用户:

(echo status; sleep 0.1)| netcat127.0.0.14730

说明已经在Gearman中建立了一个名为writeLog的任务,并且有1个任务在队列等待中。

而上面的4列分别代表当前的Gearman的运行状态:

watch-n 1"(echo status; sleep 0.1)| nc 127.0.0.1 4730"

然后我们需要编写一个Gearman Worker命名为worker.php:

$worker->addFunction('writeLog','writeLog');while($worker->work());function writeLog($job){

$log=$job->workload();file_put_contents(__DIR__.'/gearman.log',$log."\n", FILE_APPEND| LOCK_EX);}

Worker使用一个while死循环实现守护进程,运行

同时查看同目录下gearman.log,内容应为从Client传入的值Log content。

通过MySQL UDF+ Trigger同步数据到Gearman

MySQL要实现与外部程序互通的最好方式还是通过MySQL UDF(MySQL user defined functions)来实现。为了让MySQL能将数据传入Gearman,这里使用了lib_mysqludf_json和gearman-mysql-udf的组合。

使用lib_mysqludf_json的原因是因为Gearman只接受字符串作为入口参数,可以通过lib_mysqludf_json将MySQL中的数据编码为JSON字符串

apt-get install libmysqlclient-dev

wget

gcc$(mysql_config--cflags)-shared-fPIC-o lib_mysqludf_json.so lib_mysqludf_json.c

可以看到重新编译生成了 lib_mysqludf_json.so文件,此时需要查看MySQL的插件安装路径:

mysql-u root-pPASSWORD--execute="show variables like'%plugin%';"+---------------+------------------------+|Variable_name|Value|+---------------+------------------------+| plugin_dir|/usr/lib/mysql/plugin/|+---------------+------------------------+

然后将 lib_mysqludf_json.so文件复制到对应位置:

cp lib_mysqludf_json.so/usr/lib/mysql/plugin/

最后登入MySQL运行语句注册UDF函数:

CREATE FUNCTION json_object RETURNS STRING SONAME'lib_mysqludf_json.so';

apt-get install libgearman-dev

wget

tar-xzf gearman-mysql-udf-0.6.tar.gz

cd gearman-mysql-udf-0.6./configure--with-mysql=/usr/bin/mysql_config

-libdir=/usr/lib/mysql/plugin/

登入MySQL运行语句注册UDF函数:

CREATE FUNCTION gman_do_background RETURNS STRING SONAME'libgearman_mysql_udf.so';

CREATE FUNCTION gman_servers_set RETURNS STRING SONAME'libgearman_mysql_udf.so';

SELECT gman_servers_set('127.0.0.1:4730');

最终同步哪些数据,同步的条件,还是需要根据实际情况决定,比如将数据表data的数据在每次更新时同步,那么编写Trigger如下:

CREATE TRIGGER datatoredis AFTER UPDATE ON data

SET@ret=gman_do_background('syncToRedis', json_object(NEW.id as`id`, NEW.volume as`volume`));END$$

尝试在数据库中更新一条数据查看Gearman是否生效。

Gearman PHP Worker将MySQL数据异步复制到Redis

Redis作为时下当热的NoSQL缓存解决方案无需过多介绍,其安装及使用也非常简单:

echo"extension=redis.so">/etc/php5/conf.d/redis.ini

然后编写一个Gearman Worker:redis_worker.php

$worker->addFunction('syncToRedis','syncToRedis');

$redis->connect('127.0.0.1',6379);while($worker->work());function syncToRedis($job){global$redis;

$workString=$job->workload();

$work= json_decode($workString);if(!isset($work->id)){returnfalse;}

$redis->set($work->id,$workString);}

nohup php redis_worker.php&

通过这种方式将MySQL数据复制到Redis,经测试单Worker基本可以瞬时完成。