开发
安全
运维
Geek
佳软
Photos
生活
Search
1
使用AI实现高精度钢琴曲转谱Piano Transcription简明使用教程
37,461 阅读
2
使用ESP8266Wifi模块制作Wifi杀手
37,123 阅读
3
unravel 让图片唱歌详细教程 Real Time Image Animation 项目
27,090 阅读
4
佳能相机刷Magic Lantern魔灯固件
23,045 阅读
5
战地3 正版账号免费分享!
15,885 阅读
开发
安全
运维
Geek
佳软
Photos
生活
登录
Search
标签搜索
python
前端
环境搭建
Ubuntu
markdown
神器
黑苹果
编码
技巧
Git
数据库
开发
insmoin
累计撰写
187
篇文章
累计收到
48
条评论
首页
栏目
开发
安全
运维
Geek
佳软
Photos
生活
页面
搜索到
86
篇与
的结果
2023-04-08
ChatGPT处理超长文本限制的四种思路
在GPT和类似的大语言模型中,文本被切割成称为"tokens"的小单位进行处理。一个token可以是一个字符,也可以是一个单词,这取决于分词策略。GPT模型有一个最大token限制,表示模型能够处理的最大输入长度。这个限制是由模型的架构和超参数决定的,它是为了平衡模型的计算复杂性和资源消耗而设定的。Token 限制包括了输入和输出,也就是你在一次对话中提交给 ChatGPT 的内容和 ChatGPT 输出的内容不能超过模型规定的 Token 数量。比如 ChatGTP 3.5 的 Token 限制是 4096,ChatGPT 4 的 Token 限制是 8192。以后可能会支持的Token 数更高, 但我们比较复杂的问题时, 往往很轻易就能超过这个上限。ChatGPT阅读理解和逻辑能力是比较好的, ChatGPT刚出来我就想过, 如果我输入一本书的内容给他会怎么样?把整个项目的源码全部给他,让他根据需求改代码会怎么样?把一个技术文档给他, 让他根据需求写代码会怎么样。。。后来发现是不现实的, 会有token限制。其实还有很多场景需要解决这个token限制,也就是长文本问题。下面就从几个思路看看怎么解决超长文本限制的问题。压缩法在处理token限制之前, 先明白 tokens 咋回事先看看OpenAI官方的介绍:什么是代币以及如何计算它们?https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-themOpenAI 官方的 Tokenizer 可以用于计算内容的token数。https://platform.openai.com/tokenizer在官方的 Tokenizer网站输入中文"我爱你"和英文"i love you"对比一下tokens 不是指 prompt 字符串的长度,token指的是一段话中可能被分出来的词汇。比如:i love you,就是三个token,分别为 「i」「love」「you」。不同语言token计算不一样,比如中文的「我爱你」其实是算 5 个 token,因为它会先把内容转成 unicode。简单来说就是tokens是文本的一部分,不一定和单词对应,它们的数量取决于文本的长度和语言。tokens的数量影响了API的请求和计费。语言转换压缩法根据上边的对token的了解, 我们可以这样操作, 先把中文转换为英文, 再去发送请求。这样就能减少的token的大小, 达到某个程度上解决超长文本限制的问题。当然,缺点也是很多的,语言的转换可能会丢失原有语义,机翻的话效果没那么好, 使用GPT进行翻译又而外增加了token消耗,得不偿失。在某个场景或许是用得上的,可以作为一种技术储备。摘要压缩法还有一个压缩思路, 就是通过一些技术手段或者直接用GPT提炼压缩成短文本,比如删除冗余信息,提取关键信息,使用缩略语等。这种方法可以保留文本的核心内容,但是可能会丢失一些细节信息和语义信息。应用场景:如果我们想要用GPT来阅读一篇论文,我们可以先用一些摘要技术或者关键词提取技术将论文压缩成一个短摘要或者一个关键词列表,然后用GPT生成一个对论文的理解或者评价。然后进行索引,方便搜寻。对论文的某一段有疑问,在根据摘要的索引找到对应的文章句子进行发送给GPT提问, 这样就能用于辅助阅读一篇长论文了。额外话:之前看到一个关于压缩很有趣的观点, 有兴趣可以去阅读下。压缩即泛化,泛化即智能https://zhuanlan.zhihu.com/p/615554635ChatGPT是网上所有文本模糊的图片https://baijiahao.baidu.com/s?id=1757454901451803072&wfr=spider&for=pc切分法如果我们向朋友发送一个5G的文件, 而软件最大限制是1GB, 那怎么发呢?可以用压缩分卷的形式拆成5个1GB的文件,分5次发送就可以解决。同理如果发送要发送的文本太大, 我们可以将长文本切分成若干个短文本,然后分别输入模型,最后将输出结果拼接起来。这种方法简单易实现,但是可能会损失文本之间的上下文信息和连贯性。例如,如果我们想要用GPT来写一篇博客文章,我们可以将文章切分成若干个段落或者小节,然后分别用GPT生成每个段落或者小节的内容,最后将它们拼接起来形成完整的文章。但是这样做可能会导致文章缺乏整体的逻辑和结构。切分法缺点显而易见,就是可能会丢失上下文的关联。适合上下文关联不强的场景。比如翻译一本英文小说的场景, 按句切按页切会丢失上下文的关联, 造成逻辑和结构问题。切分如果切得恰当,比如按一章节进行切分, 可以减少丢失上下文的关联导致的逻辑性问题。Embedding什么是Embedding?Embedding是一种将输入文本(单词、句子等)转换为连续向量表示的过程。在GPT模型中,也使用了词向量的技术,将文本中的每个单词映射到一个连续的向量空间,这个向量空间可以捕捉到单词之间的语义和语法关系。举个例子,考虑以下输入文本:"I love to play soccer."词语嵌入(Word Embedding):首先,每个单词("I", "love", "to", "play", "soccer")被映射到模型的嵌入空间,其中每个单词对应一个向量表示。例如,"love"可能被映射为一个包含多个浮点数值的向量,如[0.2, 0.8, -0.4, ...]。位置嵌入(Position Embedding):为了捕捉输入文本中单词的顺序和位置信息,位置嵌入将每个单词的位置编码为一个向量。例如,第一个单词"I"的位置嵌入向量可能是[0.1, -0.3, 0.5, ...],第二个单词"love"的位置嵌入向量可能是[-0.2, 0.4, -0.1, ...],以此类推。什么是向量数据?使用OpenAI的Embedding接口将输入文本(如句子或段落)中的每个单词或符号转换为连续的向量表示。这些向量被称为嵌入向量,它们捕捉了单词在语义和上下文方面的信息。让我们通过一个具体的例子来说明GPT的嵌入向量数据:考虑以下输入文本:"The cat is sitting on the mat."对于这个文本,GPT模型中的嵌入层将为其中的每个单词生成一个嵌入向量。假设每个嵌入向量的维度为100,那么每个单词将被表示为一个100维的向量。例如,对于单词 "cat",它的嵌入向量可能是一个长度为100的向量,如 [0.2, -0.1, 0.5, ...]。同样地,其他单词(如 "sitting"、"mat")也会有相应的嵌入向量。这些嵌入向量的生成是通过GPT模型在大规模文本语料上进行预训练得到的。在预训练过程中,模型学习到了单词之间的上下文关系和语义信息。因此,嵌入向量在向量空间中的相对位置可以反映单词之间的语义相似性。例如,对于类似的单词 "cat" 和 "dog",它们的嵌入向量在向量空间中可能会比较接近,因为它们在语义上相关。相反,与它们不相关的单词(如 "car")的嵌入向量可能会与它们较远。这样,通过使用GPT的嵌入向量,我们可以将输入文本中的离散单词转换为连续的向量表示,从而为模型提供了一种更好地理解和处理自然语言数据的方式。这些向量可以用于各种下游任务,如文本分类、情感分析、文本生成等。什么是向量数据库?向量数据库是一种专门用于存储和查询向量数据的数据库系统。它们提供高效的向量操作和相似性搜索功能。Embedding 可以以多种格式存储,其中 JSON 是一种常见的格式之一。向量数据库(Vector Database)是一种专门用于存储和检索向量数据的数据库系统。它们提供高效的向量索引和查询功能,允许用户根据向量之间的相似性进行快速搜索和分析。使用向量数据库的主要原因是向量数据的特殊性质。传统的数据库通常适用于标量或结构化数据,而对于高维向量数据(如嵌入向量、图像特征向量、音频特征向量等),传统数据库的查询和索引方法往往效率较低。向量数据库通过使用专门的索引结构和查询算法,能够高效地处理向量数据,提供更快的查询速度和更好的检索准确性。向量数据库的优势包括:高效的相似度搜索:向量数据库可以根据向量之间的相似度,快速找到最相似的向量。这在许多应用场景中非常有用,如图像搜索、推荐系统、聚类分析等。扩展性:向量数据库通常能够处理大规模的向量数据集,并具备良好的水平扩展性,可以在需要时轻松添加更多的存储和计算资源。灵活的查询功能:向量数据库提供了各种灵活的查询功能,可以支持范围查询、K近邻查询、相似度匹配等多种查询类型。一些常见的向量数据库包括:PostgreSQL:PostgreSQL是另一个常见的关系型数据库,它提供了更丰富的数据类型和功能。Faiss:Facebook AI Research 开源的向量索引库,提供了高效的相似度搜索和向量聚类功能。Milvus:一个开源的向量数据库引擎,支持高性能的相似度搜索和向量存储。Annoy:一个快速的C++库,用于在大规模数据集上进行近似最近邻搜索。Elasticsearch:一个流行的分布式搜索和分析引擎,可以通过插件支持向量数据的索引和查询。这只是一小部分向量数据库的例子,还有其他许多向量数据库可用,具体选择取决于应用需求和性能要求。使用嵌入(Embedding)来解决长文本限制问题是一种常见的方法。嵌入是将文本或实体表示为低维稠密向量的技术,可以将高维的文本表示转化为固定长度的向量,从而克服原始文本长度的限制。以下是一些具体的操作方式和示例场景:文本嵌入模型:使用预训练的文本嵌入模型(如Word2Vec、GloVe、BERT等)可以将单词或短语转换为向量表示。这样,长文本可以通过将其分解为单词或短语,并将它们的嵌入向量进行平均或拼接,得到整个文本的嵌入表示。这种方式可以用于各种场景,例如情感分析、文本分类、文本相似度计算等。示例场景:在一个知识库中,每篇文章都有较长的文本描述。可以使用预训练的文本嵌入模型,将每个文章的文本描述转换为固定长度的嵌入向量。然后,可以计算用户查询与知识库中文章的相似度,基于相似度进行匹配和推荐相关文章。序列嵌入模型:对于长文本序列,如文章、评论或对话,可以使用序列嵌入模型(如LSTM、Transformer等)来获取整个序列的嵌入表示。这种方式会考虑序列中的上下文信息,并生成一个固定长度的向量表示整个序列。这对于文本生成、机器翻译、对话建模等任务非常有用。示例场景:在一个问答系统中,用户输入一个较长的问题,需要将其转换为向量表示并与知识库中的答案进行匹配。可以使用序列嵌入模型,将问题和答案分别转换为嵌入向量,然后计算它们之间的相似度,以找到最相关的答案。文本摘要:对于长文本,可以使用文本摘要模型(如Seq2Seq模型)生成一个简洁的摘要,将长文本压缩为固定长度的摘要向量。这对于新闻摘要、文档摘要等任务非常有用。示例场景:在一个新闻聚合应用中,用户可以浏览大量的新闻文章。为了提供更好的用户体验,可以使用文本摘要模型将每篇新闻文章压缩为简洁的摘要向量,以便用户快速浏览并选择感兴趣的文章。这些是一些使用嵌入来解决长文本限制问题的操作方式和示例场景。具体的选择取决于应用需求和数据特点。嵌入技术可以帮助我们从长文本中提取有用的信息并转换为固定长度的向量表示,从而应对长文本带来的挑战。模型微调OpenAI 的微调(fine-tuning)是指在预训练的语言模型基础上,使用特定的数据集对模型进行进一步的训练,以适应特定的任务或领域。微调可以使模型更好地理解和生成与特定任务相关的文本。要解决长文本输入限制的问题,可以使用 OpenAI 的微调技术来对语言模型进行适应。以下是使用微调技术解决长文本输入限制的一般步骤:数据集准备:准备一个与所需任务相关的数据集,其中包含长文本示例。这可以是一个包含长文本样本的文本数据集,或者是一个特定任务的标注数据集,例如长文本分类或生成任务。模型选择:选择一个适合任务的预训练语言模型作为基础模型。OpenAI 提供了各种预训练模型,如GPT-3、GPT-2等。根据任务需求和计算资源,选择一个合适的模型。微调模型:使用准备好的数据集对预训练模型进行微调。微调的过程包括加载预训练模型的权重,将任务相关的数据输入模型,并通过反向传播优化模型参数。微调的目标是使模型适应特定任务,并提高模型在长文本输入上的性能。超参数调整:微调过程中,可能需要进行一些超参数的调整,如学习率、批次大小和训练轮数。这些超参数的选择可以通过实验和验证集上的性能评估来进行调整。测试和评估:在微调完成后,对模型进行测试和评估。使用一组测试集或实际应用场景的数据,评估模型在长文本输入上的性能和效果。通过微调技术,可以使预训练模型更好地理解和处理长文本输入。预训练模型具有对语言的广泛理解能力,而微调可以帮助模型针对特定任务或领域进行优化,以解决长文本输入限制的问题。总结总之,在GPT这样的预训练语言模型中,长文本输入是一个普遍存在且有待解决的问题。根据不同的场景和任务,可以采用不同的思路和方案来解决这个问题,比如切分,压缩,向量化,模型微调等,也可以综合起来运用。这些方案各有优缺点,需要根据实际情况进行选择和优化。
2023年04月08日
886 阅读
0 评论
0 点赞
2023-03-16
试用ChatGPT API接口并实现上下文对话
之前发现ChatGPT居然能够有记忆功能, 能够记得之前聊过什么, 终于感觉机器有点人性化了, 很好奇是怎么实现的.经过查资料, 发现很简单, 其实ChatGPT并没有真正"记忆"功能, 而是把历史聊天记录一并发过去了, API接口处理的字数也是有限制的, 如果聊得够多, 他就会丢弃"忘记"最早聊过的内容.现在就让我们试试使用ChatGPT的API实现带上下文功能的对话.官方文档地址: https://platform.openai.com/docs/首先我们需要先创建API KEY,这个 API KEY 是用于 HTTP 请求身份验证的,可以创建多个。到OpenAI官网创建API KEY, 没有账号的要先去弄个账号, 账号网上有注册教程, 也可以直接购买现成的。创建地址:https://platform.openai.com/account/api-keys注意 API 调用是收费的,新注册账号会赠送一些额度安装官方的openai库pip install openaiChatGPT API调用示例:import os import openai # 设置API key openai.api_key = os.getenv("OPENAI_API_KEY") # 给ChatGPT发送请求 completion = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "user", "content": "Hello!"} ] ) # 打印请求结果 print(completion.choices[0].message)Role角色参数详解user 表示提交prompt的一方。 assistant 表示给出completion响应的一方,实际上就是ChatGPT本身。 system message里role为system,是为了让ChatGPT在对话过程中设定自己的行为,不含上下文的对话import os import openai openai.api_key = os.getenv("OPENAI_API_KEY") while True: content = input("User: ") messages = [{"role": "user", "content": content}] completion = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages ) chat_response = completion answer = chat_response.choices[0].message.content print(f'ChatGPT: {answer}')上面这个实现里,每次只发送了当前输入的信息,并没有发送对话的历史记录,所以ChatGPT无法知道上下文。我们来看对话效果如下:User: 你好 ChatGPT: 你好!我是AI助手,有什么可以帮到您的吗? User: 我刚才说了什么 ChatGPT: 很抱歉,由于我是AI语音助手,无法得知您刚才说了什么,请您再次告知。包含聊天记录的案例import openai openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Who won the world series in 2020?"}, {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."}, {"role": "user", "content": "Where was it played?"} ] )上面这段代码里,使用了3种角色的role,这个messages发送给ChatGPT后,ChatGPT就有了上下文,知道作为user的我们说了什么,也知道作为assistant的自己回答了什么。想通过API实现包含上下文信息的多轮对话的关键就是用好role字段。改进一下, 包含上下文的对话import os import openai openai.api_key = os.getenv("OPENAI_API_KEY") messages = [] while True: content = input("User: ") messages.append({"role": "user", "content": content}) completion = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages ) chat_response = completion answer = chat_response.choices[0].message.content print(f'ChatGPT: {answer}') messages.append({"role": "assistant", "content": answer})上面这个实现里,每次发送请求给ChatGPT时,把历史对话记录也一起发送,所以ChatGPT知道对话的上下文。我们来看对话效果如下:User: 你好 ChatGPT: 你好!我是AI助手,有什么需要帮忙的吗? User: 我刚才说了什么 ChatGPT: 你刚才说了 "你好"。目前发现的问题这只是个简单的测试, 目前还是有很多问题的.token限制问题: 由于是累加的, 聊天达到一定的数量后, 就会超出token限制.API是按照token数量收费的, 由于每次都带之前的聊天, 对token消耗也很大.
2023年03月16日
593 阅读
0 评论
0 点赞
2022-09-08
使用Python的pdf2docx把PDF转为word文件
朋友碰到个PDF转word的需求, 网上一大堆工具都是付费的, 问我有没有办法.我搜了下python的pdf2docx就可以实现, 记录一下要求Python版本>=3.6。通过pip安装库:pip install pdf2docx作为Python库使用from pdf2docx import Converter pdf_file = '/path/to/xxx.pdf' docx_file = 'path/to/xxx.docx' # convert pdf to docx cv = Converter(pdf_file) cv.convert(docx_file) # 默认参数start=0, end=None cv.close() # more samples # cv.convert(docx_file, start=1) # 转换第2页到最后一页 # cv.convert(docx_file, pages=[1,3,5]) # 转换第2,4,6页作为命令行工具调用pdf2docx convert xxx.pdf xxx.docx可以通过--start、--end或者--pages指定页面范围。转换效果可能不是100%完美, 有可能图片会错位, 这时候可以试试微软Office, WPS, 极速Office不同的软件打开试试
2022年09月08日
907 阅读
0 评论
0 点赞
2022-08-11
Python使用Tesseract实现OCR文字识别
自动化办公有些场景会用到OCR文字识别, 不是很复杂的场景Tesseract-OCR就够用, 当然, 比较复杂的比如发票,合同, 表格等识别建议还是购买商用的基于AI训练的OCR识别接口.Tesseract,一款由HP实验室开发由Google维护的开源OCR引擎。它可以读取和识别多种图像格式,包括jpeg, png, gif, bmp, tiff等。它还可以将识别的文本输出为字符串、边界框、数据、方向和脚本等信息安装Tesseract-OCR官方网站:https://github.com/tesseract-ocr/tesseract官方文档:https://github.com/tesseract-ocr/tessdoc下载地址:https://digi.bib.uni-mannheim.de/tesseract/1.下载安装TesseractTesseract-OCR支持很多操作系统,如windows、linux、MacOS等等。注意尽量不要下载带dev,alpha,beta等版本,这些版本不稳定,也可能是测试版本。这里选择5.0版本, 以windows系统为例https://digi.bib.uni-mannheim.de/tesseract/tesseract-ocr-w64-setup-v5.0.0.20211201.exe双击下载的安装包,一路Next。安装过程可以附带选择要安装的语言包,如果你要识别简体中文, 可以选择简体中文, 之后自动会从服务器下载该语言包下来。也可以不选择下载语言包, 下载离线包进行安装。由于在线安装比较慢且不太稳定, 我们这里不勾选在线安装语言包点下一步后会有安装路径的选择,你可以选择要安装C盘D盘等, 但记住这个路径, 之后设置环境变量会用, 我这里选择在默认路径C:\Program Files\Tesseract-OCR点击下一步直至完成安装2.下载语言包进入tesseract的github文档页(https://tesseract-ocr.github.io/tessdoc),找到5.0.0.x目录下的Traineddata Files目录:该目录下有tessdata,tessdata_best,tessdata_fast等5种语言包,其中tessdata是检测速度和准确度居中的语言包,后缀best对应最慢和最准确的语言包,后缀fast对应最快和准确度较差的语言包,这里我们选择tessdata。进入到tessdata语言包的github仓后,可以用git命令拉到本地,或者网页版下载到本地后解压,就可以看到很多以语言简称为文件名、traineddata为后缀的文件,其中eng.traineddata和chi_sim.traineddata一般足够应对中文和英文场景:下载后将该包直接放在程序安装目录就是刚才的C:\Program Files\Tesseract-OCR目录, 放入目录下的的tessdata文件夹里面即可。3.设置环境变量在左下角的windows图标上点击右键, 选择系统, 在设置选择高级系统设置, 选择环境变量按图所属选择编辑, 填入tesseract的安装路径C:\Program Files\Tesseract-OCR, 点确定保存打开cmd命令行, 输入tesseract -v检查查看是否安装成功命令行使用 tesseract --list-langs命令可查看当前软件支持的语言:出现上面信息即代表安装成功了在Python中使用Tesseract安装pytesseract包pip install pytesseract进行图片文字识别from PIL import Image import pytesseract img = Image.open('xxx.jpg') # 识别文字,并指定语言 string = pytesseract.image_to_string(img, lang='chi_sim') print(string)命令行使用下面这个例子解析单个文件test.png(第1个参数),在标准输出(命令行界面)打印解析结果(第2个参数为stdout),用-l参数带chi_sim表示使用简体中文语言:tesseract.exe test.png stdout -l chi_sim也可以将stdout改为其他的字符串(第2个参数改为输出文件名称,不用带txt后缀),这样会将识别的结果写入到以该字符串命名的txt文件中:tesseract.exe test.png result -l chi_sim在当前目录下就会生成一个result.txt的文件,文件内容就是识别出来的文字内容。扩展对于发票合同之类比较复杂的场景, 可以利用OpenCV对发票图像进行预处理滤波、自适应阈值等一系列预处理得到二值图像;然后利用形态学中的开运算提取表格全域线段,进行表格位置提取,并结合表格交点坐标与自定义模板,实现表头与内容自适应适配;最后利用jTessBoxEditor对表格区域内容进行字库训练优化,最终实现基于Tesseract-OCR的字符识别。发票文档由中英文、数字和特殊符号共同组成,Tesseract-OCR引擎自带的字库识别准确率并不高,引入jTessBoxEditor来训练专门针对发票识别的字库。通过修正坐标,将内容与表格边框分隔开,使表头与内容精准匹配,从而实现任意区域下对特定表格进行内容提取,并高效精准识别。如果嫌麻烦又不差钱的, 可以选择使用阿里云发票票据识别、护照识别、名片识别、等复杂场景的接口.
2022年08月11日
602 阅读
0 评论
0 点赞
2022-05-14
Django项目使用Swagger自动生成API文档
简介接口开发完成了,那么接下来需要编写接口文档。传统的接口文档编写都是使用word或者其他一些接口文档管理平台,这种形式接口文档维护更新比较麻烦,每次接口有变动时得手动修改文档。Swagger是一个很好用的管理Api文档的工具,不仅仅Spring系列有自动化生成Swagger Api文档的工具包,Python同样也有(配置非常非常简单)!Django接入Swagger网上很多资料在介绍Django接入Swagger方法时,都是基于django-rest-swagger库进行讲解的,都殊不知,从2019年6月份开始,官方已经废弃了该库,在django 3.0中已经不支持该库了,取而代之的是全新的第三方drf-yasg库。使用1.安装安装drf-yasg库pip install drf-yasg2.编辑设置文件修改项目settings.py文件,添加api和drf_yasg。INSTALLED_APPS = [ ... 'drf_yasg', ]3.路由设置编辑url.py文件# 配置swagger各个参数 from drf_yasg import openapi from drf_yasg.views import get_schema_view schema_view = get_schema_view( openapi.Info( title="XX项目 API", # 名称 default_version="版本 v1.0.0", # 版本 description="XX项目API交互文档由Swagger自动生成", # 项目描述 ), public=True, ) urlpatterns = [ # 这两个url配置是一定要有的,用于生成ui界面,其它url正常定义就好 path('swagger/', schema_view.with_ui('swagger', cache_timeout=0), name='schema-swagger-ui'), path('redoc/', schema_view.with_ui('redoc', cache_timeout=0), name='schema-redoc'), ]4.测试访问重启项目后访问Swagger http://127.0.0.1:8000/swagger/redoc ui http://127.0.0.1:8000/redoc/
2022年05月14日
905 阅读
0 评论
0 点赞
2022-04-02
阿里Canal数据同步工具的使用实现异构数据同步
简介阿里Canal是一个MySQL数据同步工具, 伪装成MySQL的一个Slave(从)节点, 通过订阅MySQL的binlog来实现数据同步。巧用阿里Canal能够MySQL与异构数据Elasticsearch,MongoDB等同步,数据能做到实时同步,团队间解耦,两个团队之间不用多次调用接口。应用场景:数据库镜像数据库实时备份索引构建和实时维护业务cache(缓存)刷新带业务逻辑的增量数据处理使用canal下载https://github.com/alibaba/canal/releasescanal-server(canal-deploy):直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。canal-adapter:相当于canal的客户端,会从canal-server中获取数据,然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。canal-admin:为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。1.安装MySQLfirewall‐cmd ‐‐zone=public ‐‐add‐port=3306/tcp ‐‐permanent firewall‐cmd ‐‐reloadcd /home/ wget ‐‐no‐check‐certificate https://manongbiji.oss‐cn‐ beijing.aliyuncs.com/ittailkshow/canal/download/world.sql wget ‐‐no‐check‐certificate https://repo.mysql.com/mysql80‐community‐releas e‐el7‐5.noarch.rpm yum localinstall ‐y mysql80‐community‐release‐el7‐5.noarch.rpm#自动安装MySQL 8.0.28 yum install ‐y mysql‐community‐server 调整配置文件sudo cat >> /etc/my.cnf <<‐'EOF' server‐id=1 log‐bin=mysql‐bin binlog_format=row binlog‐do‐db=world EOF systemctl start mysqld获取初始密码grep 'temporary password' /var/log/mysqld.log 2022-03-31T04:20:25.133810Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: Jby&XTOc.7iN mysql ‐uroot ‐pJby&XTOc.7iN #修改root密码 ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'asA S123456!'; #MySQL降低密码强度 set global validate_password.policy=0; set global validate_password.length=4; #创建canal同步账户 CREATE USER canal@'%' IDENTIFIED with mysql_native_password BY 'canal'; #授权canal用户允许远程到mysql实现主从复制 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; CREATE USER remote@'%' IDENTIFIED with mysql_native_password BY 'remote'; grant all privileges on *.* to remote@'%'; #初始化数据库 source /home/world.sql2. 安装Canal-Server安装JDKyum ‐y install java‐1.8.0‐openjdk‐devel.x86_64 sudo cat >> /etc/profile <<‐'EOF' export JAVA_HOME=/usr/lib/jvm/java‐1.8.0‐openjdk export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH EOF source /etc/profile echo $JAVA_HOME 下载canal-deployer最新版# 下载脚本 wget ‐‐no‐check‐certificate https://manongbiji.oss‐cn‐ beijing.aliyuncs.com/ittailkshow/canal/download/canal.deployer‐ 1.1.5.tar.gz mkdir /home/canal tar zxvf canal.deployer‐1.1.5.tar.gz ‐C /home/canal vi conf/example/instance.properties #调整serverId canal.instance.mysql.slaveId=10 # master地址 canal.instance.master.address=192.168.31.230:3306 ... # 关闭tsdb canal.instance.tsdb.enable=false #确认canal同步用的用户名、密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal #启动服务 sh bin/startup.sh #canal admin 端口 firewall‐cmd ‐‐zone=public ‐‐add‐port=11110/tcp ‐‐permanent #canal 监听端口 firewall‐cmd ‐‐zone=public ‐‐add‐port=11111/tcp ‐‐permanent #canal 指标监控端口 firewall‐cmd ‐‐zone=public ‐‐add‐port=11112/tcp ‐‐permanent firewall‐cmd ‐‐reload tail canal.log 2022‐03‐31 13:46:19.413 [main] INFO com.alibaba.otter.canal.deployer.CanalL auncher ‐ ## set default uncaught exception handler 2022‐03‐31 13:46:19.503 [main] INFO com.alibaba.otter.canal.deployer.CanalL auncher ‐ ## load canal configurations 2022‐03‐31 13:46:19.524 [main] INFO com.alibaba.otter.canal.deployer.CanalS tarter ‐ ## start the canal server. 2022‐03‐31 13:46:19.627 [main] INFO com.alibaba.otter.canal.deployer.CanalC ontroller ‐ ## start the canal server[172.17.0.1(172.17.0.1):11111] 2022‐03‐31 13:46:21.250 [main] INFO com.alibaba.otter.canal.deployer.CanalS tarter ‐ ## the canal server is running now ......开发数据监听程序 pom.xml<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.5</version> </dependency> </dependencies>编写数据监听程序package com.itlaoqi; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class AD { public static void main(String[] args) throws InterruptedException, Invalid ProtocolBufferException { //TODO 获取连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new Inet SocketAddress("192.168.31.231", 11111), "example", "", ""); while (true) { //TODO 连接 canalConnector.connect(); //TODO 订阅数据库 canalConnector.subscribe("world.*"); //TODO 获取数据 Message message = canalConnector.get(100); //TODO 获取Entry集合 List<CanalEntry.Entry> entries = message.getEntries(); //TODO 判断集合是否为空,如果为空,则等待一会继续拉取数据 if (entries.size() <= 0) { System.out.println("当次抓取没有数据,休息一会。。。。。。"); Thread.sleep(1000); } else { //TODO 遍历entries,单条解析 for (CanalEntry.Entry entry : entries) { //1.获取表名 String tableName = entry.getHeader().getTableName(); //2.获取类型 CanalEntry.EntryType entryType = entry.getEntryType(); //3.获取序列化后的数据 ByteString storeValue = entry.getStoreValue(); //4.判断当前entryType类型是否为ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { //5.反序列化数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //6.获取当前事件的操作类型 CanalEntry.EventType eventType = rowChange.getEventType(); //7.获取数据集 List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); //8.遍历rowDataList,并打印数据集 for (CanalEntry.RowData rowData : rowDataList) { JSONObject beforeData = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } //数据打印 System.out.println("Table:" + tableName + ",EventType:" + eventType + ",Before:" + beforeData + ",After:" + afterData); } } else { System.out.println("当前操作类型为:" + entryType); } } } } } }资料参考https://www.bilibili.com/video/BV1zy4y1L7HFhttps://www.bilibili.com/video/BV1jY41177Qh
2022年04月02日
938 阅读
0 评论
1 点赞
2022-02-04
FastApi快速入门 05: 中间键、子应用
7.中间件"中间件"是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应返回之前工作。要创建中间件你可以在函数的顶部使用装饰器 @app.middleware("http").中间件参数接收如下参数: request 一个函数 call_next 它将接收 request 作为参数 这个函数将 request 传递给相应的 路径操作 然后它将返回由相应的路径操作生成的 response 然后你可以在返回 response 前进一步修改它 import time from fastapi import FastAPI, Request app = FastAPI() @app.middleware("http") async def add_process_time_header(request: Request, call_next): start_time = time.time() response = await call_next(request) process_time = time.time() - start_time response.headers["X-Process-Time"] = str(process_time) return response 8.Background Tasksbackground tasks 就是在返回响应之后立即运行的任务。from fastapi import BackgroundTasks, FastAPI app = FastAPI() def write_notification(email: str, message=""): with open("log.txt", mode="w") as email_file: content = f"notification for {email}: {message}" email_file.write(content) @app.post("/send-notification/{email}") async def send_notification(email: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_notification, email, message="some notification") return {"message": "Notification sent in the background"} 9.子应用如果你有2个独立的FastAPI的应用,你可以设置一个为主应用,另外一个为子应用:from fastapi import FastAPI app = FastAPI() @app.get("/app") def read_main(): return {"message": "Hello World from main app"} subapi = FastAPI() @subapi.get("/sub") def read_sub(): return {"message": "Hello World from sub API"} app.mount("/subapi", subapi)
2022年02月04日
1,812 阅读
0 评论
2 点赞
2022-02-04
FastApi快速入门 04: OAuth2
6.安全在许多框架和系统中,仅处理安全性和身份认证就会花费大量的精力和代码(在许多情况下,可能占编写的所有代码的 50% 或更多)。FastAPI 提供了多种工具,可帮助你以标准的方式轻松、快速地处理安全性,而无需研究和学习所有的安全规范。JWT 表示 「JSON Web Tokens」。它是一个将 JSON 对象编码为密集且没有空格的长字符串的标准。字符串看起来像这样:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c 它没有被加密,因此任何人都可以从字符串内容中还原数据。但它经过了签名。因此,当你收到一个由你发出的令牌时,可以校验令牌是否真的由你发出。通过这种方式,你可以创建一个有效期为 1 周的令牌。然后当用户第二天使用令牌重新访问时,你知道该用户仍然处于登入状态。一周后令牌将会过期,用户将不会通过认证,必须再次登录才能获得一个新令牌。而且如果用户(或第三方)试图修改令牌以篡改过期时间,你将因为签名不匹配而能够发觉。OAuth2OAuth2是一个规范,它定义了几种处理身份认证和授权的方法。它是一个相当广泛的规范,涵盖了一些复杂的使用场景。它包括了使用「第三方」进行身份认证的方法。这就是所有带有「使用 Facebook,Google,Twitter,GitHub 登录」的系统背后所使用的机制。下面演示了如何使用OAuth2 和 JWT进行用户验证。from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") app = FastAPI() def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user @app.get("/users/me/items/") async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}] OAuth2PasswordBearer:访问tokenUrl地址,获取token并返回 OAuth2PasswordRequestForm是一个类依赖项,声明了如下的请求表单: username password 一个可选的 scope 字段,是一个由空格分隔的字符串组成的大字符串 一个可选的 grant_type 一个可选的 client_id 一个可选的 client_secret
2022年02月04日
716 阅读
0 评论
0 点赞
2022-02-04
FastApi快速入门 03: 路由管理
4.路由管理4.1 FastApi 路由管理fastapi路由管理,和GIN的框架思想一致。 类似于flask的蓝图 入口函数----主路由---控制器---服务4.1.1 main入口函数需要include api_routerfrom router.api import api_router app = FastAPI() app.include_router(api_router, prefix="/api") if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=8081) 4.1.2 路由文件必须导入实际的控制文件,每个控制文件设置一个routerfrom fastapi import APIRouter from control import user api_router = APIRouter() api_router.include_router(user.router, prefix="/user") 4.1.3 实际的控制文件,设置好子路由from typing import List from fastapi import Depends, FastAPI, HTTPException, APIRouter from pydantic import BaseModel from services import user_services from models import user_models router = APIRouter() class UserSchemas(BaseModel): username: str fullname: str password: str @router.get("/{username}") def get_user(username: str): print(username) user = user_services.get_user_by_username(username) return user 4.2 跨域设置from fastapi.middleware.cors import CORSMiddleware app = FastAPI() origins = [ "http://localhost", "http://localhost:8090", ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) 5.依赖注入FastAPI 提供了简单易用,但功能强大的依赖注入系统,可以让开发人员轻松地把组件集成至FastAPI。什么是「依赖注入」?依赖注入是一种消除类之间依赖关系的设计模式。把有依赖关系的类放到容器中,解析出这些类的实例,就是依赖注入。目的是实现类的解耦。示例:from typing import Optional from fastapi import Depends, FastAPI app = FastAPI() async def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100): return {"q": q, "skip": skip, "limit": limit} @app.get("/items/") async def read_items(commons: dict = Depends(common_parameters)): return commons @app.get("/users/") async def read_users(commons: dict = Depends(common_parameters)): return commons 本例中的依赖项预期接收如下参数: 类型为 str 的可选查询参数 q 类型为 int 的可选查询参数 skip,默认值是 0 类型为 int 的可选查询参数 limit,默认值是 100 然后,依赖项函数返回包含这些值的 dict。使用Class作为依赖:from typing import Optional from fastapi import Depends, FastAPI app = FastAPI() fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}] class CommonQueryParams: def __init__(self, q: Optional[str] = None, skip: int = 0, limit: int = 100): self.q = q self.skip = skip self.limit = limit @app.get("/items/") async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)): response = {} if commons.q: response.update({"q": commons.q}) items = fake_items_db[commons.skip : commons.skip + commons.limit] response.update({"items": items}) return response 使用嵌套子依赖:from typing import Optional from fastapi import Cookie, Depends, FastAPI app = FastAPI() def query_extractor(q: Optional[str] = None): return q def query_or_cookie_extractor( q: str = Depends(query_extractor), last_query: Optional[str] = Cookie(None) ): if not q: return last_query return q @app.get("/items/") async def read_query(query_or_default: str = Depends(query_or_cookie_extractor)): return {"q_or_cookie": query_or_default} 在路径中使用依赖:from fastapi import Depends, FastAPI, Header, HTTPException app = FastAPI() async def verify_token(x_token: str = Header(...)): if x_token != "fake-super-secret-token": raise HTTPException(status_code=400, detail="X-Token header invalid") async def verify_key(x_key: str = Header(...)): if x_key != "fake-super-secret-key": raise HTTPException(status_code=400, detail="X-Key header invalid") return x_key @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)]) async def read_items(): return [{"item": "Foo"}, {"item": "Bar"}] 全局依赖项,可以为所有路径操作应用该依赖项:from fastapi import Depends, FastAPI, Header, HTTPException async def verify_token(x_token: str = Header(...)): if x_token != "fake-super-secret-token": raise HTTPException(status_code=400, detail="X-Token header invalid") async def verify_key(x_key: str = Header(...)): if x_key != "fake-super-secret-key": raise HTTPException(status_code=400, detail="X-Key header invalid") return x_key app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)]) @app.get("/items/") async def read_items(): return [{"item": "Portal Gun"}, {"item": "Plumbus"}] @app.get("/users/") async def read_users(): return [{"username": "Rick"}, {"username": "Morty"}]
2022年02月04日
2,814 阅读
0 评论
0 点赞
2022-02-03
FastApi快速入门 02 : 使用SQLAlchemy orm框架
3.使用数据库示例3.1 使用SQLAlchemy orm框架官方代码里面使用的是sqlalchemy,异步也是使用的这个3.1.1 准备工作安装 SQLAlchemypip install sqlalchemy 创建数据库 fastapi_dbcreate database fastapi_db; 3.1.2 配置数据库连接创建文件database.pyfrom sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base # 数据库连接配置 SQLALCHEMY_DATABASE_URI = ( "mysql+pymysql://root:123456@localhost/fastapi_db?charset=utf8mb4" # 用户:密码@服务器/数据库?参数 ) # 创建数据库引擎 engine = create_engine(SQLALCHEMY_DATABASE_URI) # 创建数据库会话 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 声明基类 Base = declarative_base() 3.1.3 创建数据模型创建文件models.pyfrom sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sqlalchemy.orm import relationship from .database import Base # 定义 User 类 class User(Base): __tablename__ = 'users' # 定义表名 id = Column(Integer, primary_key=True, index=True) email = Column(String(255), unique=True, index=True) hashed_password = Column(String(255)) is_active = Column(Boolean, default=True) items = relationship("Item", back_populates="owner") # 关联 Item 表 # 定义 Item 类 class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) title = Column(String(255), index=True) description = Column(String(255), index=True) owner_id = Column(Integer, ForeignKey('users.id')) owner = relationship("User", back_populates="items") # 关联 User 表 3.1.4 创建 Pydantic 模型创建文件schemas.pyfrom typing import List from pydantic import BaseModel class ItemBase(BaseModel): title: str description: str = None class ItemCreate(ItemBase): pass class Item(ItemBase): id: int owner_id: int class Config: orm_mode = True class UserBase(BaseModel): email: str class UserCreate(UserBase): password: str class User(UserBase): id: int is_active: bool items: List[Item] = [] class Config: orm_mode = True 复制3.1.5 crud 工具创建文件crud.pyfrom sqlalchemy.orm import Session from . import models, schemas def get_user(db: Session, user_id: int): """ 根据id获取用户信息 :param db: 数据库会话 :param user_id: 用户id :return: 用户信息 """ return db.query(models.User).filter(models.User.id == user_id).first() def get_user_by_email(db: Session, email: str): """ 根据email获取用户信息 :param db: 数据库会话 :param email: 用户email :return: 用户信息 """ return db.query(models.User).filter(models.User.email == email).first() def get_users(db: Session, skip: int = 0, limit: int = 100): """ 获取特定数量的用户 :param db: 数据库会话 :param skip: 开始位置 :param limit: 限制数量 :return: 用户信息列表 """ return db.query(models.User).offset(skip).limit(limit).all() def create_user(db: Session, user: schemas.UserCreate): """ 创建用户 :param db: 数据库会话 :param user: 用户模型 :return: 根据email和password登录的用户信息 """ fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) # 添加到会话 db.commit() # 提交到数据库 db.refresh(db_user) # 刷新数据库 return db_user def get_items(db: Session, skip: int = 0, limit: int = 100): """ 获取指定数量的item :param db: 数据库会话 :param skip: 开始位置 :param limit: 限制数量 :return: item列表 """ return db.query(models.Item).offset(skip).limit(limit).all() def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int): """ 创建用户item :param db: 数据库会话 :param item: Item对象 :param user_id: 用户id :return: Item模型对象 """ db_item = models.Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item 3.1.6 入口main函数修改from typing import List from fastapi import Depends, FastAPI, HTTPException from sqlalchemy.orm import Session from . import crud, models, schemas from .database import SessionLocal, engine models.Base.metadata.create_all(bind=engine) app = FastAPI() # 依赖 def get_db(): try: db = SessionLocal() yield db finally: db.close() @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): # 根据email查找用户 db_user = crud.get_user_by_email(db, email=user.email) # 如果用户存在,提示该邮箱已经被注册 if db_user: raise HTTPException(status_code=400, detail="Email already registered") # 返回创建的user对象 return crud.create_user(db=db, user=user) @app.get("/users/", response_model=List[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): # 读取指定数量用户 users = crud.get_users(db, skip=skip, limit=limit) return users @app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): # 获取当前id的用户信息 db_user = crud.get_user(db, user_id=user_id) # 如果没有信息,提示用户不存在 if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user @app.post("/users/{user_id}/items/", response_model=schemas.Item) def create_item_for_user( user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db) ): # 创建该用户的items return crud.create_user_item(db=db, item=item, user_id=user_id) @app.get("/items/", response_model=List[schemas.Item]) def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): # 获取所有items items = crud.get_items(db, skip=skip, limit=limit) return items (pt19) D:\web_python_dev>uvicorn fastapi_mysql.main:app --reload INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) INFO: Started reloader process [6988] using watchgod INFO: Started server process [2112] INFO: Waiting for application startup. INFO: Application startup complete. mysql> use fastapi_db Database changed mysql> show tables; +----------------------+ | Tables_in_fastapi_db | +----------------------+ | items | | users | +----------------------+ 2 rows in set (0.00 sec) 3.2 其他orm框架除了sqlalchemy, 还有fastapi-async-sqlalchemy, tortoise-orm等其他框架, 网上可以搜搜
2022年02月03日
4,477 阅读
0 评论
3 点赞
2022-01-18
FastApi快速入门 01: 基础使用
FastApi Web后端开发框架 快速入门01 简介FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用基于类型提示的 Python 3.6 及更高版本。关键特性: 快速:可与 NodeJS 和 Go 比肩的极高性能(归功于 Starlette 和 Pydantic)。最快的 Python web 框架之一。 高效编码:提高功能开发速度约 200% 至 300%。 更少bug:减少约 40% 的人为(开发者)导致错误。 智能:极佳的编辑器支持。处处皆可自动补全,减少调试时间。 简单:设计的易于使用和学习,减少阅读文档时间。 简短:减少代码重复。通过不同的参数声明实现丰富功能。bug 更少。 健壮:生产可用级别的代码。以及自动生成的交互式文档。 标准化:基于 API 的相关开放标准并完全兼容:OpenAPI (以前被称为 Swagger) 和 JSON Schema。官方文档:https://fastapi.tiangolo.com 源码地址:https://github.com/tiangolo/fastapi 安装 直接使用pip命令进行安装:pip install fastapi 需要一个ASGI 服务器,生产环境可以使用 Uvicorn 或者 Hypercornpip install uvicorn 可选的其他组件# 从表单获取参数 pip install python-multipart # 资源文件管理 pip install aiofiles # jinja2模板引擎 pip install jinja2 # orm操作数据库 pip install tortoise-orm #异步orm框架 pip install sqlalchemy # 有名的ORM框架 pip install pymysql # MySQL驱动 pip install fastapi-async-sqlalchemy # 异步sqlalchemy 1.hello world 1.1 创建文件main.pyfrom fastapi import FastAPI import uvicorn app = FastAPI() @app.get("/") async def index(): return {"msg":"hello world"} if __name__ == '__main__': uvicorn.run(app,host="127.0.0.1",port=8000) 1.2 启动项目如果是用虚拟环境先进入虚拟环境activate # 进入虚拟环境 deactivate # 退出虚拟环境 运行脚本uvicorn main:app # 普通启动 uvicorn main:app --reload # 自动重载方式启动 uvicorn main:app --port 8000 --reload # 指定端口启动 1.3 接口文档fastapi会自动生成接口文档, 启动后可以查看接口文档http://127.0.0.1:8000/docs 交互式 API 文档http://127.0.0.1:8000/redoc 2.常规请求示例2.1 单个POST请求@app.post("/login") def login(): return {"msg":"login success"} 2.2 同时支持支持post, get和put请求@app.api_route("/list", methods=["GET","POST","PUT"]) def list(): return {"list":"data"} 2.3 从URL获取参数@app.get("/page/{n}") def page(n): """从URL获取参数""" return n 2.4 从请求头获取参数from fastapi import FastAPI,Header @app.get("/user") def user(id,token=Header(None)): '''从请求头获取参数''' return {"id":id,"token":token} 2.5 从请求体获取参数from fastapi import FastAPI,Header,Body @app.get("/getdata") def getdata(data=Body(None)): """从请求体获取参数""" return {"data":data} 2.6 从请求表单获取参数安装处理包pip install python-multipart from fastapi import FastAPI,Header,Body,Form @app.post("/login2") def login2(username=Form(None),passwd=Form(None)): """从请求表单获取参数""" return {"data":{"username":username,"passwd":passwd}} 2.7 定制响应头from fastapi.responses import JSONResponse @app.get("/user") def user2(): '''定制响应头''' return JSONResponse(content={"msg":"get user"}, status_code=202, headers={"a":"b"}) 2.8 返回HTML页面from fastapi.responses import JSONResponse,HTMLResponse @app.get("/html") def html(): '''返回HTML页面''' html_content = ''' <html> <body> <p> html page </p> </body> </html> ''' return HTMLResponse(content=html_content) 2.9 返回资源文件安装附加包pip install aiofiles from fastapi.responses import JSONResponse,HTMLResponse,FileResponse @app.get("/files") def files(): '''返回资源文件''' files = 'static/mage.jpg' return FileResponse(files,filename="mage.jpg") 2.10 使用模板使用jinja2模板引擎pip install jinja2 from fastapi import FastAPI,Request from fastapi.templating import Jinja2Templates template = Jinja2Templates("pages") @app.get("/app1") def app1(req:Request): """返回html页面""" return template.TemplateResponse("index.html",context={"request":req}) 模板语法略
2022年01月18日
1,150 阅读
0 评论
0 点赞
2021-12-05
Python 多线程之 Redis 分布式锁
前言在很多互联网产品应用中,有些场景需要加锁处理,例如:双11秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于 DB 实现的,Redis 为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对 Redis 的连接并不存在竞争关系。其次 Redis 提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。Python代码实现import time import redis import threading #使用连接池方式连接redis redis_pool=redis.ConnectionPool(host="127.0.0.1",port=6379) redis_conn=redis.Redis(connection_pool=redis_pool) #定义redis类 class RedisLock(): def __init__(self): self.redis_conn = redis_conn print("init the redis connection") #获取锁 def get_lock(self,name,value): while True: # set(name, value, ex=None, px=None, nx=False, xx=False) # nx - 如果设置为True,则只有name不存在时,当前set操作才执行 # ex - 过期时间(秒) result=self.redis_conn.set(name,value,nx=True,ex=3) # print(result) if(result): # 获取到result后就终止while循环 break # time.sleep(0.5) #释放锁 def release_lock(self,name,value): #获取原name key对应的value old_value = redis_conn.get(name) print("--------------------------------the key =%s ;the name=%s"%(name,old_value)) #判断原value 与 要释放的值是否相同 if(old_value == value): #相同就从redis里面释放 self.redis_conn.delete(name) print("release the lock is success") def redis_lock_test(lock,name,value): try: print("% --start to work"%name) print("% --ready get the lock and execute lock operation"%name) lock.get_lock(name,value)#这里是获取锁操作 print("% --get the lock and continue to operation"%name) except Exception as e: print("the exception is:%s"%str(e)) finally: print("% --ready release the lock"%name) lock.release_lock(name,value)#最终必须释放锁操作 print("% --release the lock is over"%name) if __name__ == '__main__': start_time=time.time() rs=RedisLock() tasks=[] for i in range(1,3): # 创建线程 t = threading.Thread(target=redis_lock_test(rs,"task-name%"%i,"lock%d"%i)) # 将创建的线程放入列表 tasks.append(t) # 启动线程 t.start() #这里没有设置守护线程且没有设置join函数的timeout参数时,主线程将会一直等待,直到子线程全部结束,主线程才结束,程序退出 [t.join() for t in tasks] print("total waster time is:",time.time()-start_time)注意:thread.setDaemon(True) 当设置守护线程 join 函数的参数 timeout=2 时,主线程将会等待多个子线程 timeout 的累加和这样的一段时间,时间一到,主线程结束,杀死未执行完的子线程,程序退出。当没有设置守护线程且 join 函数的参数 timeout=2 时,主线程将会等待多个子线程 timeout 的累加和这样的一段时间,时间一到主线程结束,但是并没有杀死子线程,子线程依然可以继续执行,直到子线程全部结束,程序退出。参考 https://blog.csdn.net/weixin_41754309/article/details/121419465
2021年12月05日
560 阅读
0 评论
0 点赞
2021-06-21
Django项目使用Elasticsearch全文搜索引擎实现搜索优化
Django项目使用Elasticsearch全文搜索引擎实现搜索优化 简介由于项目需要用到模糊搜索, Mysql模糊查询正常情况下在数据量小的时候,速度还是可以的,数据表数据量大的时候就会查询时效率低下, 对此, 使用全文搜索引擎就是一个很好的解决方法.Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。在 Django 项目中使用Elasticsearch有很多现成的库, 比如elasticsearch,django-elasticsearch-dsl, django-haystack , drf_haystack 等。Haystack 是在 Django 中对接搜索引擎的框架,搭建了用户和搜索引擎之间的沟通桥梁。 我们在 Django 中可以通过使用 Haystack 来调用 Elasticsearch 搜索引擎。 Haystack 可以在不修改代码的情况下使用不同的搜索后端(比如 Elasticsearch、Whoosh、Solr等等)。 需要注意的是Haystack只支持es6以下的版本准备 下载elasticsearch 2.4.6版本, 建议使用docker方式安装, 注意版本号下载kibana 4.6.4版本,用于管理下载对应版本的中文分词插件elasticsearch-analysis-ik-1.10.6.zip, 在es安装好下载的插件下载地址https://www.elastic.co/cn/elasticsearch/https://www.elastic.co/cn/downloads/kibanahttps://github.com/medcl/elasticsearch-analysis-ik/releases?page=13 docker镜像市场: http://hub.daocloud.io/Docker安装在服务器上新建配置文件docker-compose.ymlversion: "2" services: elasticsearch: image: daocloud.io/library/elasticsearch:2.6.4 restart: always container_name: elasticserach ports: - "9200:9200" kibana: image: daocloud.io/library/kibana:4.6.4 restart: always container_name: kibana ports: - "5601:5601" environment: - elasticsearch_url=http://127.0.0.1:9200 depends_on: - elasticsearch 输入命令安装docker-compose up -d 安装分词插件docker ps # 查看当前运行进程 docker exec -it es镜像id bash cd bin ./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v2.6.4/elasticsearch-analysis-ik-1.10.6.zip 重启esdocker restart es镜像id 安装好Python需要的库pip install django-haystack pip install elasticsearch==2.4.1 # 注意版本2.4.1 pip install drf_haystack drf_haystack 文档https://drf-haystack.readthedocs.io/en/latest/index.html 示例1.配置再设置里配置添加appINSTALLED_APPS = [ 'haystack', ... ] 配置链接HAYSTACK_CONNECTIONS = { 'default': { 'ENGINE': 'haystack.backends.elasticsearch_backend.ElasticsearchSearchEngine', 'URL': 'http://127.0.0.1:9200/', # 此处为elasticsearch运行的服务器ip地址和端口 'INDEX_NAME': 'gameprop', # 指定elasticserach建立的索引库名称 }, } # 搜索结果每页显示数量 HAYSTACK_SEARCH_RESULTS_PER_PAGE = 5 # 实时更新index HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor' 2.创建haystack数据模型在app名/模块名目录下创建search_indexes.py文件,注意文件名必须使用search_indexes.py,代码如下:from haystack import indexes from Headlines.models import Article class ArticleIndex(indexes.SearchIndex, indexes.Indexable): """文章索引模型类""" text = indexes.CharField(document=True, use_template=True) id = indexes.IntegerField(model_attr='id') createtime = indexes.DateTimeField(model_attr='createtime') content = indexes.CharField(model_attr='content') title = indexes.CharField(model_attr='title') def get_model(self): """返回建立索引的模型类""" return Article def index_queryset(self, using=None): """返回要建立索引的数据查询集""" return self.get_model().objects.all() 3.创建 text 字段索引值模板文件创建 text 字段索引值模板文件templates/search/indexes/APP名/模块名_text.txt{{ object.id }} {{ object.title }} {{ object.channel }} {{ object.labels }} {{ object.content }} 4.手动生成初始索引在命令行中添加如下命令, 来手动生成索引表:python manage.py rebuild_index 5.编辑视图文件views.pyfrom drf_haystack.viewsets import HaystackViewSet # 搜索文章 class SearchArticleViewSet(HaystackViewSet): # GET /Headlines/search/?text=<搜索关键字> # 这里可以写多个模型,相应的:serializer里也可以写多个index_classes index_models = [Article] serializer_class = SearchArticleIndexSerializer 6.编辑序列化器serializers.pyfrom drf_haystack import serializers as HSER # 搜索文章 class SearchArticleIndexSerializer(HSER.HaystackSerializer): class Meta: index_classes = [ArticleIndex] # 索引类的名称,可以有多个 fields = ('text', 'content', 'id', 'title', 'createtime') 7.编辑路由urls.py# 搜索文章路由 from rest_framework.routers import DefaultRouter from Headlines.views import SearchArticleViewSet router = DefaultRouter() router.register(r'Headlines/search', SearchArticleViewSet, base_name='search') urlpatterns += router.urls
2021年06月21日
2,007 阅读
2 评论
5 点赞
2021-02-05
为什么MySQL单表不不建议超过2000W?
经常听说单表不能存太多数据, 如果单表数据太多,性能就会下降得比较厉害本就探索精神, 去搜索了下由来.对于MySQL数据库单表最大行数的说法,常见的限制是约为2千万行(或2000万行),但这个数字并不是MySQL官方的硬性限制。实际上,MySQL的表的最大行数和大小受到多个因素的影响,包括硬件资源、操作系统限制、配置设置以及数据类型和索引的使用等。这个2kw是个建议值,我们要来看下这个2kw是怎么来的单表行数限制单表行数理论最大值是多少。我们设计表的时候, 往往需要设置一个主键, 常见的id就是主键。如果主键声明为int大小,也就是32位,那么能支持2^32-1,也就是21个亿左右。如果是bigint,那就是2^64-1,但这个数字太大,一般还没到这个限制之前,磁盘先受不了。如果我把主键声明为 tinyint,一个字节,8位,最大2^8-1,也就是255。如果我想插入一个id=256的数据,那就会报错。也就是说,tinyint主键限制表内最多255条数据。结论: 主键本身唯一,也就是说主键的大小可以限制表的上限。表的索引索引内部是用的B+树,最大行数2kw的由来是基于B+树索引结构和页的大小进行估算的。下面是重新整理的说明:假设使用B+树作为索引结构,并且每个页的大小为15KB。在B+树中,叶子节点存储实际的行数据,每条行数据大小假设为1KB。一个页可以容纳的行数为y=15。我们可以使用公式 (x ^ (z-1)) * y 来计算行总数,其中x为每个非叶子节点的分支因子(即每个非叶子节点的子节点数),z为B+树的层级数。假设B+树是两层,即z=2,我们需要确定分支因子x的值。假设每个页的大小是15KB,由于叶子节点和非叶子节点的数据结构相同,我们假设剩下的15KB可以用于非叶子节点。因此,x可以计算为15KB除以每个非叶子节点大小的估计值。假设每个非叶子节点大小与叶子节点大小相同,即1KB。则x=15KB/1KB=15。已知x=15,y=15,z=2,代入公式 (x ^ (z-1)) y,可以计算出行总数为 (15 ^ (2-1)) 15 = 2万行。因此,这个2万行是指在B+树索引结构下,每个表的建议最大行数。在两层B+树的结构中,通过合理的分支因子和页大小设置,建议的最大行数为2万行。超过这个行数,可能会影响查询性能和磁盘IO次数。参考资料https://juejin.cn/post/7116381265300815903
2021年02月05日
380 阅读
0 评论
0 点赞
2021-01-03
分布式唯一ID生成规则SnowFlake雪花算法Python实现
分布式唯一ID生成规则SnowFlake雪花算法Python实现 简介在复杂分布式系统中,往往需要对大量的数据和消息进行唯一标识。在好多系统随着数据的日渐增长,对数据分库分表后需要有一个唯一ID来标识一条数据或消息,数据库的自增ID显然不能满足需求。此时一个能够生成全局唯一ID的系统是非常必要的。雪花算法和美团Leaf都是用于分布式系统中生成唯一ID的工具 雪花算法: SnowFlake是Twitter公司采用的一种算法,目的是在分布式系统中产生全局唯一且趋势递增的ID。美团Leaf: Leaf是美团基础研发平台推出的一个分布式ID生成服务 当单表数据量急剧上升后,表的查询性能会逐渐下降,会涉及到分库分表操作,如何确保数据均匀分布,可以通过全局唯一的ID。全局唯一的ID生成规则要求: 全局唯一性 有序递增 高可用 时间上的特性(例如互联网订单,从订单号可以看出具体时间的信息) 雪花算法Python实现 1.第一位 占用1bit,其值始终是0,没有实际作用。2.时间戳 占用41bit,精确到毫秒,总共可以容纳约69年的时间。3.工作机器id 占用10bit,其中高位5bit是数据中心ID,低位5bit是工作节点ID,做多可以容纳1024个节点。4.序列号 占用12bit,每个节点每毫秒0开始不断累加,最多可以累加到4095,一共可以产生4096个ID。SnowFlake算法在同一毫秒内最多可以生成多少个全局唯一ID呢:: 同一毫秒的ID数量 = 1024 X 4096 = 4194304 import time class InvalidSystemClock(Exception): """ 时钟回拨异常 """ pass # 64位ID的划分 WORKER_ID_BITS = 5 DATACENTER_ID_BITS = 5 SEQUENCE_BITS = 12 # 最大取值计算 MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5-1 0b11111 MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS) # 移位偏移计算 WOKER_ID_SHIFT = SEQUENCE_BITS DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS # 序号循环掩码 SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS) # 开始时间截 (2015-01-01) TWEPOCH = 1420041600000 class IdWorker(object): """ 用于生成IDs """ def __init__(self, datacenter_id, worker_id, sequence=0): """ 初始化 :param datacenter_id: 数据中心(机器区域)ID :param worker_id: 机器ID :param sequence: 其实序号 """ # sanity check if worker_id > MAX_WORKER_ID or worker_id < 0: raise ValueError('worker_id值越界') if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0: raise ValueError('datacenter_id值越界') self.worker_id = worker_id self.datacenter_id = datacenter_id self.sequence = sequence self.last_timestamp = -1 # 上次计算的时间戳 def _gen_timestamp(self): """ 生成整数时间戳 :return:int timestamp """ return int(time.time() * 1000) def get_id(self): """ 获取新ID :return: """ timestamp = self._gen_timestamp() # 时钟回拨 if timestamp < self.last_timestamp: raise InvalidSystemClock if timestamp == self.last_timestamp: self.sequence = (self.sequence + 1) & SEQUENCE_MASK if self.sequence == 0: timestamp = self._til_next_millis(self.last_timestamp) else: self.sequence = 0 self.last_timestamp = timestamp new_id = ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \ (self.worker_id << WOKER_ID_SHIFT) | self.sequence return new_id def _til_next_millis(self, last_timestamp): """ 等到下一毫秒 """ timestamp = self._gen_timestamp() while timestamp <= last_timestamp: timestamp = self._gen_timestamp() return timestamp if __name__ == '__main__': worker = IdWorker(0, 0) print(worker.get_id())
2021年01月03日
1,152 阅读
0 评论
0 点赞
1
2
...
6