教程
信息差
资源
软件工具
技术笔记
AIGC
视频
Search
1
使用AI实现高精度钢琴曲转谱Piano Transcription简明使用教程
37,794 阅读
2
使用ESP8266Wifi模块制作Wifi杀手
37,467 阅读
3
unravel 让图片唱歌详细教程 Real Time Image Animation 项目
27,386 阅读
4
佳能相机刷Magic Lantern魔灯固件
23,501 阅读
5
战地3 正版账号免费分享!
16,213 阅读
教程
信息差
资源
软件工具
技术笔记
AIGC
视频
Search
标签搜索
python
前端
环境搭建
空镜素材
Ubuntu
markdown
神器
黑苹果
编码
技巧
Git
数据库
开发
下载工具
Youtube
CDN
PDF
OST
电影原声带
音乐
易小灯塔
累计撰写
176
篇文章
累计收到
44
条评论
首页
栏目
教程
信息差
资源
软件工具
技术笔记
AIGC
视频
页面
搜索到
127
篇与
的结果
2021-06-21
Django项目使用Elasticsearch全文搜索引擎实现搜索优化
Django项目使用Elasticsearch全文搜索引擎实现搜索优化 简介由于项目需要用到模糊搜索, Mysql模糊查询正常情况下在数据量小的时候,速度还是可以的,数据表数据量大的时候就会查询时效率低下, 对此, 使用全文搜索引擎就是一个很好的解决方法.Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。在 Django 项目中使用Elasticsearch有很多现成的库, 比如elasticsearch,django-elasticsearch-dsl, django-haystack , drf_haystack 等。Haystack 是在 Django 中对接搜索引擎的框架,搭建了用户和搜索引擎之间的沟通桥梁。 我们在 Django 中可以通过使用 Haystack 来调用 Elasticsearch 搜索引擎。 Haystack 可以在不修改代码的情况下使用不同的搜索后端(比如 Elasticsearch、Whoosh、Solr等等)。 需要注意的是Haystack只支持es6以下的版本准备 下载elasticsearch 2.4.6版本, 建议使用docker方式安装, 注意版本号下载kibana 4.6.4版本,用于管理下载对应版本的中文分词插件elasticsearch-analysis-ik-1.10.6.zip, 在es安装好下载的插件下载地址https://www.elastic.co/cn/elasticsearch/https://www.elastic.co/cn/downloads/kibanahttps://github.com/medcl/elasticsearch-analysis-ik/releases?page=13 docker镜像市场: http://hub.daocloud.io/Docker安装在服务器上新建配置文件docker-compose.ymlversion: "2" services: elasticsearch: image: daocloud.io/library/elasticsearch:2.6.4 restart: always container_name: elasticserach ports: - "9200:9200" kibana: image: daocloud.io/library/kibana:4.6.4 restart: always container_name: kibana ports: - "5601:5601" environment: - elasticsearch_url=http://127.0.0.1:9200 depends_on: - elasticsearch 输入命令安装docker-compose up -d 安装分词插件docker ps # 查看当前运行进程 docker exec -it es镜像id bash cd bin ./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v2.6.4/elasticsearch-analysis-ik-1.10.6.zip 重启esdocker restart es镜像id 安装好Python需要的库pip install django-haystack pip install elasticsearch==2.4.1 # 注意版本2.4.1 pip install drf_haystack drf_haystack 文档https://drf-haystack.readthedocs.io/en/latest/index.html 示例1.配置再设置里配置添加appINSTALLED_APPS = [ 'haystack', ... ] 配置链接HAYSTACK_CONNECTIONS = { 'default': { 'ENGINE': 'haystack.backends.elasticsearch_backend.ElasticsearchSearchEngine', 'URL': 'http://127.0.0.1:9200/', # 此处为elasticsearch运行的服务器ip地址和端口 'INDEX_NAME': 'gameprop', # 指定elasticserach建立的索引库名称 }, } # 搜索结果每页显示数量 HAYSTACK_SEARCH_RESULTS_PER_PAGE = 5 # 实时更新index HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor' 2.创建haystack数据模型在app名/模块名目录下创建search_indexes.py文件,注意文件名必须使用search_indexes.py,代码如下:from haystack import indexes from Headlines.models import Article class ArticleIndex(indexes.SearchIndex, indexes.Indexable): """文章索引模型类""" text = indexes.CharField(document=True, use_template=True) id = indexes.IntegerField(model_attr='id') createtime = indexes.DateTimeField(model_attr='createtime') content = indexes.CharField(model_attr='content') title = indexes.CharField(model_attr='title') def get_model(self): """返回建立索引的模型类""" return Article def index_queryset(self, using=None): """返回要建立索引的数据查询集""" return self.get_model().objects.all() 3.创建 text 字段索引值模板文件创建 text 字段索引值模板文件templates/search/indexes/APP名/模块名_text.txt{{ object.id }} {{ object.title }} {{ object.channel }} {{ object.labels }} {{ object.content }} 4.手动生成初始索引在命令行中添加如下命令, 来手动生成索引表:python manage.py rebuild_index 5.编辑视图文件views.pyfrom drf_haystack.viewsets import HaystackViewSet # 搜索文章 class SearchArticleViewSet(HaystackViewSet): # GET /Headlines/search/?text=<搜索关键字> # 这里可以写多个模型,相应的:serializer里也可以写多个index_classes index_models = [Article] serializer_class = SearchArticleIndexSerializer 6.编辑序列化器serializers.pyfrom drf_haystack import serializers as HSER # 搜索文章 class SearchArticleIndexSerializer(HSER.HaystackSerializer): class Meta: index_classes = [ArticleIndex] # 索引类的名称,可以有多个 fields = ('text', 'content', 'id', 'title', 'createtime') 7.编辑路由urls.py# 搜索文章路由 from rest_framework.routers import DefaultRouter from Headlines.views import SearchArticleViewSet router = DefaultRouter() router.register(r'Headlines/search', SearchArticleViewSet, base_name='search') urlpatterns += router.urls
2021年06月21日
2,177 阅读
2 评论
5 点赞
2021-03-08
自动化运维 Python Netmiko库的使用
Netmiko用于简化paramiko与网络设备之间的ssh连接,可在windows与Unix平台使用 , Netmiko 脚本比 Paramiko 脚本短很多安装pip install netmiko 实例执行单条命令from netmiko import ConnectHandler cisco = { 'device_type':'cisco_ios', 'host':'ip地址', 'username':'用户名', 'password':'密码' } net_connect = ConnectHandler(**cisco) ##或者 # net_connect = ConnectHandler(device_type='cisco_ios',host='IP地址',username='用户名',password='密码') #找到目前所在视图 current_view = net_connect.find_prompt() print(current_view) #执行命令,返回结果为字符串,赋值给output output = net_connect.send_command('show ip int brief') print(output) # #此为在windows里如果\n不能显示回车,则进行如下语句格式化 # o_list = output.split("\n") # for line in o_list: # print(line) 执行配置命令:手动关闭接口G1/0/29from netmiko import ConnectHandler cisco = { 'device_type':'cisco_ios', 'host':'ip地址', 'username':'用户名', 'password':'密码' } net_connect = ConnectHandler(**cisco) ##或者 # net_connect = ConnectHandler(device_type='cisco_ios',host='IP地址',username='用户名',password='密码') #要配置的命令 config_commands = ['interface GigabitEthernet1/0/29','shutdown'] #提交要配置的命令,input为提交的真实内容 input = net_connect.send_config_set(config_commands) #验证shutdown是否执行成功 output = net_connect.send_command('show run inter gi1/0/29') print(output) # #此为在windows里如果\n不能显示回车,则进行如下语句格式化 # o_list = output.split("\n") # for line in o_list: # print(line) 执行多条命令from netmiko import ConnectHandler as ch # 通过字典方式定义设备登录信息 host = { 'device_type': 'hp_comware', 'host': '192.168.56.20', 'username': 'netdevops', 'password': 'netdevops', 'port': 22, 'secret': '', # enable密码,没有可以不写这行 } # 连接设备 conn = ch(**host) # 定义一个命令列表,比如为G0/1配置一个IP地址 commands = ['int g0/1', 'ip add 1.1.1.1 30', 'desc netmiko_config'] # 这个时候可以使用 send_config_set 方法执行多条命令 output = conn.send_config_set(commands) print(output) 常用方法net_connect.send_command() # 向下发送命令,返回输出(基于模式)net_connect.send_command_timing() # 沿通道发送命令,返回输出(基于时序)net_connect.send_config_set() # 将配置命令发送到远程设备net_connect.send_config_from_file() # 发送从文件加载的配置命令net_connect.save_config() # 将running#config保存到startup#confignet_connect.enable() # 输入启用模式net_connect.find_prompt() # 返回当前路由器提示符net_connect.commit() # 在Juniper和IOS#XR上执行提交操作net_connect.disconnect() # 关闭连接net_connect.write_channel() # 通道的低级写入net_connect.read_channel() # 通道的低级写入
2021年03月08日
2,136 阅读
0 评论
2 点赞
2021-03-08
自动化运维 Python Paramiko库的使用
Python自动化运维 Paramiko库的使用 简介Paramiko是用Python语言写的一个库,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。paramiko支持Linux, Solaris, BSD, MacOS X, Windows等平台通过SSH从一个平台连接到另外一个平台。利用该模块,可以方便的进行ssh连接和sftp协议进行sftp文件传输paramiko包含两个核心组件:SSHClient和SFTPClient。 SSHClient的作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。 SFTPClient的作用类似与Linux的sftp命令,是对SFTP客户端的封装,用以实现远程文件操作,如文件上传、下载、修改文件权限等操作。 这两个组件实现了常见ssh客户端的一些功能, 通过Paramiko编写脚本任务, 就可以实现自动化运维 使用在命令行下输入下面命令即可安装pip install paramiko 密码远程连接单个连接import paramiko ##1.创建一个ssh对象 client = paramiko.SSHClient() #2.解决问题:如果之前没有,连接过的ip,会出现选择yes或者no的操作, ##自动选择yes client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #3.连接服务器 client.connect(hostname='172.25.254.31', port=22, username='root', password='westos') #4.执行操作 stdin,stdout, stderr = client.exec_command('hostname') #5.获取命令执行的结果 result=stdout.read().decode('utf-8') print(result) #6.关闭连接 client.close() 批量远程密码连接from paramiko.ssh_exception import NoValidConnectionsError from paramiko.ssh_exception import AuthenticationException def connect(cmd,hostname,port=22,username='root',passwd='westos'): import paramiko ##1.创建一个ssh对象 client = paramiko.SSHClient() #2.解决问题:如果之前没有,连接过的ip,会出现选择yes或者no的操作, ##自动选择yes client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #3.连接服务器 try: client.connect(hostnamehostname=hostname, portport=port, usernameusername=username, password=passwd) print('正在连接主机%s......'%(hostname)) except NoValidConnectionsError as e: ###用户不存在时的报错 print("连接失败") except AuthenticationException as t: ##密码错误的报错 print('密码错误') else: #4.执行操作 stdin,stdout, stderr = client.exec_command(cmd) #5.获取命令执行的结果 result=stdout.read().decode('utf-8') print(result) #6.关闭连接 finally: client.close() with open('ip.txt') as f: #ip.txt为本地局域网内的一些用户信息 for line in f: lineline = line.strip() ##去掉换行符 hostname,port,username,passwd= line.split(':') print(hostname.center(50,'*')) connect('uname', hostname, port,username,passwd) 基于公钥密钥连接import paramiko from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException def connect(cmd, hostname, port=22, user='root'): client = paramiko.SSHClient() private_key = paramiko.RSAKey.from_private_key_file('id_rsa') ###id_rsa为本地局域网密钥文件 client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect(hostnamehostname=hostname, portport=port, userusername=user, pkey=private_key ) stdin, stdout, stderr = client.exec_command(cmd) except NoValidConnectionsError as e: print("连接失败") except AuthenticationException as e: print("密码错误") else: result = stdout.read().decode('utf-8') print(result) finally: client.close() for count in range(254): host = '172.25.254.%s' %(count+1) print(host.center(50, '*')) connect('uname', host) sftp文件操作上传文件import paramiko #获取Transport实例 tran = paramiko.Transport("172.25.254.31",22) #连接SSH服务端 tran.connect(username = "root", password = "westos") #获取SFTP实例 sftp = paramiko.SFTPClient.from_transport(tran) #设置上传的本地/远程文件路径 localpath="passwd.html" ##本地文件路径 remotepath="/home/kiosk/Desktop/fish" ##上传对象保存的文件路径 #执行上传动作 sftp.put(localpath,remotepath) tran.close() 下载文件import paramiko #获取SSHClient实例 client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #连接SSH服务端 client.connect("172.25.254.31",username="root",password="westos") #获取Transport实例 tran = client.get_transport() #获取SFTP实例 sftp = paramiko.SFTPClient.from_transport(tran) remotepath='/home/kiosk/Desktop/fish' localpath='/home/kiosk/Desktop/fish' sftp.get(remotepath, localpath) client.close() 密钥的上传和下载import paramiko private_key = paramiko.RSAKey.from_private_key_file('id_rsa') tran = paramiko.Transport('172.25.254.31',22) tran.connect(username='root',password='westos') #获取SFTP实例 sftp = paramiko.SFTPClient.from_transport(tran) remotepath='/home/kiosk/Desktop/fish8' localpath='/home/kiosk/Desktop/fish1' sftp.put(localpath,remotepath) sftp.get(remotepath, localpath) paramiko的再封装import os import paramiko from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException, SSHException class SshRemoteHost(object): def __init__(self, hostname, port, user, passwd, cmd): self.hostname = hostname self.port = port self.user = user self.passwd = passwd self.cmd = cmd def run(self): """默认调用的内容""" # cmd hostname # put # get cmd_str = self.cmd.split()[0] # cmd # 类的反射,判断类里面是否可以支持该操作 if hasattr(self, 'do_'+ cmd_str): # do_cmd getattr(self, 'do_'+cmd_str)() else: print("目前不支持该功能") def do_cmd(self): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect(hostname=self.hostname, port=self.port, username=self.user, password=self.passwd) print("正在连接%s......." % (self.hostname)) except NoValidConnectionsError as e: print("连接失败") except AuthenticationException as e: print("密码错误") else: # 4. 执行操作 cmd = ''.join(self.cmd.split()[1:]) ##将输入的后面的取出,作为 stdin, stdout, stderr = client.exec_command(cmd) # 5.获取命令执行的结果 result = stdout.read().decode('utf-8') print(result) finally: # 6.关闭连接 client.close() def do_put(self): ###put /tmp/passwd ###将本地的/tmp/passwd上传到远端/tmp/passwd print('正在上传...') try: #获取Transport实例 tran = paramiko.Transport(self.hostname,int(self.port)) ##由于端口为整形,而我们用split方法得到的是str #连接SSH服务端 tran.connect(username = self.user, password = self.passwd) except SSHException as e: print('连接失败') else: #获取SFTP实例 sftp = paramiko.SFTPClient.from_transport(tran) newCmd = self.cmd.split()[1:] if len(newCmd) == 2: #设置上传的本地/远程文件路径 localpath=newCmd[0] remotepath=newCmd[1] #执行上传动作 sftp.put(localpath,remotepath) print('%s文件上传到%s主机的%s文件成功' %(localpath,self.hostname,remotepath)) else: print('上传文件信息错误') tran.close() def do_get(self): print('正在下载...') try: # 获取Transport实例 tran = paramiko.Transport(self.hostname, int(self.port)) ##由于端口为整形,而我们用split方法得到的是str # 连接SSH服务端 tran.connect(username=self.user, password=self.passwd) except SSHException as e: print('连接失败') else: # 获取SFTP实例 sftp = paramiko.SFTPClient.from_transport(tran) newCmd = self.cmd.split()[1:] if len(newCmd) == 2: # 设置下载的本地/远程文件路径 localpath = newCmd[1] remotepath = newCmd[0] # 执行上传动作 sftp.get( remotepath,localpath) print('%s主机的%s文件下载到%s文件成功' % (self.hostname,remotepath,localpath)) else: print('上传文件信息错误') tran.close() import paramiko import os # 1.选择操作的主机组:eg:mysql,web,ftp groups=[file.rstrip('.conf') for file in os.listdir('conf')] print("主机组显示:".center(50,'*')) for group in groups: print('\t',group) choiceGroup = input("选择批量操作的主机组(eg:mysql):") ##2.根据选择的主机组,显示包含的主机IP/主机名 # 1).打开文件conf/choiceGroup.conf # 2).依次读取文件每一行 # 3).只拿出 print("主机组包含的主机:".center(50,'*')) with open('conf/%s.conf' %(choiceGroup)) as f: for line in f: print(line.split(':')[0]) f.seek(0,0) ##把指针移动到文件最开始 hostinfos = [line.strip() for line in f.readlines()] ###3.让用户确认信息,选择需要批量执行的命令; ## -cmd shell 命令 ## -put 本地文件 远程文件 ## -get 远程文件 本地文件 print("批量执行脚本".center(50,"*")) while True: cmd = input('>>:').strip() if cmd : if cmd == 'exit' or cmd == "quit": print("执行完毕,正在退出") break for info in hostinfos: host,port,user,passwd = info.split(':') clientObj = SshRemoteHost(host,port,user,passwd,cmd) clientObj.run()
2021年03月08日
3,421 阅读
0 评论
0 点赞
2021-02-05
为什么MySQL单表不不建议超过2000W?
经常听说单表不能存太多数据, 如果单表数据太多,性能就会下降得比较厉害本就探索精神, 去搜索了下由来.对于MySQL数据库单表最大行数的说法,常见的限制是约为2千万行(或2000万行),但这个数字并不是MySQL官方的硬性限制。实际上,MySQL的表的最大行数和大小受到多个因素的影响,包括硬件资源、操作系统限制、配置设置以及数据类型和索引的使用等。这个2kw是个建议值,我们要来看下这个2kw是怎么来的单表行数限制单表行数理论最大值是多少。我们设计表的时候, 往往需要设置一个主键, 常见的id就是主键。如果主键声明为int大小,也就是32位,那么能支持2^32-1,也就是21个亿左右。如果是bigint,那就是2^64-1,但这个数字太大,一般还没到这个限制之前,磁盘先受不了。如果我把主键声明为 tinyint,一个字节,8位,最大2^8-1,也就是255。如果我想插入一个id=256的数据,那就会报错。也就是说,tinyint主键限制表内最多255条数据。结论: 主键本身唯一,也就是说主键的大小可以限制表的上限。表的索引索引内部是用的B+树,最大行数2kw的由来是基于B+树索引结构和页的大小进行估算的。下面是重新整理的说明:假设使用B+树作为索引结构,并且每个页的大小为15KB。在B+树中,叶子节点存储实际的行数据,每条行数据大小假设为1KB。一个页可以容纳的行数为y=15。我们可以使用公式 (x ^ (z-1)) * y 来计算行总数,其中x为每个非叶子节点的分支因子(即每个非叶子节点的子节点数),z为B+树的层级数。假设B+树是两层,即z=2,我们需要确定分支因子x的值。假设每个页的大小是15KB,由于叶子节点和非叶子节点的数据结构相同,我们假设剩下的15KB可以用于非叶子节点。因此,x可以计算为15KB除以每个非叶子节点大小的估计值。假设每个非叶子节点大小与叶子节点大小相同,即1KB。则x=15KB/1KB=15。已知x=15,y=15,z=2,代入公式 (x ^ (z-1)) y,可以计算出行总数为 (15 ^ (2-1)) 15 = 2万行。因此,这个2万行是指在B+树索引结构下,每个表的建议最大行数。在两层B+树的结构中,通过合理的分支因子和页大小设置,建议的最大行数为2万行。超过这个行数,可能会影响查询性能和磁盘IO次数。参考资料https://juejin.cn/post/7116381265300815903
2021年02月05日
583 阅读
0 评论
0 点赞
2021-01-03
分布式唯一ID生成规则SnowFlake雪花算法Python实现
分布式唯一ID生成规则SnowFlake雪花算法Python实现 简介在复杂分布式系统中,往往需要对大量的数据和消息进行唯一标识。在好多系统随着数据的日渐增长,对数据分库分表后需要有一个唯一ID来标识一条数据或消息,数据库的自增ID显然不能满足需求。此时一个能够生成全局唯一ID的系统是非常必要的。雪花算法和美团Leaf都是用于分布式系统中生成唯一ID的工具 雪花算法: SnowFlake是Twitter公司采用的一种算法,目的是在分布式系统中产生全局唯一且趋势递增的ID。美团Leaf: Leaf是美团基础研发平台推出的一个分布式ID生成服务 当单表数据量急剧上升后,表的查询性能会逐渐下降,会涉及到分库分表操作,如何确保数据均匀分布,可以通过全局唯一的ID。全局唯一的ID生成规则要求: 全局唯一性 有序递增 高可用 时间上的特性(例如互联网订单,从订单号可以看出具体时间的信息) 雪花算法Python实现 1.第一位 占用1bit,其值始终是0,没有实际作用。2.时间戳 占用41bit,精确到毫秒,总共可以容纳约69年的时间。3.工作机器id 占用10bit,其中高位5bit是数据中心ID,低位5bit是工作节点ID,做多可以容纳1024个节点。4.序列号 占用12bit,每个节点每毫秒0开始不断累加,最多可以累加到4095,一共可以产生4096个ID。SnowFlake算法在同一毫秒内最多可以生成多少个全局唯一ID呢:: 同一毫秒的ID数量 = 1024 X 4096 = 4194304 import time class InvalidSystemClock(Exception): """ 时钟回拨异常 """ pass # 64位ID的划分 WORKER_ID_BITS = 5 DATACENTER_ID_BITS = 5 SEQUENCE_BITS = 12 # 最大取值计算 MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5-1 0b11111 MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS) # 移位偏移计算 WOKER_ID_SHIFT = SEQUENCE_BITS DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS # 序号循环掩码 SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS) # 开始时间截 (2015-01-01) TWEPOCH = 1420041600000 class IdWorker(object): """ 用于生成IDs """ def __init__(self, datacenter_id, worker_id, sequence=0): """ 初始化 :param datacenter_id: 数据中心(机器区域)ID :param worker_id: 机器ID :param sequence: 其实序号 """ # sanity check if worker_id > MAX_WORKER_ID or worker_id < 0: raise ValueError('worker_id值越界') if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0: raise ValueError('datacenter_id值越界') self.worker_id = worker_id self.datacenter_id = datacenter_id self.sequence = sequence self.last_timestamp = -1 # 上次计算的时间戳 def _gen_timestamp(self): """ 生成整数时间戳 :return:int timestamp """ return int(time.time() * 1000) def get_id(self): """ 获取新ID :return: """ timestamp = self._gen_timestamp() # 时钟回拨 if timestamp < self.last_timestamp: raise InvalidSystemClock if timestamp == self.last_timestamp: self.sequence = (self.sequence + 1) & SEQUENCE_MASK if self.sequence == 0: timestamp = self._til_next_millis(self.last_timestamp) else: self.sequence = 0 self.last_timestamp = timestamp new_id = ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \ (self.worker_id << WOKER_ID_SHIFT) | self.sequence return new_id def _til_next_millis(self, last_timestamp): """ 等到下一毫秒 """ timestamp = self._gen_timestamp() while timestamp <= last_timestamp: timestamp = self._gen_timestamp() return timestamp if __name__ == '__main__': worker = IdWorker(0, 0) print(worker.get_id())
2021年01月03日
1,458 阅读
0 评论
0 点赞
2020-12-28
Ant Design Pro 环境搭建
简介ant design蚂蚁金服基于react打造的一个服务于企业级产品的UI框架。ant design pro 就是基于Ant Design这个框架搭建的中后台管理控制台的脚手架官网 https://pro.ant.design/文档 https://pro.ant.design/docs/getting-started-cnhttps://gitee.com/ant-design/ant-design-pro环境安装Nodejshttp://nodejs.cn/npm修改配置$ npm config set prefix"D:\Devs\nodejs\node_modules\npm\node_global_modules" $ npm config set cache"D:\Devs\nodejs\node_modules\npm\node_cache"修改npm源为淘宝源$ npm config set registry https://registry.npm.taobao.org切换回官方源$ npm config set registry http://www.npmjs.orgcnpm$ npm install -g cnpm --registry=https://registry.npm.taobao.orgcreate-react-app 项目构建工具$ cnpm install -g create-react-appUmi 乌米 可插拔的企业级 react 应用框架$ npm install -g umicreate-umi 项目构建工具$ npm install -g create-umiyarn 比npm更好用的包管理$ npm install -g yarntyarn 国内源版本的tyarn$ npm install yarn tyarn -g开发工具React Developer Tools谷歌浏览器插件使用代理进入chrome插件商店安装Umi UI 可视化项目的本地研发工作台项目管理。集中式管理本地项目。配置管理。umi / bigfish 常用项目配置。任务管理。集成启动、构建、测试、代码规范检查、重新安装依赖等常用操作。资产、区块管理在项目中执行$ yarn add @umijs/preset-ui -D $ umi dev项目创建react项目使用create-react-app创建react项目$ create-react-app my-app启动项目$ cd my-app $ npm start编译生产版本$ npm run buildAnt Design 组件库$ yarn create react-app 项目 # 进入到项目目录, 引入 antd $ yarn add antd # 初始化环境 $ yanr # 运行项目 $ yarn startAnd Design Pro V4创建项目$ yarn create umi antd_pro $ # 或者 $ npm create umi antd_pro安装Umi UI 可视化项目的本地研发工作台$ yarn add @umijs/preset-ui -D # 或者 $ npm install --save @umijs/preset-ui进入到项目目录antd_pro , 初始化环境$ npm install $ # 或者 $ yarn运行项目$ npm start $ # 或者 $ yarn start运行Umi UI 管理工作台$ umi dev
2020年12月28日
1,085 阅读
0 评论
0 点赞
2020-12-22
MySQL超大分页优化
MySQL超大分页优化 在数据库开发过程中我们经常会使用分页,核心技术是使用用limit start, count分页语句进行数据的读取MySQL分页起点越大查询速度越慢 直接用limit start, count分页语句,表示从第start条记录开始选择count条记录 :select * from product limit start, count 当起始页较小时,查询没有性能问题,我们分别看下从10, 1000, 10000, 100000开始分页的执行时间(每页取20条)。select * from product limit 10, 20 0.002秒 select * from product limit 1000, 20 0.011秒 select * from product limit 10000, 20 0.027秒 select * from product limit 100000, 20 0.057秒 我们已经看出随着起始记录的增加,时间也随着增大, 这说明分页语句limit跟起始页码是有很大关系的,那么我们把起始记录改为100w看下:select * from product limit 1000000, 20 0.682秒 我们惊讶的发现MySQL在数据量大的情况下分页起点越大查询速度越慢,300万条起的查询速度已经需要1.368秒钟。这是为什么呢?因为limit 3000000,10的语法实际上是mysql扫描到前3000020条数据,之后丢弃前面的3000000行,这个步骤其实是浪费掉的。select * from product limit 3000000, 20 1.368秒 从中我们也能总结出两件事情: limit语句的查询时间与起始记录的位置成正比 mysql的limit语句是很方便,但是对记录很多的表并不适合直接使用。 二、 limit大分页问题的性能优化方法 (1)利用表的覆盖索引来加速分页查询MySQL的查询完全命中索引的时候,称为覆盖索引,是非常快的。因为查询只需要在索引上进行查找,之后可以直接返回,而不用再回表拿数据。在我们的例子中,我们知道id字段是主键,自然就包含了默认的主键索引。现在让我们看看利用覆盖索引的查询效果如何。select id from product limit 1000000, 20 0.2秒 那么如果我们也要查询所有列,如何优化?优化的关键是要做到让MySQL每次只扫描20条记录,我们可以使用limit n,这样性能就没有问题,因为MySQL只扫描n行。我们可以先通过子查询先获取起始记录的id,然后根据Id拿数据:select * from vote_record where id>=(select id from vote_record limit 1000000,1) limit 20; (2)用上次分页的最大id优化先找到上次分页的最大ID,然后利用id上的索引来查询,类似于:select * from user where id>1000000 limit 100
2020年12月22日
663 阅读
0 评论
0 点赞
2020-10-12
Python处理视频
简介Python处理视频主要有3类模块:opencv-python,由于视频本质上就是连续的图像,所以图像处理模块也能处理视频中的每一帧图像。最后对视频的编码和读写会依赖ffmpeg完成。ffmpeg-python,这类模块是对ffmpeg的命令包装,相当于用Python调用ffmepg的命令。moviepy,提供了便捷的视频处理接口,文件编码和读写也依赖ffmpeg。其中,moviepy使用门槛低,足够应付最常见的需求,如截取、拼接、简单转场和特效等。模块安装:pip install moviepy它的基本工作原理可以概括为:基于ffmpeg读写视频文件。基于numpy、scipy、opencv、PIL处理内部图像数据。两大核心类:AudioClip、VideoClip分别处理音频和视频。如果要在视频中增加图形或文字,需要提前安装ImageMagick软件。ImageMagick的安装在Mac上稍微复杂些,因为它基于X11框架。分两步安装:安装XQuartz:即X11框架的MacOS版实现。Homebrew安装软件:brew install imagemagick本文将以moviepy为主介绍视频处理,图像特效等部分会兼用opencv和skimage等模块。视频处理的常见场景包括:分段截取:剪掉前几秒或后几秒,或取中间某段素材提取:音频提取,视频截图清晰调整:帧率、分辨率倍速播放:加速、减速格式转换:视频编码选择、GIF转换视频拼接:如添加片头、添加片尾视频剪裁:裁剪某个区域内容水印处理:加文字水印、加图片水印、加动画水印视频特效:镜像、滤镜、过长切换、遮照字幕处理:提取字幕,添加字幕智能处理:人脸追踪、马赛克、换脸下面分成4个部分介绍:基本使用、拼接裁剪、效果水印、智能处理。基本使用视频的基本处理包括:文件读写、分段截取、音量调整、素材提取、清晰度参数、倍速播放、格式转换。import pathlib from moviepy.editor import VideoFileClip path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') mp4_path = path.joinpath('input.mp4') vout_path = path.joinpath('008video_basic_video.mp4') vout15_path = path.joinpath('008video_basic_video_fps15.mp4') vout_scale_path = path.joinpath('008video_basic_video_scale.mp4') vout_speed2x_path = path.joinpath('008video_basic_video_speed2x.mp4') vout_speed05x_path = path.joinpath('008video_basic_video_speed05x.mp4') vout_webm_path = path.joinpath('008video_basic_video_format.webm') vout_gif_path = path.joinpath('008video_basic_video_gif.gif') aout_path = path.joinpath('008video_basic_audio.mp3') img_path = path.joinpath('008video_basic_images') clip = VideoFileClip(str(mp4_path)) # 获取基本信息:时长、 print('基本信息:') print(clip.duration, clip.size, clip.fps) # 截取前50秒视频 clip = clip.subclip(0, 50) # 提取音频素材 audio = clip.audio audio.write_audiofile(str(aout_path)) # 视频截图 ts = [5, 10, 20, 30, 40, 50] # 单位:秒 for t in ts: clip.save_frame(str(img_path.joinpath(f'{t}.png')), t=t) # 调低音量 clip.volumex(0.6) # 保存文件,audio_codec指定音频编码,默认视频编码为libx264 clip.write_videofile(str(vout_path), audio_codec='aac') # 清晰度参数:帧率、分辨率 clip_fps15 = clip.set_fps(15) # 调整帧率,并不会减少多少文件大小 # 如果不指定audio,就会生成一个临时音频文件 clip_fps15.write_videofile(str(vout15_path), audio_codec='aac') # 调整分辨率,可以很明显降低文件大小 # clip_scale = clip.resize((clip.w//2, clip.h//2)) clip_scale = clip.resize(0.5) # 等比缩放0.5 clip_scale.write_videofile(str(vout_scale_path), audio_codec='aac') # 倍速播放 clip_sp2x = clip.speedx(2) clip_sp2x.write_videofile(str(vout_speed2x_path), audio_codec='aac') clip_sp05x = clip.speedx(0.5) clip_sp05x.write_videofile(str(vout_speed05x_path), audio_codec='aac') # 格式转换,根据后缀选择编码器 clip.write_videofile(str(vout_webm_path), audio=True) # 转GIF图 clip.subclip(0,10).set_fps(1).write_gif(str(vout_gif_path))视频拼接和裁剪视频拼接是指在时间维度上,把多个视频段连起来,常见如每个视频的片头片尾。视频裁剪是指在屏幕上划出一个区域当成新的视频。import pathlib from moviepy.editor import VideoFileClip, TextClip from moviepy.editor import vfx from moviepy.editor import CompositeVideoClip, concatenate_videoclips from moviepy.video.tools.drawing import circle from moviepy.video.tools.credits import credits1 path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') mp4s_path = path.joinpath('008video_concat') vout_final_path = path.joinpath('008video_concat_final.mp4') vout_cropped_path = path.joinpath('008video_concat_cropped.mp4') font_path = path.joinpath('SourceHanSansCN-Bold.otf') # 设置一个简单片头片尾 the_start = TextClip('英语"不可能"怎么说?\n"No Way"', font=font_path, color='white', fontsize=70).set_duration(2).set_pos('center') the_end = TextClip('By 程一初', font=font_path, color='white', fontsize=70).set_duration(2).set_pos('center') clip_list = [ the_start ] # 把所有文件夹下的视频都读取出来 mp4_list = [ f for f in mp4s_path.iterdir() if f.is_file() ] for mp4 in mp4_list: clip_list.append(VideoFileClip(str(mp4))) clip_list.append(the_end) # 拼接,'compose'表示不管各种视频大小,以最大为基础 final = concatenate_videoclips(clip_list, method='compose') final.write_videofile(str(vout_final_path), audio_codec='aac') # 裁剪,取中间一块 W, H = final.size cropped = final.crop(x_center=W//2, y_center=H//2, width=400, height=300) cropped.write_videofile(str(vout_cropped_path), audio_codec='aac')效果处理和水印对视频中的每一帧图像应用滤镜,就是对视频应用滤镜。滤镜可以是变换色彩风格,也可以是应用遮照。所以视频水印原理与图像一致,可以加文字、图片和动画水印。此外,在视频片段间连接时,可以增加一些淡入淡出的过场效果。moviepy最核心的3个方法:fl_image:处理每一帧图像,比如添加元素、应用遮照。fl_time:处理时间相关特效,比如动态变速。fl:同时处理时间和每一帧图像。在使用时,优先用前两个,有时会加快渲染速度。此外moviepy通过vfx包提供了很多内置特效功能。效果处理import pathlib from PIL import Image, ImageDraw import numpy as np from moviepy.editor import VideoFileClip, ImageClip, TextClip from moviepy.editor import vfx, clips_array, CompositeVideoClip from moviepy.video.tools.drawing import circle path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') mp4_path = path.joinpath('input.mp4') vout_path = path.joinpath('008video_effect.mp4') clip = VideoFileClip(str(mp4_path)).subclip(0, 10).margin(10) # 水平镜像,变亮,增加进场效果 clip_x = clip.fx(vfx.mirror_x).fx(vfx.colorx, 2).fx(vfx.fadein, 1.5) # 垂直镜像,变暗 clip_y = clip.fx(vfx.mirror_y).fx(vfx.colorx, 0.5) # 上下左右对称,增加淡入淡出过场效果 clip_yx = clip_y.fx(vfx.mirror_x).fx(vfx.fadein, 1.5).fx(vfx.fadeout, 1.5) # 任意角度 clip_90 = clip.fx(vfx.rotate, angle=90) # 遮照: 用Image画个圆形遮照 img = Image.new('RGB', clip.size, (0,0,0)) draw = ImageDraw.Draw(img) r = min(clip.w, clip.h) x, y = (clip.w-r)/2, (clip.h-r)/2 draw.ellipse((x,y,x+r,y+r), fill=(255,255,255)) mask = ImageClip(np.array(img), ismask=True) clip_mask = CompositeVideoClip([clip.set_mask(mask)]) # 输出整个效果系列 final_clip = clips_array([[clip, clip_x], [clip_y, clip_yx], [clip_90, clip_mask]]) final_clip.write_videofile(str(vout_path), audio_codec='aac')关于动态遮照,目前官方代码moviepy.video.tools.drawing.color_gradient有点小问题。动态遮照的本质,是对每一帧图像应用动态生成的遮照。由于moviepy内部使用numpy.ndarray格式存储数据,我们可以选择opencv、scikit-image来处理动态的遮照图像。这里就以scikit-image来演示,模块安装:pip install scikit-image。scikit-image基本画图方法先看下scikit-image的基本图形绘制方法:线:skimage.draw.line实心圆:skimage.draw.circle空心圆:skimage.draw.circle_perimeter多边形:skimage.draw.polygon椭圆:skimage.draw.ellipse空心椭圆:skimage.draw.ellipse_perimeter贝塞尔曲线:skimage.draw.bezier_curve具体参数官方都有详细解释,就不列了。动态遮照import pathlib from skimage import draw from skimage import img_as_float import cv2 import numpy as np from moviepy.editor import VideoFileClip, TextClip from moviepy.editor import clips_array, CompositeVideoClip path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') mp4_path = path.joinpath('input.mp4') vout_path = path.joinpath('008video_effect_dynamic.mp4') clip = VideoFileClip(str(mp4_path)).subclip(0, 5).margin(10) # 开场,圆形打开效果 clip_start = clip.add_mask() # 结束,圆形关闭效果,出现“The End” clip_end = clip.add_mask() w, h = clip.size r = max(h, w)/2 def make_circle_ski_start(t): # 注意w和h,cy和cx的顺序 arr = np.zeros((h,w), np.uint8) rr, cc = draw.circle(clip.h/2, clip.w/2, radius=min(r*2, int(200*t)), shape=arr.shape) arr[rr, cc] = 1 return arr def make_circle_ski_end(t): arr = np.zeros((h,w), np.uint8) rr, cc = draw.circle(clip.h/2, clip.w/2, radius=max(0, int(r-200*t)), shape=arr.shape) arr[rr, cc] = 1 return arr def make_circle_cv2(t): arr = np.zeros((h,w), np.uint8) cv2.circle(arr, (clip.w//2, clip.h//2), max(0, int(r-200*t)), 255, -1) # 如果要用opencv,返回值需要转为[0, 1]范围(也是skimage采用格式) return img_as_float(arr) clip_start.mask.get_frame = make_circle_ski_start clip_end.mask.get_frame = make_circle_ski_end # clip_end.mask.get_frame = make_circle_cv2 txt_end = TextClip('The End', font='Amiri-bold', color='white', fontsize=20).set_duration(clip.duration).set_pos('center') clip_end = CompositeVideoClip([txt_end, clip_end], size=clip.size) final_clip = clips_array([[clip_start, clip_end],]) final_clip.write_videofile(str(vout_path), audio_codec='aac')水印处理视频水印的处理,可以把原视频和水印用CompositeVideoClip方法合并。import pathlib from moviepy.editor import VideoFileClip, ImageClip, TextClip from moviepy.editor import clips_array, CompositeVideoClip path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') font_path = path.joinpath('SourceHanSansCN-Bold.otf') mp4_path = path.joinpath('input.mp4') avatar_path = path.joinpath('avatar.jpg') gif_path = path.joinpath('wm.gif') vout_path = path.joinpath('008video_watermark.mp4') clip = VideoFileClip(str(mp4_path)).subclip(0, 10).margin(10) # 文字水印 txt_clip = TextClip('By 程一初', font=font_path, fontsize=20, color='white').set_duration( clip.duration).margin( mar=10, color=(96,96,96), opacity=0.5).set_opacity(0.5) txt_clip = txt_clip.set_position((clip.w-txt_clip.w, clip.h-txt_clip.h)) txt_clip = CompositeVideoClip([clip, txt_clip]) # 图片水印 img_clip = ImageClip(str(avatar_path)).set_duration( clip.duration).resize(0.1).margin( mar=10, color=(96,96,96), opacity=0.5).set_opacity(0.5) img_clip = img_clip.set_position((clip.w-img_clip.w, clip.h-img_clip.h)) img_clip = CompositeVideoClip([clip, img_clip]) # 动画水印 gif_clip = VideoFileClip(str(gif_path)).loop().set_duration( clip.duration).margin( mar=10, color=(96,96,96), opacity=0.5).set_opacity(0.5) gif_clip = gif_clip.set_position((clip.w-gif_clip.w, clip.h-gif_clip.h)) gif_clip = CompositeVideoClip([clip, gif_clip]) # 输出整个效果系列 final_clip = clips_array([[clip, txt_clip], [img_clip, gif_clip]]) final_clip.write_videofile(str(vout_path), audio_codec='aac')关于去水印的主要4种思路参考:通过裁剪,把包含水印部分去除,最简单但会丢失部分信息。把水印部分模糊化,或另一个水印覆盖原水印,相当于涂抹。拿到水印原文件,尝试透明度反向减除,不能100%但有时可做到肉眼不可见。基于算法消除,目前大部分速度很慢,一张图都得几十秒,更不用说视频。智能处理视频相关的智能处理,可以分解到对字幕、图像、音频的处理。如:生成字幕、人脸追踪、视频分类等。字幕提取关于字幕提取的3个思路:字幕如果是嵌入在视频文件中,就可以通过ffmpeg命令直接提取字幕srt文件。更多时候字幕和视频渲染在一起,即所谓“硬字幕”,这时就需要靠算法识别。算法识别字幕有两种方式:从音频里提取(即上一章的STT),或从图像里提取(即OCR技术)。OCR技术中较出名的如Google的tesseract项目,它能识别100多种语言。之前介绍过的百度paddlehub也有文字识别的模型。从效果上看,paddlehub在图像的中文识别方面更优。处理方式也很简单:从视频里抽取图像。调用paddlehub识别图片里的文字。import pathlib import paddlehub as hub module = hub.Module(name='chinese_ocr_db_crnn_mobile') path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') img_path = path.joinpath('008video_seqimages') img_path_list = sorted([ str(f) for f in img_path.iterdir() if f.is_file() ]) results = module.recognize_text(paths=img_path_list, visualization=True) for result in results: print(result)注意还需要安装2个模块:pip install shapely pyclipper。在实战中更推荐STT方式提取字幕。除了之前推荐的云平台之外,平时也可以使用如网易见外、讯飞听见等在线应用。人脸追踪2019年ZAO换脸曾风靡一时,它就是人脸追踪的一种应用,而且实现了追踪后替换融合的效果。此外我们经常看到一些新闻里会对人脸动态打马赛克,其基本原理如下:找到每一帧图像中的人脸位置,记录下数据。处理每一帧图像,对人脸打马赛克。我们通过结合moviepy和paddlehub可以很容易实现。import pathlib import numpy as np import cv2 from moviepy.editor import VideoFileClip, ImageSequenceClip import paddlehub as hub path = list(pathlib.Path.cwd().parents)[1].joinpath('data/automate/008video') mp4_path = path.joinpath('input.mp4') out_path = path.joinpath('008video_paddlehub_headblur_fl.mp4') out_path_frm = path.joinpath('008video_paddlehub_headblur_frm.mp4') snd_path = path.joinpath('008video_snd.mp3') clip = VideoFileClip(str(mp4_path)).subclip(0,10) module = hub.Module(name='ultra_light_fast_generic_face_detector_1mb_640') def mask_frame(im): h, w, d = im.shape results = module.face_detection(images=[im]) face_data = results[0]['data'] # 模糊每个人脸 for d in face_data: x = int((d['left']+d['right'])//2) y = int((d['top']+d['bottom'])//2) r_zone = int((d['right']-d['left'])//2) # 半径 r_blur = int(2*r_zone/3) # 模糊范围 x1, x2 = max(0, x - r_zone), min(x + r_zone, w) y1, y2 = max(0, y - r_zone), min(y + r_zone, h) region_size = y2 - y1, x2 - x1 mask = np.zeros(region_size).astype('uint8') cv2.circle(mask, (r_zone, r_zone), r_zone, 255, -1, lineType=cv2.CV_AA) mask = np.dstack(3 * [(1.0 / 255) * mask]) orig = im[y1:y2, x1:x2] blurred = cv2.blur(orig, (r_blur, r_blur)) im[y1:y2, x1:x2] = mask * blurred + (1 - mask) * orig return im def fl_fun(im): # im是只读数据,需要重新创建一个可修改的ndarray frame = np.array(im) return mask_frame(frame) clip_blur = clip.fl_image(fl_fun) clip_blur.write_videofile(str(out_path), audio_codec='aac')如果想要实现类似ZAO换脸一样的效果,除了定位人脸,还得实现图像融合。比较热门的一个开源项目faceswap,实现了换脸算法。有兴趣可以看我Notebook的记录,对硬件要求较高,需要训练自己的模型,速度很慢。也可以用paddlehub来识别人脸,它提供了不少训练模型,如:人体结构标注:ace2p人脸识别:ultra_light_fast_generic_face_detector_1mb_640人脸结构标注:face_landmark_localization可以比较容易识别出人脸模型,但想要融合,则需要借助额外的模型,如图像风格迁移StarGAN。
2020年10月12日
6,623 阅读
0 评论
4 点赞
2020-09-13
Django的流量控制
APIView会在进行dispatch()分发前,对请求进行身份认证、权限检查、流量控制对接口访问的频次进行限制,减少对服务器的压力在settings.py添加一个REST_FRAMEWORK字典REST_FRAMEWORK = { "DEFAULT_THROTTLE_CLASS":( 'rest_framework.throttling.AnonRateThrottle',#未登录用户 'rest_framework.throttling.UserRateThrottle'#登录用户 ), "DEFAULT_THROTTLE_RATES':{ 'anon':'100/day', #未登录用户 每天100次 'user':'1000/day' #登录用户 每天1000次 } } #DEFAULT_THROTTLE_RATES也可以使用second,minutes,hour或day指明周期 指明指定视图的访问次数:class ContactListView(APIView): throttle_scope = 'contacts' class ContactDetailView(APIView): throttle_scope = 'contacts' class UploadView(APIView): throttle_scope = 'uploads' REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES':( 'rest_framework.throttling.ScopedRateThrottle', ), 'DEFAULT_THROTTLE_RATES':{ 'contacts':'1000/day', 'uploads':'20/day' } }
2020年09月13日
6,279 阅读
0 评论
0 点赞
2020-09-11
接口幂等性设计
接口幂等性设计 概述幂等性是什么?幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。接口为什么要实现幂等? 前端重复提交选中的数据,后台只产生对应这个数据的一个反应结果。 使用幂等的场景1.前端重复提交用户注册,用户创建商品等操作,前端都会提交一些数据给后台服务,后台需要根据用户提交的数据在数据库中创建记录。如果用户不小心多点了几次,后端收到了好几次提交,这时就会在数据库中重复创建了多条记录。这就是接口没有幂等性带来的 bug。2.接口超时重试对于给第三方调用的接口,有可能会因为网络原因而调用失败,这时,一般在设计的时候会对接口调用加上失败重试的机制。如果第一次调用已经执行了一半时,发生了网络异常。这时再次调用时就会因为脏数据的存在而出现调用异常。3.消息重复消费在使用消息中间件来处理消息队列,且手动 ack 确认消息被正常消费时。如果消费者突然断开连接,那么已经执行了一半的消息会重新放回队列。当消息被其他消费者重新消费时,如果没有幂等性,就会导致消息重复消费时结果异常,如数据库重复数据,数据库数据冲突,资源重复等 接口幂等性实现思路1.Token机制当客户端请求页面时,服务器会生成一个随机数token,并且将token放置到session当中,然后将token发给客户端(一般通过构造hidden表单)。下次客户端提交请求时,token会随着表单一起提交到服务器端。服务器端第一次验证相同过后,会将session中的token值更新下,若用户重复提交,第二次的验证判断将失败,因为用户提交的表单中的token没变,但服务器端session中token已经改变了。具体流程步骤: 客户端会先发送一个请求去获取 token,服务端会生成一个全局唯一的 ID 作为 token 保存在 redis 中,同时把这个 ID 返回给客户端 客户端第二次调用业务请求的时候必须携带这个 token 服务端会校验这个 token,如果校验成功,则执行业务,并删除 redis 中的 token 如果校验失败,说明 redis 中已经没有对应的 token,则表示重复操作,直接返回指定的结果给客户端 2.数据库联合唯一约束插入数据,应该按照唯一索引进行插入,比如:订单号,相同的订单号就不能有两条记录插入。 我们在数据库层面防止重复。这个机制是利用了数据库的主键唯一约束的特性,解决了在insert场景时幂等问题。 但主键的要求不是自增的主键,这样就需要业务生成全局唯一的主键。 如果是分库分表场景下,路由规则要保证相同请求下,落在同一个数据库和同一表中,要不然数据库约束就不起 效果了,因为是不同的数据库和表主键不相关具体流程步骤: 建立一张去重表,其中某个字段需要建立唯一索引 客户端去请求服务端,服务端会将这次请求的一些信息插入这张去重表中 因为表中某个字段带有唯一索引,如果插入成功,证明表中没有这次请求的信息,则执行后续的业务逻辑 如果插入失败,则代表已经执行过当前请求,直接返回
2020年09月11日
857 阅读
0 评论
0 点赞
2020-07-18
Fiddler配合安卓模拟器实现APP的Https抓包
Fiddler配合安卓模拟器实现APP的Https抓包简介Fiddler抓包功能非常强大,可非常便捷得对包进行断点跟踪和回放, 手机的APP和后台通信很多都是基于https的, 需要特殊的设置, 不然是抓不到的.Fiddler的抓包原理是通过设置一个代理服务器, 让APP或网站通过这个代理访问网站, 从而监听到访问数据. https的数据需要一个可信任的证书, Fiddler可以生成证书, 安装好证书就可以监听到https数据了, 下面就来介绍怎么安装证书需要用的软件fiddler 网上下载一个即可有root权限的手机或安卓模拟器 很多手机root都比较麻烦, 这里选择安卓模拟器来做演示, 原理一样的, 网上下载个逍遥,夜神安卓模拟器即可安卓文件浏览器 能访问系统文件的, 如re文件浏览器等1.获取证书文件1.1 导出证书打开 fiddler在设置项中找到Tools->Fiddler Options->HTTPS按图片设置导出证书到桌面1.2 用命令行转换证书在证书所在的目录打开CMD命令行转化证书格式为pemopenssl x509 -inform DER -in FiddlerRoot.cer -out cacert.pem 计算证书hash并改名openssl x509 -inform PEM -subject_hash -in cacert.pem 计算结果若为269953fb 将cacert.pem改命为 269953fb.02.安装证书文件2.1 将证书文件复制到安卓模拟器打开安卓模拟器, 把刚才的 269953fb.0文件复制到模拟器共享文件夹2.2 将证书文件复制到安卓系统证书目录打开re文件管理器, 找到download目录, 找到刚才的 269953fb.0文件复制到证书文件夹/system/etc/security/cacerts 里, 证书就安装成功了2.3 检查证书是否安装成功安卓模拟器上打开设置-> 安全 -> 信任的凭据 看到如下 所示则代表证书已经安装成功了3.配置监听抓包3.1 fiddler的设置打开Fiddler设置代理Tools –> Fiddler Options –> HTTPS3.2 安卓模拟器设置安卓模拟器长安WiFi图标, 点击修改网络填入代理, ip为你自己电脑局域网ip(可通过局域网ipconfig查看), 保存即可3.3 测试抓包在浏览器中输入https://www.baidu.com/#, 或打开app, 就可在fiddler看到在抓到的https包了结尾其实App的http抓包不难, 只是证书安装稍微麻烦, 其他抓包软件比如Charles大概也是这么一个流程, iOS也是差不多的教程
2020年07月18日
2,620 阅读
0 评论
0 点赞
2020-03-25
使用python和redis实现分布式锁
业务场景描述:多个业务节点(多台主机运行同一个业务代码),但是只有一个节点真正运行(运行节点),其他都处于等待(备选节点)运行节点一旦异常(服务挂掉,宕机,误操作等),其他备选节点,立刻有一个继续运行之前的运行节点,重启之后,变为备选节点,处于等待节点实现多节点高可用分布式锁具备的功能:资源唯一表示锁续时(锁绝对不能不设置过期时间)锁误操作(改变了不属于自己的锁)-----资源唯一表示,即可解决Redis 挂掉(或者哨兵模式下,主节点挂掉,哨兵发现和选举延迟,或者redis整个服务挂掉),本地服务处理。redis连接挂掉之后重启,锁可能还在,也可能没有了。没有之后,可能其他的备选节点会加锁成功,成为运行节点也就是说:Redis重启之后,之前的运行节点说不一定不是运行节点了。非运行节点,需要把本地的服务都停掉,确保只有一个运行节点在运行主要业务逻辑额外要求:锁具备高可用,易用,其他业务代码侵入小。所以使用修饰器。实现步骤:唯一资源表示:Redis中的key选取某一个服务名称,value选取主机(或本地IP)+ 主进程ID作为标示代码:def redis_distributed_lock(lock_name: str): """ redis分布式锁 :param lock_name: redis中锁的名称 :return: """ def redis_function(func): def wrapper(self, *args, **kwargs): """ High-availability operating mechanism based on redis :return: None """ redis_model = RedisModel() lock_value = socket.gethostname() + "_" + str(threading.get_ident()) # 没有注册上,休息后再尝试注册 while True: try: result = redis_model.set_ex_nx(key=lock_name, value=lock_value, seconds=60) # 执行主体方法 func(*args, **kwargs) # 获取锁的过期时间 ttl = redis_model.ttl(key=lock_name) time.sleep(ttl if ttl > 1 else 1) except Exception as e: self.logger.warning( "Run redis_distributed_lock failed, cause: {}, traceback: {}".format(e, traceback.format_exc())) time.sleep(5) return wrapper return redis_function锁的续时间:锁续时(锁绝对不能不设置过期时间)。锁续时是个持续的操作(while True),尽量不能影响主体业务逻辑操作。我这里的处理是:续时操作,创建一个额外的线程(watch_dog)或者额外线程执行主体业务逻辑函数。续时需要判断一下,Redis中的锁,还是否为本运行节点锁具有的锁。是的话,才进行续时,防止锁误操作。不是的话,本地的服务可能需要都停掉,主进程处于备用状态,防止多节点运行。1、要删除该运行节点下的所有创建的线程,并且不影响主进程继续运行,并处于备用状态,2、如果子线程创建了进程,等等,需要将该节点下的所有进程也删除掉代码:修饰器:redis_distributed_lock#!/usr/bin/env python # -*- coding: UTF-8 -*- """ ================================================= @Project -> File @IDE :PyCharm @Author :Desmond.zhan @Date :2021/12/7 3:07 下午 @Desc :redis分布式锁,运行执行leader节点的服务器运行,其他的阻塞,以达到高可用 通过注解方式,灵活性更强 ================================================== """ import ctypes import inspect import multiprocessing import socket import threading import time import traceback # 自定义Redis Model. from xxxxxx import RedisModel def _async_raise(tid, exctype): """raises the exception, performs cleanup if needed""" tid = ctypes.c_long(tid) if not inspect.isclass(exctype): exctype = type(exctype) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: # """if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect""" ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("PyThreadState_SetAsyncExc failed") def _async_stop_children_process(): """ 杀死主进程下所有的子进程 :return: """ children_process = multiprocessing.active_children() for process in children_process: process.terminate() def redis_distributed_lock(lock_name: str): """ redis分布式锁 :param lock_name: redis中锁的名称 :return: """ def redis_function(func): def wrapper(self, *args, **kwargs): """ High-availability operating mechanism based on redis :return: None """ redis_model = RedisModel() lock_value = socket.gethostname() + "_" + str(threading.get_ident()) # 没有注册上,休息后再尝试注册 while True: try: result = redis_model.set_ex_nx(key=lock_name, value=lock_value, seconds=60) # 加锁成功后,判断是否该主机+线程用的这个锁,如果是,设置一个超时时间。否则仍旧等待加锁 self.logger.info("Redis set_nx result:{}".format(result)) self.logger.info( "redis_model.get(lock_name) : {}, lock_value:{}".format(redis_model.get(lock_name), lock_value)) # 加锁成功 if result and redis_model.get(lock_name).decode(encoding="utf8") == lock_value: # 获取到redis分布式锁,开启子线程,进行执行目标函数 t_func = threading.Thread(target=func, args=args, kwargs=kwargs) t_func.start() # 启动看门狗,lock续时 redis_model.lock_watchdog_timeout(key=lock_name, value=lock_value) # 停止函数执行 _async_raise(tid=t_func.ident, exctype=SystemExit) _async_stop_children_process() # 获取锁的过期时间 ttl = redis_model.ttl(key=lock_name) time.sleep(ttl if ttl > 1 else 1) except Exception as e: self.logger.warning( "Run redis_distributed_lock failed, cause: {}, traceback: {}".format(e, traceback.format_exc())) time.sleep(5) return wrapper return redis_function续时操作:watch_dog def lock_watchdog_timeout(self, key, value, seconds=30): self.logger.info("key:{} lock watch dog threading start work ".format(self.get_key(key))) try: self._lock_extend(key, value, seconds) except Exception as e: self.logger.warning("redis lock lock_watchdog_timeout error, error info :{}".format(e)) def _lock_extend(self, key, value, lifetime): while True: try: self.logger.debug("watch dog extend key time, key:{}".format(self.get_key(key))) # 判断redis中还是否为本进程持有的锁 redis_value = self.redis_client.get(name=self.get_key(key)) if redis_value != value: self.logger.warning( "Watch dog warning, redis lock value is not acquired, will suspend function execution") self.logger.warning("this value:{}, redis value".format(value, redis_value)) # watch dog也没意义了,直接返回吧 return result = self.redis_client.expire(name=self.get_key(key), time=lifetime) self.logger.debug("{} extend result : {}".format(self.get_key(key), str(result))) time.sleep(lifetime // 2) except Exception as e: self.logger.warning( "Run _lock_extend failed, cause: {}, traceback: {}".format(e, traceback.format_exc())) time.sleep(5)另外:如果除主进程之外,其他的某些子进程,备选节点也需要保留,可以在_async_stop_children_process()中根据进程名字,或者进程名字前缀,进行过滤,选择性删除子进程如果是子线程创建子进程,子进程再创建进程,可以在_async_stop_children_process()方法中迭代,发现所有的进程,杀死进程之后,这些子进程下的子线程也就不存在了。由于主进程可能创建子线程,而成为备选节点后,主进程仍旧存活,子线程需要清除掉,所有不能使用signal去处理子进程,这里是调用底层C语言暴露的API,可见参考参考 https://www.jianshu.com/p/957853c659f1
2020年03月25日
1,278 阅读
0 评论
0 点赞
2020-03-21
Redis的大key问题
一. 什么是大key?通常指数据内存使用量非常大的数据,比如set里放了相当多的数据ps:我们用set存储了单个用户的文章点赞数据,有个小伙伴天天无聊就在那给文章点赞,点了几百万,那set里就有了几百万数据,这个可能就是一个大key二. 数据层面的解释--避免大key操作业务方应尽量避免进行大key操作,如 hgetall 一次获取非常大的hash数据,用 hmset 一次设置非常多的value,用 lrange 一次取一个非常大的 list 或非常多的元素,如果客户端需要用到这些操作对应的API,一次操作的返回结果大小必须是在合理可控的范围内,防止导致节点通信超时、网络堵塞等严重后果。备注:大key操作通常可见于集群慢日志,同期会伴随缓存调用的高延迟,甚至节点完全阻塞造成的不可用。参考:根据测试结果,value在超过1KB后性能开始下降,超过10KB后性能下降明显,出现拐点。三. 结构层面的大key问题解释 1.资源使用不均(该大key可能会使用该实例相当多的内存,浪费相当大的Cpu,) 2.带宽使用极大(比如假如我们有个功能展示上面例子中点赞的所有文章,一下查出来全部,肯定会使用非常大的带宽) 3.影响该实例其他key的操作(基于redis的单线程处理机制,大家都在排队,前面的慢,自然影响其他数据的操作) 四. 大key如何发现? bigkeys命令 bigkeys命令以遍历的方式分析Redis实例中的所有Key,并返回整体统计信息与每个数据类型中Top1的大Key redis-rdb-tools 使用redis-rdb-tools离线分析工具来扫描RDB持久化文件,虽然实时性略差,但是完全离线对性能无影响。 Redis 4.0 以后的版本:支持 了 memory 命令查看 key 的大小 预估值,不太准确(采用的是多次抽样分析,预估全部数据的量) 五. 如何解决大key问题? 1.数据结构拆分,比如我们这里有个活动数据,活动有活动商品数据,这俩就进行了拆分,并没有放一起 2.数据分片,比如后面加序号,进行多实例的拆分 六. 大key的删除问题6.1 Redis 4.0以前大key删除4.0 以前 string,list,set,hash 不同数据类型的大 key,删除方式有所不同。一般有两种情况:del 命令删除单个很大的 key 和 del 批量删除 大 key。直接 del 命令粗暴的删大 key 容易造成 redis 线程阻塞。4.0 以前要优雅的删除就是针对不同的类型 写脚本,拆分链表,hash 表,分批删除。6.2 Redis 4.0 以后优雅的删除大 key 主动删除 UNLINK xxxkey unlink 命令是 del 的异步版本,由 Lazyfree 机制实现。Lazyfree 机制的原理是在删除的时候只进行逻辑删除,把 key 释放操作放在 bio (Background I/O)单独的子线程中惰性处理,减少删除大 key 对 redis 主线程的阻塞,有效地避免因删除大key带来的性能问题。unlink 即使在批量删除 大 key 时,也不会对阻塞造成阻塞。 被动删除 被动删除是指 Redis 自身的 key 清除策略,一个 大 key 过期或者被淘汰时,如何被清除,会不会导致阻塞?4.0 以前自动清除是有可能阻塞主线程的。 4.0 以后的版本,被动删除策略是可选的配置参数,允许以 Lazyfree 的方式清除。但是参数默认是关闭的,可配置如下参数开启。 lazyfree-lazy-expire on # 过期惰性删除 lazyfree-lazy-eviction on # 超过最大内存惰性删除 lazyfree-lazy-server-del on # 服务端被动惰性删除
2020年03月21日
640 阅读
0 评论
0 点赞
2020-02-12
Python性能分析工具Line_profiler
介绍profile和line_profiler两个模块都是性能分析工具。有时候需要找到代码中运行速度较慢处或瓶颈,可以通过这两模块实现,而不再使用time计时。line_profiler模块可以记录每行代码的运行时间和耗时百分比。安装输入以下命令安装即可pip install line_profiler使用方法1 :生成一个Line_profiler类(推荐)简单版本from line_profiler import LineProfiler def do_stuff(numbers): s = sum(numbers) l = [numbers[i] / 43 for i in range(len(numbers))] m = ['hello' + str(numbers[i]) for i in range(len(numbers))] if __name__ == '__main__': number = [1,2,3,4,5,6] p = LineProfiler() p_wrap = p(do_stuff) p_wrap(number) p.print_stats() # 控制台打印相关信息 p.dump_stats('saveName.lprof') # 当前项目根目录下保存文件 输出结果:"D:\Program Files\Anaconda3\envs\tensorflow2.3\python.exe" "C:/Users/admin/Desktop/xxxx/temp.py" Timer unit: 1e-07 s Total time: 1.08e-05 s File: C:/Users/admin/Desktop/GPflowMPC_cui _contract - profile/temp.py Function: do_stuff at line 193 Line # Hits Time Per Hit % Time Line Contents ============================================================== 193 def do_stuff(numbers): 194 1 21.0 21.0 19.4 s = sum(numbers) 195 1 45.0 45.0 41.7 l = [numbers[i] / 43 for i in range(len(numbers))] 196 1 42.0 42.0 38.9 m = ['hello' + str(numbers[i]) for i in range(len(numbers))]多函数调用当你需要调用不止一个函数的时候,就可以使用add_function函数去添加额外需要监控的函数from line_profiler import LineProfiler def second_function(): # just for test i = 5 pass def do_stuff(numbers): s = sum(numbers) l = [numbers[i] / 43 for i in range(len(numbers))] m = ['hello' + str(numbers[i]) for i in range(len(numbers))] for i in range(5): second_function() if __name__ == '__main__': number = [1,2,3,4,5,6] p = LineProfiler() p.add_function(second_function) p_wrap = p(do_stuff) p_wrap(number) p.print_stats() p.dump_stats('saveName.lprof')输出结果"D:\Program Files\Anaconda3\envs\tensorflow2.3\python.exe" "C:/Users/admin/Desktop/xxxx/temp.py" Timer unit: 1e-07 s Total time: 2.4e-06 s File: C:/Users/admin/Desktop/GPflowMPC_cui _contract - profile/temp.py Function: second_function at line 193 Line # Hits Time Per Hit % Time Line Contents ============================================================== 193 def second_function(): 194 # just for test 195 5 14.0 2.8 58.3 i = 5 196 5 10.0 2.0 41.7 pass Total time: 2.44e-05 s File: C:/Users/admin/Desktop/GPflowMPC_cui _contract - profile/temp.py Function: do_stuff at line 198 Line # Hits Time Per Hit % Time Line Contents ============================================================== 198 def do_stuff(numbers): 199 1 22.0 22.0 9.0 s = sum(numbers) 200 1 48.0 48.0 19.7 l = [numbers[i] / 43 for i in range(len(numbers))] 201 1 45.0 45.0 18.4 m = ['hello' + str(numbers[i]) for i in range(len(numbers))] 202 6 32.0 5.3 13.1 for i in range(5): 203 5 97.0 19.4 39.8 second_function()方法2:使用装饰器@profile在需要检测的函数上面添加@profile装饰符号, (此时调包是调用profile 而不是line_profiler)。在命令行中使用kernprof -l -v test.py启动,结束会在窗口打印逐行信息以及生成一个lprof文件。 与方法1 相似。问题在于每次不在使用profile 查看性能时,需要将函数上的装饰类注释掉读取lprof 文件: 进入当前目录后,在命令行中使用python -m line_profiler saveName.lprof # saveName.lprof 读取指定文件资料来源 https://blog.csdn.net/weixin_44613728/article/details/120411325
2020年02月12日
916 阅读
0 评论
0 点赞
2020-02-09
Pyhton2/Python3 安装/卸载 启动出错排查解决
C盘东西太多, 就迁移了下开发用的环境和软件到别的盘, 迁移之后发现python运行出错, 卸载也重装也有问题, 网上搜索了下, 最终解决了Python2/Pyhton3 共存, 冲突同时装有python2和python3的话, 因为有些文件名是相同的, 所有有可能使用的时候会有意料之外的执行结果解决方法:python2版本里边的python.exe pip.exe 改为 python2.exe pip2.exe 使用时输入python2 pip2或者不改, 使用时输入python pip,更改python3版本里边的python.exe pip.exe 改为 python3.exe pip3.exe 使用时输入python3 pip3python2 和python3不重名就行pip启动出错, 安装包时出错报错Fatal error in launcher: Unable to create process using或其他where pip where pip2 where pip3 如果是路径不是你pip的安装路径, 有可能是装了别的环境影响了, 到系统环境里修改, 把pip的路径排在前面, 或删除比的环境变量路径解决方法 升级新版本的pippython -m pip install --upgrade pip 或 python2 -m pip install --upgrade pip 或 python -m pip install --upgrade pip Python启动出错先查询路径有没有对where python where python2 where python3 如果是路径不是你python的安装路径, 有可能是装了别的环境影响了, 到系统环境里修改, 把python的路径排在前面, 或删除比的环境变量路径 Python安装/卸载出错, 无法卸载报错提示"无法使用此产品的安装源,请确认安装源存在并且你可以访问它""产品安装来源无法使用" 或其他出现这种情况是使用了一些垃圾清除软件清理了软件安装包或别的原因导致 解决办法下载Windows Install Clean Up, 清除软件残留注册项, 重新安装
2020年02月09日
710 阅读
0 评论
0 点赞
1
2
3
...
9