沧州网站优化,做详情图的网站,wordpress怎么做商城,wordpress邮件群发ETL的过程
1、数据抽取#xff1a;确定数据源#xff0c;定义数据接口#xff0c;选择数据抽取方法#xff08;主动抽取或由源系统推送#xff09;。 2、数据清洗#xff1a;处理不完整数据、错误数据、重复数据等#xff0c;确保数据的准确性和一致性。#xff08;是…ETL的过程
1、数据抽取确定数据源定义数据接口选择数据抽取方法主动抽取或由源系统推送。 2、数据清洗处理不完整数据、错误数据、重复数据等确保数据的准确性和一致性。是数据转换的一部分 3、数据转换进行空值处理、数据标准统一、数据拆分、数据验证、数据替换和数据关联等操作。 4、规则检查根据业务需求进行数据质量和业务规则的校验。 5、数据加载将数据缓冲区的数据加载到目标数据库或数据仓库中可能是全量加载或增量加载。
1、数据抽取Extract
选择抽取策略
全量抽取 特点一次性抽取所有数据适合数据量较小或首次抽取的场景。实现方式直接查询整个表或读取整个文件。 增量抽取 特点仅抽取发生变化的数据适合数据量较大且需要频繁更新的场景。常用技术时间戳通过记录最后更新时间来抽取新增或修改的数据。CDCChange Data Capture通过数据库日志或触发器捕获数据变化
数据抽取 数据库抽取 mysql、oracle等 文件抽取 读取文件使用文件读取库如csv。 API 抽取 HTTP 请求使用 HTTP 客户端库如 requests发送请求获取 API 返回的数据。 消息队列抽取 订阅消息使用消息队列客户端如 Kafka Consumer订阅消息并获取数据。
抽取开源工具 Apache NiFi 特点提供可视化界面支持实时和批量数据抽取内置多种数据源连接器如数据库、API、文件系统等。适用场景适合需要实时数据流处理的场景支持复杂的数据路由和转换逻辑。 Apache Kafka
特点分布式流处理平台支持高吞吐量的实时数据抽取和传输。适用场景适合需要实时数据流处理的场景常与 Spark Streaming 或 Flink 结合使用。
Talend Open Studio
特点提供图形化界面支持多种数据源如数据库、文件、API的抽取支持代码生成。适用场景适合中小型项目支持快速开发和部署。
Apache Sqoop
特点专门用于在 Hadoop 和关系型数据库之间传输数据支持增量抽取。适用场景适合大数据场景尤其是 Hadoop 生态系统的数据抽取。
Logstash
特点主要用于日志数据的抽取和传输支持多种输入和输出插件。适用场景适合日志数据的实时抽取和处理。
数据抽取例子
一、数据抽取的场景
假设我们需要从以下三个数据源中抽取数据
MySQL 数据库抽取用户表users中的数据。CSV 文件抽取一个包含订单信息的文件orders.csv。API从一个公开的 API 中抽取天气数据。 二、数据抽取的实现
1. 从 MySQL 数据库抽取数据
工具Python pymysql 或 SQLAlchemy。步骤 连接数据库。执行 SQL 查询。将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine# 数据库连接配置
db_config {host: localhost,user: root,password: password,database: test_db
}# 创建数据库连接
engine create_engine(fmysqlpymysql://{db_config[user]}:{db_config[password]}{db_config[host]}/{db_config[database]})# 执行 SQL 查询
query SELECT * FROM users # 假设 users 表包含 id, name, age 字段
df_users pd.read_sql(query, engine)# 输出结果
print(从 MySQL 抽取的用户数据)
print(df_users)2. 从 CSV 文件抽取数据
工具Python pandas。步骤 读取 CSV 文件。将数据加载到 DataFrame 中。
import pandas as pd# 读取 CSV 文件
df_orders pd.read_csv(orders.csv) # 假设 orders.csv 包含 order_id, user_id, amount 字段# 输出结果
print(从 CSV 文件抽取的订单数据)
print(df_orders)3. 从 API 抽取数据
工具Python requests。步骤 发送 HTTP 请求到 API。解析返回的 JSON 数据。将数据保存到 DataFrame 中。
import requests
import pandas as pd# API 配置
api_url https://api.weatherapi.com/v1/current.json
api_key your_api_key # 替换为你的 API Key
params {key: api_key,q: Beijing # 查询北京的天气
}# 发送 HTTP 请求
response requests.get(api_url, paramsparams)
data response.json() # 解析 JSON 数据# 将数据保存到 DataFrame
weather_data {location: data[location][name],temperature: data[current][temp_c],condition: data[current][condition][text]
}
df_weather pd.DataFrame([weather_data])# 输出结果
print(从 API 抽取的天气数据)
print(df_weather) 三、完整代码示例
以下是整合了上述三种数据抽取方式的完整代码
import pandas as pd
from sqlalchemy import create_engine
import requests# 1. 从 MySQL 数据库抽取数据
def extract_from_mysql():# 数据库连接配置db_config {host: localhost,user: root,password: password,database: test_db}# 创建数据库连接engine create_engine(fmysqlpymysql://{db_config[user]}:{db_config[password]}{db_config[host]}/{db_config[database]})# 执行 SQL 查询query SELECT * FROM usersdf_users pd.read_sql(query, engine)return df_users# 2. 从 CSV 文件抽取数据
def extract_from_csv():df_orders pd.read_csv(orders.csv)return df_orders# 3. 从 API 抽取数据
def extract_from_api():api_url https://api.weatherapi.com/v1/current.jsonapi_key your_api_key # 替换为你的 API Keyparams {key: api_key,q: Beijing}response requests.get(api_url, paramsparams)data response.json()weather_data {location: data[location][name],temperature: data[current][temp_c],condition: data[current][condition][text]}df_weather pd.DataFrame([weather_data])return df_weather# 主函数
if __name__ __main__:# 抽取数据df_users extract_from_mysql()df_orders extract_from_csv()df_weather extract_from_api()# 输出结果print(从 MySQL 抽取的用户数据)print(df_users)print(\n从 CSV 文件抽取的订单数据)print(df_orders)print(\n从 API 抽取的天气数据)print(df_weather) 四、运行结果
假设数据如下
MySQL 用户表
idnameage1Alice252Bob30
CSV 订单文件
order_iduser_idamount1011100.01022200.0
API 天气数据
locationtemperatureconditionBeijing20.0Sunny
运行代码后输出如下
从 MySQL 抽取的用户数据id name age
0 1 Alice 25
1 2 Bob 30从 CSV 文件抽取的订单数据order_id user_id amount
0 101 1 100.0
1 102 2 200.0从 API 抽取的天气数据location temperature condition
0 Beijing 20.0 Sunny 五、总结
数据抽取是 ETL 流程的第一步通常涉及从多种数据源如数据库、文件、API中提取数据。通过 Python 和相关库如 pandas、SQLAlchemy、requests可以轻松实现数据抽取任务。你可以根据实际需求扩展这个例子比如支持增量抽取、处理异常情况等。希望这个例子对你有帮助
原文地址码农小站 公众号码农小站