Cryptocurrency analysis with FastStream

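In this tutorial, we will walk you through the process of using the faststream-gen Python library to retrieve cryptocurrency prices in real time and calculate their moving average. To accomplish that, we will generate the following two FastStream applications: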

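  1. A microservice that retrieves current cryptocurrency prices from an external web service and publishes the retrieved data to a Kafka topic.
  2. A microservice that consumes such messages, calculates the moving average price for each cryptocurrency and publishes it to another Kafka topic.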

Let's get started!

Installation

To complete this tutorial, you will need the following:

  1. Python (version 3.8 or later)
  2. A valid OPENAI API key
  3. [Optional] A GitHub account and an installed git command

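We recommend using a virtual environment for your Python projects. Virtual environments are a common and effective Python development technique: by creating an isolated Python environment for each project, they keep the dependencies required by different projects separate.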

In this tutorial, we will use Python's venv module to create a virtual environment.

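First, create a root directory for this tutorial. Navigate to the desired location, create a new directory called faststream_gen_tutorial and enter it:

```shell
mkdir faststream_gen_tutorial
cd faststream_gen_tutorial
```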

Creating and activating a new Python virtual environment

Create a new virtual environment using venv:

```shell
python3 -m venv venv
```

Next, activate the new virtual environment:

```shell
source venv/bin/activate
```
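If you prefer to script this step, the standard-library venv module can create the same environment from Python. The sketch below is a minimal equivalent of python3 -m venv venv, and create_virtualenv is a hypothetical helper name. It cannot activate the environment for you, because activation changes your shell's own environment, so source venv/bin/activate still has to be run in the terminal.

```python
import venv
from pathlib import Path


def create_virtualenv(env_dir: str = "venv", with_pip: bool = True) -> Path:
    """Create a virtual environment, roughly what `python3 -m venv venv` does."""
    path = Path(env_dir)
    # with_pip=True mirrors the CLI default of bootstrapping pip into the env;
    # the default clear=False leaves an existing directory's contents in place
    venv.create(path, with_pip=with_pip)
    return path
```

Calling create_virtualenv() from the faststream_gen_tutorial directory should leave a venv/ directory equivalent to the one created by the shell command.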

Installing the packages

Upgrade pip if needed and install the faststream-gen package:

```shell
pip install --upgrade pip && pip install faststream-gen
```

Check that the installation was successful by running the following command:

```shell
faststream_gen --help
```

You should see the full list of the command's options in the output.

You have now successfully set up the environment and installed the faststream-gen package.

Setting up the OpenAI API key

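faststream-gen uses the OpenAI API, so you need to export your API key in the environment variable OPENAI_API_KEY. If you use bash or a compatible shell, you can do that with the following command:

```shell
export OPENAI_API_KEY="sk-your-openai-api-key"
```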

If you don't have an OPENAI_API_KEY yet, you can create one in your OpenAI account.
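Before generating anything, a quick preflight check can catch the two most common setup problems: the faststream_gen command not being on PATH (for example, because the virtual environment is not active) and OPENAI_API_KEY not being exported. The helper below is a hypothetical convenience sketch, not part of faststream-gen. It only checks that the variable is non-empty, not that the key is valid.

```python
import os
import shutil
from typing import List


def preflight_check() -> List[str]:
    """Return human-readable problems that would stop faststream_gen from running."""
    problems = []
    # `pip install faststream-gen` puts the faststream_gen command into the active environment
    if shutil.which("faststream_gen") is None:
        problems.append("faststream_gen not found on PATH (is the virtual environment active?)")
    # faststream-gen reads the OpenAI key from this environment variable
    if not os.environ.get("OPENAI_API_KEY"):
        problems.append("OPENAI_API_KEY is not set or is empty")
    return problems
```

An empty list means both checks passed; the key itself is only validated when faststream_gen actually calls the OpenAI API.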

Generating FastStream applications

Retrieving and publishing crypto prices

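Now we will create an application that retrieves information about current cryptocurrency prices from an external web service and publishes messages to a Kafka topic. To do this, we need to provide a high-level description of the application in plain English. The description should contain only the information a knowledgeable Python developer familiar with the FastStream framework would need to implement it, such as the message schema, instructions for the external APIs and web services, and guidance on choosing the appropriate topics and partition keys.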

Below is the description used in this particular case. Note that we did not specify the steps needed to implement it: we specified what the service should do, but not how to do it.

```text
Create a FastStream application which will retrieve the current cryptocurrency price and publish it to new_crypto_price topic.

The application should retrieve the data every 2 seconds.

A message which will be produced is JSON with the two attributes:
- price: non-negative float (current price of cryptocurrency in USD)
- crypto_currency: string (the cryptocurrency e.g. BTC, ETH...)

The current price of Bitcoin can be retrieved by a simple GET request to:
- https://api.coinbase.com/v2/prices/BTC-USD/spot

The current price of Ethereum can be retrieved by a simple GET request to:
- https://api.coinbase.com/v2/prices/ETH-USD/spot

The response of this GET request is a JSON and you can get information about the crypto_currency in:
response['data']['base']

and the information about the price in:
response['data']['amount']

Use utf-8 encoded crypto_currency attribute as a partition key when publishing the message to new_crypto_price topic.
```
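To make the described message format concrete, here is a stdlib-only sketch of the transformation the description asks for: read response['data']['base'] and response['data']['amount'], build a message with price and crypto_currency, and derive the utf-8 encoded partition key. The dataclass is a stand-in for the pydantic model a generated application would use, to_message is a hypothetical name, and the sample payload is illustrative. The sketch assumes amount may arrive as a numeric string, which is why it is converted with float().

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Tuple


@dataclass
class CryptoPrice:
    price: float          # current price in USD, must be non-negative
    crypto_currency: str  # e.g. "BTC", "ETH"

    def __post_init__(self) -> None:
        if self.price < 0:
            raise ValueError("price must be non-negative")


def to_message(response: Dict[str, Any]) -> Tuple[bytes, bytes]:
    """Turn a spot-price response into (partition key, JSON message body)."""
    data = response["data"]
    msg = CryptoPrice(price=float(data["amount"]), crypto_currency=data["base"])
    key = msg.crypto_currency.encode("utf-8")  # utf-8 encoded partition key
    body = json.dumps(asdict(msg)).encode("utf-8")
    return key, body
```

With a payload such as {"data": {"base": "BTC", "currency": "USD", "amount": "27414.085"}}, to_message returns the key b"BTC" and a JSON body containing price and crypto_currency.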

Let's generate a new FastStream project in the retrieve-publish-crypto directory. First, copy the previous description and paste it into a file called description_retrieve_publish.txt.

Next, run the following command (the -i parameter specifies the path to the application description file, and the -o parameter specifies the directory where the generated project files are saved):

```shell
faststream_gen -i description_retrieve_publish.txt -o retrieve-publish-crypto
```

```
✨ Generating a new FastStream application!
✔ Application description validated.
✔ FastStream app skeleton code generated.
✔ The app and the tests are generated.
✔ New FastStream project created.
✔ Integration tests were successfully completed.
Tokens used: 36938
Total Cost (USD): $0.11436
✨ All files were successfully generated!
```

Failed generation

The generation process is not bulletproof, and it can fail with a message like this:

```
✨ Generating a new FastStream application!
✔ Application description validated.
✔ New FastStream project created.
✔ FastStream app skeleton code generated.
✘ Error: Failed to generate a valid application and test code.
✘ Error: Integration tests failed.
Tokens used: 79384
Total Cost (USD): $0.24567

Apologies, we couldn't generate a working application and test code from your application description.

Please run the following command to start manual debugging:

cd retrieve_without_logs && pytest

For in-depth debugging, check the retrieve-publish-crypto/_faststream_gen_logs directory for complete logs, including individual step information.
```

This can happen for many reasons; the most common are:

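  • The specification you provided is not detailed enough to generate the application. In that case, try adding more detailed instructions and run the command again.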
  • The task is too difficult for the default GPT-3.5 model to handle. You can try using GPT-4 instead:

```shell
faststream_gen --model gpt-4 -i description_retrieve_publish.txt -o retrieve-publish-crypto
```

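  • You were simply unlucky and just need to run the command again. Large language models are stochastic by nature and can give different answers to the same question. The engine has retry mechanisms, but sometimes they are not enough, and simply rerunning the command solves the problem.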
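Because rerunning is often all it takes, you may want to automate it. The sketch below retries a command a fixed number of times, and run_with_retries is a hypothetical helper. It assumes that faststream_gen exits with a non-zero status when generation fails and that rerunning into the same output directory is acceptable; check both against your version of the tool. Keep in mind that every attempt spends OpenAI tokens.

```python
import subprocess
import sys
from typing import List


def run_with_retries(cmd: List[str], max_attempts: int = 3) -> bool:
    """Re-run `cmd` until it exits with status 0 or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        result = subprocess.run(cmd)  # output goes straight to the terminal
        if result.returncode == 0:
            return True
        print(
            f"Attempt {attempt}/{max_attempts} failed (exit code {result.returncode})",
            file=sys.stderr,
        )
    return False
```

For this tutorial you would call run_with_retries(["faststream_gen", "-i", "description_retrieve_publish.txt", "-o", "retrieve-publish-crypto"]), optionally adding "--model", "gpt-4" to the list.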

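If none of the above strategies work, check the files that were already generated and look for the underlying problem. You can also finish the implementation or the tests yourself.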

Successful generation

If successful, the command generates a FastStream project with the following structure:

```
retrieve-publish-crypto
├── .github
│   └── workflows
│       ├── build_docker.yml
│       ├── deploy_docs.yml
│       └── test.yml
├── .gitignore
├── Dockerfile
├── LICENSE
├── README.md
├── app
│   └── application.py
├── pyproject.toml
├── scripts
│   ├── build_docker.sh
│   ├── lint.sh
│   ├── services.yml
│   ├── start_kafka_broker_locally.sh
│   ├── static-analysis.sh
│   ├── stop_kafka_broker_locally.sh
│   └── subscribe_to_kafka_broker_locally.sh
└── tests
    ├── __init__.py
    └── test_application.py
```

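The generated application is located in the app/ directory, and the tests are in the tests/ directory. Keep in mind that these files are generated by an LLM and may vary with each generation.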

app/application.py:

```python
import asyncio
import json
from datetime import datetime

import aiohttp
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class CryptoPrice(BaseModel):
    price: NonNegativeFloat = Field(
        ..., examples=[50000.0], description="Current price of cryptocurrency in USD"
    )
    crypto_currency: str = Field(
        ..., examples=["BTC"], description="The cryptocurrency"
    )


publisher = broker.publisher("new_crypto_price")


async def fetch_crypto_price(
    url: str, crypto_currency: str, logger: Logger, context: ContextRepo, time_interval: int = 2
) -> None:
    # Always use context: ContextRepo for storing app_is_running variable
    while context.get("app_is_running"):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    price = data["data"]["amount"]
                    new_crypto_price = CryptoPrice(
                        price=price, crypto_currency=crypto_currency
                    )
                    await publisher.publish(
                        new_crypto_price,
                        key=crypto_currency.encode("utf-8"),
                    )
                else:
                    logger.warning(
                        f"Failed API request {url} at time {datetime.now()}"
                    )

        await asyncio.sleep(time_interval)


@app.on_startup
async def app_setup(context: ContextRepo):
    context.set_global("app_is_running", True)


@app.on_shutdown
async def shutdown(context: ContextRepo):
    context.set_global("app_is_running", False)

    # Get all the running tasks and wait them to finish
    fetch_tasks = context.get("fetch_tasks")
    await asyncio.gather(*fetch_tasks)


@app.after_startup
async def publish_crypto_price(logger: Logger, context: ContextRepo):
    logger.info("Starting publishing:")

    cryptocurrencies = [("Bitcoin", "BTC"), ("Ethereum", "ETH")]
    fetch_tasks = [
        asyncio.create_task(
            fetch_crypto_price(
                f"https://api.coinbase.com/v2/prices/{crypto_currency}-USD/spot",
                crypto_currency,
                logger,
                context,
            )
        )
        for _, crypto_currency in cryptocurrencies
    ]
    # you need to save asyncio tasks so you can wait them to finish at app shutdown (the function with @app.on_shutdown function)
    context.set_global("fetch_tasks", fetch_tasks)
```

tests/test_application.py:

```python
import pytest

from faststream import Context, TestApp
from faststream.kafka import TestKafkaBroker

from app.application import CryptoPrice, app, broker


@broker.subscriber("new_crypto_price")
async def on_new_crypto_price(
    msg: CryptoPrice, key: bytes = Context("message.raw_message.key")
):
    pass


@pytest.mark.asyncio
async def test_fetch_crypto_price():
    async with TestKafkaBroker(broker):
        async with TestApp(app):
            await on_new_crypto_price.wait_call(2)
            on_new_crypto_price.mock.assert_called()
```
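The generated application combines three ideas: one polling task per cryptocurrency, a shared "keep running" flag, and a shutdown hook that waits for the tasks to finish. The stdlib sketch below reproduces that pattern without Kafka or HTTP so it can run in isolation. It deliberately swaps the app_is_running context variable and asyncio.sleep for an asyncio.Event, so a shutdown request interrupts the wait instead of waiting out the full interval. The names poll, demo, fake_fetch and fake_publish are hypothetical; the last two stand in for the Coinbase request and publisher.publish.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def poll(
    crypto_currency: str,
    fetch: Callable[[str], Awaitable[float]],
    publish: Callable[[str, float], Awaitable[None]],
    stop: asyncio.Event,
    interval: float = 2.0,
) -> None:
    """Fetch and publish one price per interval until `stop` is set."""
    while not stop.is_set():
        price = await fetch(crypto_currency)
        await publish(crypto_currency, price)
        try:
            # Sleep for `interval`, but wake immediately if shutdown is requested
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # normal case: the interval elapsed, so poll again


async def demo(run_for: float = 0.12, interval: float = 0.05) -> List[Tuple[str, float]]:
    published: List[Tuple[str, float]] = []

    async def fake_fetch(crypto_currency: str) -> float:
        # Hypothetical stand-in for the GET request to the spot-price endpoint
        return {"BTC": 27000.0, "ETH": 1600.0}[crypto_currency]

    async def fake_publish(crypto_currency: str, price: float) -> None:
        # Hypothetical stand-in for publisher.publish(..., key=...)
        published.append((crypto_currency, price))

    stop = asyncio.Event()  # plays the role of the app_is_running flag
    # One task per cryptocurrency, like fetch_tasks in the generated app
    tasks = [
        asyncio.create_task(poll(c, fake_fetch, fake_publish, stop, interval))
        for c in ("BTC", "ETH")
    ]
    await asyncio.sleep(run_for)
    stop.set()                    # shutdown hook: flip the flag...
    await asyncio.gather(*tasks)  # ...and wait for the tasks to finish
    return published
```

Running asyncio.run(demo()) returns the list of (cryptocurrency, price) pairs published before shutdown; the call returns promptly after stop.set() because no task is left sleeping.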

Creating a new Python virtual environment

All the dependencies required to run the newly generated FastStream project are listed in the pyproject.toml file.

Create and activate a new virtual environment inside the retrieve-publish-crypto directory using venv:

```shell
cd retrieve-publish-crypto
python3 -m venv venv
source venv/bin/activate
```

Upgrade pip if needed and install the development dependencies:

```shell
pip install --upgrade pip && pip install -e .[dev]
```

Testing the application

To verify that the application works correctly, run the generated unit and integration tests with the pytest command:

```shell
pytest
```

```
============================= test session starts ==============================
platform linux -- Python 3.10.8, pytest-7.4.2, pluggy-1.3.0
rootdir: /workspaces/faststream-gen/docs_src/tutorial/retrieve-publish-crypto
configfile: pyproject.toml
plugins: anyio-3.7.1, asyncio-0.21.1
asyncio: mode=strict
collected 1 item

tests/test_application.py . [100%]

============================== 1 passed in 2.98s ===============================
```

Previewing the AsyncAPI docs

To preview the AsyncAPI docs for the application, execute the following command:

```shell
faststream docs serve app.application:app
```

```
INFO: Started server process [3575270]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
```

You can now access the AsyncAPI docs by opening localhost:8000 in your browser.

Starting a local Kafka broker

To run the FastStream application locally, make sure you have a running Kafka broker. You can start a Kafka Docker container by executing the start_kafka_broker_locally.sh shell script:

```shell
./scripts/start_kafka_broker_locally.sh
```

```
[+] Running 2/2
⠿ Network scripts_default Created 0.1s
⠿ Container bitnami_kafka Started
```
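If you are unsure whether the broker is up, a plain TCP connection attempt to localhost:9092 (the address passed to KafkaBroker in the generated code) is a quick check. This sketch only proves that something is listening on that port, not that it is a healthy Kafka broker; broker_reachable is a hypothetical helper.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

After running start_kafka_broker_locally.sh, broker_reachable() should return True; after stop_kafka_broker_locally.sh, it should return False.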

Starting the application

To start the application, execute the following command:

```shell
faststream run app.application:app
```

```
2023-09-15 13:41:21,948 INFO - FastStream app starting...
2023-09-15 13:41:22,144 INFO - | - Starting publishing:
2023-09-15 13:41:22,144 INFO - FastStream app started successfully! To exit press CTRL+C
Topic new_crypto_price not found in cluster metadata
```

Make sure the application keeps running; it is needed for the following steps.

Calculating the moving average

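Let's develop an application that, for each cryptocurrency, calculates the average price of the last three messages received from the new_crypto_price topic. We will then publish the calculated average price to the price_mean topic.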

Here is the full description of the desired application:

```text
Create a FastStream application for consuming messages from the new_crypto_price topic.
This topic needs to use a partition key.

new_crypto_price messages use JSON with two attributes (create class CryptoPrice with these attributes):
- price: non-negative float (it represents the current price of the crypto)
- crypto_currency: string (it represents the cryptocurrency e.g. BTC, ETH...)

The application should save each message to a dictionary (global variable) - partition key should be used as a dictionary key and value should be a List of prices.
Keep only the last 100 messages in the dictionary.
If there are fewer than 3 messages for a given partition key, do not publish any messages.
Otherwise, Calculate the price mean of the last 3 messages for the given partition key.
Publish the price mean to the price_mean topic and use the same partition key that the new_crypto_price topic is using.
```
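The core of this description is a small piece of per-key bookkeeping. The sketch below implements it in plain Python: prices are grouped by the decoded partition key, at most 100 are kept per key, and the mean of the last 3 is returned once at least 3 prices are available (None means "publish nothing"). It uses collections.deque(maxlen=100) to enforce the history cap, which is one possible choice rather than what faststream-gen will necessarily produce, and it returns the mean instead of publishing it. The names on_price and history are hypothetical.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

HISTORY_LIMIT = 100  # keep only the last 100 prices per partition key
WINDOW = 3           # average over the last 3 prices

# Partition key (decoded) -> most recent prices, oldest first
history: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=HISTORY_LIMIT))


def on_price(key: bytes, price: float) -> Optional[float]:
    """Record a price for a partition key; return the mean of the last 3, or None."""
    prices = history[key.decode("utf-8")]
    prices.append(price)  # a full deque(maxlen=100) drops its oldest entry
    if len(prices) < WINDOW:
        return None       # fewer than 3 messages for this key: publish nothing
    last = list(prices)[-WINDOW:]
    return sum(last) / WINDOW
```

Feeding 50000, 60000 and 70000 for the key b"BTC" yields None, None and then 60000.0, the same values used in the generated test later in this tutorial.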

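Open a new terminal and navigate to the root directory of this tutorial, faststream_gen_tutorial. Once inside the faststream_gen_tutorial folder, activate the virtual environment:

```shell
cd path_to/faststream_gen_tutorial
source venv/bin/activate
```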

To create a FastStream application inside the calculate-mean-app directory, first copy the previous description and paste it into the description_calculate_mean.txt file.

Next, run the following command:

```shell
faststream_gen -i description_calculate_mean.txt -o calculate-mean-app
```

```
✨ Generating a new FastStream application!
✔ Application description validated.
✔ FastStream app skeleton code generated.
✔ The app and the tests are generated.
✔ New FastStream project created.
✔ Integration tests were successfully completed.
Tokens used: 13367
Total Cost (USD): $0.04147
✨ All files were successfully generated!
```

If successful, the command generates the calculate-mean-app directory containing app/application.py and tests/test_application.py. If not, simply rerun the command until it succeeds.

app/application.py:

```python
from typing import Dict, List

from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import Context, ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker


class CryptoPrice(BaseModel):
    price: NonNegativeFloat = Field(
        ..., examples=[50000], description="Current price of the cryptocurrency"
    )
    crypto_currency: str = Field(
        ..., examples=["BTC"], description="Cryptocurrency symbol"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("price_mean")


@app.on_startup
async def app_setup(context: ContextRepo):
    message_history: Dict[str, List[float]] = {}
    context.set_global("message_history", message_history)


@broker.subscriber("new_crypto_price")
async def on_new_crypto_price(
    msg: CryptoPrice,
    logger: Logger,
    message_history: Dict[str, List[float]] = Context(),
    key: bytes = Context("message.raw_message.key"),
) -> None:
    logger.info(f"New crypto price {msg=}")

    crypto_key = key.decode("utf-8")
    if crypto_key not in message_history:
        message_history[crypto_key] = []

    message_history[crypto_key].append(msg.price)

    if len(message_history[crypto_key]) > 100:
        message_history[crypto_key] = message_history[crypto_key][-100:]

    if len(message_history[crypto_key]) >= 3:
        price_mean = sum(message_history[crypto_key][-3:]) / 3
        await publisher.publish(price_mean, key=key)
```

tests/test_application.py:

```python
import pytest

from faststream import Context, TestApp
from faststream.kafka import TestKafkaBroker

from app.application import CryptoPrice, app, broker, on_new_crypto_price


@broker.subscriber("price_mean")
async def on_price_mean(msg: float, key: bytes = Context("message.raw_message.key")):
    pass


@pytest.mark.asyncio
async def test_app():
    async with TestKafkaBroker(broker):
        async with TestApp(app):
            await broker.publish(
                CryptoPrice(price=50000, crypto_currency="BTC"),
                "new_crypto_price",
                key=b"BTC",
            )
            on_new_crypto_price.mock.assert_called_with(
                dict(CryptoPrice(price=50000, crypto_currency="BTC"))
            )
            on_price_mean.mock.assert_not_called()

            await broker.publish(
                CryptoPrice(price=60000, crypto_currency="BTC"),
                "new_crypto_price",
                key=b"BTC",
            )
            on_new_crypto_price.mock.assert_called_with(
                dict(CryptoPrice(price=60000, crypto_currency="BTC"))
            )
            on_price_mean.mock.assert_not_called()

            await broker.publish(
                CryptoPrice(price=70000, crypto_currency="BTC"),
                "new_crypto_price",
                key=b"BTC",
            )
            on_new_crypto_price.mock.assert_called_with(
                dict(CryptoPrice(price=70000, crypto_currency="BTC"))
            )
            on_price_mean.mock.assert_called_with(60000.0)
```

Creating a new Python virtual environment

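All the dependencies required to run the newly generated FastStream project are listed in the pyproject.toml file. Create a new virtual environment and install the project's development dependencies.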

Create and activate a new virtual environment inside the calculate-mean-app directory using venv:

```shell
cd calculate-mean-app
python3 -m venv venv
source venv/bin/activate
```

Upgrade pip if needed and install the development dependencies:

```shell
pip install --upgrade pip && pip install -e .[dev]
```

Testing the application

To verify that the application works correctly, run the generated unit and integration tests with the pytest command:

```shell
pytest
```

```
============================= test session starts ==============================
platform linux -- Python 3.10.8, pytest-7.4.2, pluggy-1.3.0
rootdir: /workspaces/faststream-gen/docs_src/tutorial/calculate-mean-app
configfile: pyproject.toml
plugins: anyio-3.7.1, asyncio-0.21.1
asyncio: mode=strict
collected 1 item

tests/test_application.py . [100%]

============================== 1 passed in 0.64s ===============================
```

Previewing the AsyncAPI docs

To preview the AsyncAPI docs for the application, execute the following command:

```shell
faststream docs serve app.application:app
```

```
INFO: Started server process [3596205]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
```

You can now access the AsyncAPI docs by opening localhost:8000 in your browser.

Starting the application

To start the application, execute the following command:

```shell
faststream run app.application:app
```

```
2023-10-04 09:31:18,926 INFO - FastStream app starting...
2023-10-04 09:31:18,948 INFO - new_crypto_price | - `OnNewCryptoPrice` waiting for messages
Topic new_crypto_price not found in cluster metadata
2023-10-04 09:31:19,069 INFO - FastStream app started successfully! To exit, press CTRL+C
2023-10-04 09:31:40,876 INFO - new_crypto_price | 0-16964047 - Received
2023-10-04 09:31:40,878 INFO - new_crypto_price | 0-16964047 - New crypto price msg=CryptoPrice(price=27414.085, crypto_currency='BTC')
2023-10-04 09:31:40,878 INFO - new_crypto_price | 0-16964047 - Processed
2023-10-04 09:31:40,878 INFO - new_crypto_price | 1-16964047 - Received
2023-10-04 09:31:40,879 INFO - new_crypto_price | 1-16964047 - New crypto price msg=CryptoPrice(price=1642.425, crypto_currency='ETH')
2023-10-04 09:31:40,879 INFO - new_crypto_price | 1-16964047 - Processed
2023-10-04 09:31:43,053 INFO - new_crypto_price | 2-16964047 - Received
2023-10-04 09:31:43,054 INFO - new_crypto_price | 2-16964047 - New crypto price msg=CryptoPrice(price=27414.085, crypto_currency='BTC')
2023-10-04 09:31:43,054 INFO - new_crypto_price | 2-16964047 - Processed
2023-10-04 09:31:43,054 INFO - new_crypto_price | 3-16964047 - Received
2023-10-04 09:31:43,055 INFO - new_crypto_price | 3-16964047 - New crypto price msg=CryptoPrice(price=1642.425, crypto_currency='ETH')
...
```

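In the terminal, you can see that the application is reading messages from the new_crypto_price topic. Keep the application running, as it is needed for the next step.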

Subscribing directly to local Kafka broker topics

Open a new terminal and navigate to the calculate-mean-app directory.

To check whether calculate-mean-app is publishing messages to the price_mean topic, run the following command:

```shell
./scripts/subscribe_to_kafka_broker_locally.sh price_mean
```

```
BTC 26405.745
ETH 1621.3733333333332
BTC 26404.865
ETH 1621.375
BTC 26404.865
...
```

Stopping the Kafka broker

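When you have finished analyzing the mean cryptocurrency prices, stop the Kafka broker with the following command:

```shell
./scripts/stop_kafka_broker_locally.sh
```

```
[+] Running 2/2
⠿ Container bitnami_kafka Removed 1.2s
⠿ Network scripts_default Removed
```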

[Optional] GitHub integration

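To complete this optional part of the tutorial, make sure you have a GitHub account and the git command installed, and that GitHub authentication is properly set up.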

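We need to create two GitHub repositories: one for the FastStream project in the retrieve-publish-crypto directory and another for the FastStream project in the calculate-mean-app directory. In this chapter, we will upload the retrieve-publish-crypto FastStream project to GitHub and explain the generated CI workflows.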

Adding locally hosted code to GitHub

To create a GitHub repository, open https://github.com/new. For Repository name, use retrieve-publish-crypto and click Create repository.

Open your development environment and go to the retrieve-publish-crypto directory generated in the previous steps:

```shell
cd path_to/faststream_gen_tutorial/retrieve-publish-crypto
```

Next, execute the following commands:

```shell
git init
git add .
git commit -m "Retrieve and publish crypto prices application implemented"
git branch -M main
```

If you use HTTPS authentication, execute the following (replace git_username in the URL with your own GitHub username):

```shell
git remote add origin https://github.com/git_username/retrieve-publish-crypto.git
```

If you use SSH authentication, execute the following (replace git_username in the URL with your own GitHub username):

```shell
git remote add origin git@github.com:git_username/retrieve-publish-crypto.git
```
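The two commands differ only in the remote URL format. If you script repository setup, a small helper can build either form. In this sketch, remote_url and remote_add_command are hypothetical names, and the code only builds strings, leaving the actual git call to you.

```python
from typing import List


def remote_url(username: str, repo: str, auth: str = "https") -> str:
    """Build the GitHub remote URL for HTTPS or SSH authentication."""
    if auth == "https":
        return f"https://github.com/{username}/{repo}.git"
    if auth == "ssh":
        # SSH remotes use the scp-like syntax git@host:owner/repo.git
        return f"git@github.com:{username}/{repo}.git"
    raise ValueError("auth must be 'https' or 'ssh'")


def remote_add_command(username: str, repo: str, auth: str = "https") -> List[str]:
    """Argument list for `git remote add origin <url>`, e.g. for subprocess.run."""
    return ["git", "remote", "add", "origin", remote_url(username, repo, auth)]
```

From inside the repository, subprocess.run(remote_add_command("git_username", "retrieve-publish-crypto", "ssh"), check=True) would run the SSH variant (with git_username replaced by your own username).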

To update the remote branch with your local commits, execute:

```shell
git push -u origin main
```

Continuous integration with GitHub Actions

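Once the changes are pushed, the CI pipeline runs the tests again, builds and publishes the documentation, and builds a Docker container with the application.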

To verify that CI passed, open your web browser, go to the newly created GitHub repository and click the Actions tab.

The tests executed with pytest passed successfully.

The Docker image was successfully built and pushed to the GitHub Container Registry under the repository ghcr.io/your_username/retrieve-publish-crypto.

The AsyncAPI docs were successfully generated and deployed to GitHub Pages.

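After the Deploy FastStream AsyncAPI Docs workflow runs successfully, a new branch named gh-pages is created. To open the GitHub Pages settings, navigate to Settings -> Pages, select the gh-pages branch as the branch for hosting GitHub Pages, and click Save.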

Setting the branch for GitHub Pages automatically triggers a new workflow in GitHub Actions. To view it, click the Actions tab and open the pages build and deployment workflow.

Once all jobs in the workflow have completed, click the provided URL to view and explore the AsyncAPI docs generated for your application.

Repeat

Repeat the same process for the calculate-mean-app project.

Next steps

Congratulations! You have completed this tutorial and gained a new set of skills. Now that you know how to use faststream-gen, try it out with your own examples!
