网站运营的具体工作包括哪些,响应式布局网站,太原区域调整最新消息,南京电器网站建设作者#xff1a;来自 Elastic Gustavo Llermaly 学习配置 OneLake#xff0c;使用 Python 消费数据并在 Elasticsearch 中索引文档#xff0c;然后运行语义搜索。 OneLake 是一款工具#xff0c;可让你连接到不同的 Microsoft 数据源#xff0c;例如 Power BI、Data Activ…作者来自 Elastic Gustavo Llermaly 学习配置 OneLake使用 Python 消费数据并在 Elasticsearch 中索引文档然后运行语义搜索。 OneLake 是一款工具可让你连接到不同的 Microsoft 数据源例如 Power BI、Data Activator 和 Data factory 等。它支持将数据集中在 DataLakes 中DataLakes 是支持全面数据存储、分析和处理的大容量存储库。
在本文中我们将学习如何配置 OneLake、使用 Python 消费数据以及在 Elasticsearch 中索引文档然后运行语义搜索。
有时你可能希望在非结构化数据和来自不同来源和软件提供商的结构化数据中运行搜索并使用 Kibana 创建可视化。对于这种任务在 Elasticsearch 中索引文档作为中央存储库会变得非常有用。
在这个例子中我们将使用一家名为 Shoestic 的虚拟公司这是一家在线鞋店。我们在结构化文件 (CSV) 中列出了产品列表而一些产品的数据表则采用非结构化格式 (DOCX)。这些文件存储在 OneLake 中。
你可以在此处找到包含完整示例包括测试文档的笔记本。 步骤
OneLake 初始配置使用 Python 连接到 OneLake索引文档查询 OneLake 初始配置
OneLake 架构可以总结如下 要使用 OneLake 和 Microsoft Fabric我们需要一个 Office 365 帐户。如果你没有可以在此处创建一个试用帐户。
使用你的帐户登录 Microsoft Fabric。然后创建一个名为 “ShoesticWorkspace” 的工作区。进入新创建的工作区后创建一个 Lakehouse 并将其命名为“ShoesticDatalake”。最后一步是在 “Files” 中创建一个新文件夹。单击 “new subfolder” 并将其命名为 “ProductsData”。 完成了我们准备开始提取数据了。 使用 Python 连接到 OneLake
配置完 OneLake 后我们现在可以准备 Python 脚本。Azure 有处理凭据并与 OneLake 通信的库。
pip install azure-identity elasticsearch8.14 azure-storage-file-datalake azure-cli python-docx
“azure-identity azure-storage-file-datalake” 库让我们可以与 OneLake 交互同时 “azure-cli” 可以访问凭据并授予权限。为了读取文件内容以便稍后将其索引到 Elasticsearch我们使用 python-docx。 在我们的本地环境中保存 Microsoft 凭据
我们将使用 “az login” 进入我们的 Microsoft 帐户并运行 az login --allow-no-subscriptions
标志 “ --allow-no-subscriptions”允许我们在没有有效订阅的情况下向 Microsoft Azure 进行身份验证。
此命令将打开一个浏览器窗口你必须在其中访问你的帐户然后选择你帐户的订阅号。 现在我们可以开始编写代码了
创建一个名为 onelake.py 的文件并添加以下内容
_onelake.py_
# Importing dependencies
import chardet
from azure.identity import DefaultAzureCredential
from docx import Document
from azure.storage.filedatalake import DataLakeServiceClient # Initializing the OneLake client
ONELAKE_ACCOUNT_NAME onelake
ONELAKE_WORKSPACE_NAME ShoesticWorkspace
# Path in format DataLake.Lakehouse/files/Folder path
ONELAKE_DATA_PATH shoesticDatalake.Lakehouse/Files/ProductsData # Microsoft token
token_credential DefaultAzureCredential() # OneLake services
service_client DataLakeServiceClient( account_urlfhttps://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com, credentialtoken_credential,
)
file_system_client service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME)
directory_client file_system_client.get_directory_client(ONELAKE_DATA_PATH) # OneLake functions # Upload a file to a LakeHouse directory
def upload_file_to_directory(directory_client, local_path, file_name): file_client directory_client.get_file_client(file_name) with open(local_path, moderb) as data: file_client.upload_data(data, overwriteTrue) print(fFile: {file_name} uploaded to the data lake.) # Get directory contents from your lake folder
def list_directory_contents(file_system_client, directory_name): paths file_system_client.get_paths(pathdirectory_name) for path in paths: print(path.name \n) # Get a file by name from your lake folder
def get_file_by_name(file_name, directory_client): return directory_client.get_file_client(file_name) # Decode docx
def get_docx_content(file_client): download file_client.download_file() file_content download.readall() temp_file_path temp.docx with open(temp_file_path, wb) as temp_file: temp_file.write(file_content) doc Document(temp_file_path) text [] for paragraph in doc.paragraphs: text.append(paragraph.text) return \n.join(text) # Decode csv
def get_csv_content(file_client): download file_client.download_file() file_content download.readall() result chardet.detect(file_content) encoding result[encoding] return file_content.decode(encoding) 将文件上传到 OneLake
在此示例中我们将使用一个 CSV 文件和一些包含有关我们鞋店产品信息的 .docx 文件。虽然你可以使用 UI 上传它们但我们将使用 Python 来完成。在此处下载文件。
我们将文件放在文件夹 /data 中位于名为 upload_files.py 的新 Python 脚本旁边
# upload_files.py # Importing dependencies
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient from functions import list_directory_contents, upload_file_to_directory
from onelake import ONELAKE_DATA_PATH, directory_client, file_system_client csv_file_name products.csv
csv_local_path f./data/{csv_file_name} docx_files [beach-flip-flops.docx, classic-loafers.docx, sport-sneakers.docx]
docx_local_paths [f./data/{file_name} for file_name in docx_files] # Upload files to Lakehouse
upload_file_to_directory(directory_client, csv_local_path, csv_file_name) for docx_local_path in docx_local_paths: docx_file_name docx_local_path.split(/)[-1] upload_file_to_directory(directory_client, docx_local_path, docx_file_name) # To check that the files have been uploaded, run list_directory_contents function to show the contents of the /ProductsData folder in our Datalake:
print(Upload finished, Listing files: )
list_directory_contents(file_system_client, ONELAKE_DATA_PATH)
运行上传脚本
python upload_files.py
结果应该是
Upload finished, Listing files:
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx
现在我们已经准备好文件了让我们开始使用 Elasticsearch 分析和搜索我们的数据 索引文档
我们将使用 ELSER 作为向量数据库的嵌入提供程序以便我们可以运行语义查询。
我们选择 ELSER 是因为它针对 Elasticsearch 进行了优化在域外检索方面胜过大多数竞争对手这意味着按原样使用模型而无需针对你自己的数据进行微调。 配置 ELSER
首先创建推理端点
PUT _inference/sparse_embedding/onelake-inference-endpoint
{ service: elser, service_settings: { num_allocations: 1, num_threads: 1 }
在后台加载模型时如果你以前没有使用过 ELSER则可能会收到 502 Bad Gateway 错误。在 Kibana 中你可以在 “Machine Learning” “Trained Models” 中检查模型状态。等到模型部署完成后再继续执行后续步骤。 索引数据
现在由于我们同时拥有结构化数据和非结构化数据因此我们将在 Kibana DevTools 控制台中使用具有不同映射的两个不同索引。
对于我们的结构化销售让我们创建以下索引
PUT shoestic-products
{ mappings: { properties: { product_id: { type: keyword }, product_name: { type: text }, amount: { type: float }, tags: { type: keyword } } }
}
为了索引我们的非结构化数据产品数据表我们将使用
PUT shoestic-products-descriptions
{ mappings: { properties: { title: { type: text, analyzer: english }, super_body: { type: semantic_text, inference_id: onelake-inference-endpoint }, body: { type: text, copy_to: super_body } } }
} 注意使用带有 copy_to 的字段很重要这样还可以运行全文搜索而不仅仅是在正文字段上运行语义搜索。 读取 OneLake 文件
在开始之前我们需要使用这些命令使用你自己的云 ID 和 API 密钥初始化我们的 Elasticsearch 客户端。
创建一个名为 indexing.py 的 Python 脚本并添加以下几行
# Importing dependencies
import csv
from io import StringIO from onelake import directory_client
from elasticsearch import Elasticsearch, helpers from functions import get_csv_content, get_docx_content, get_file_by_name
from upload_files_to_onelake import csv_file_client ELASTIC_CLUSTER_ID your-cloud-id
ELASTIC_API_KEY your-api-key # Elasticsearch client
es_client Elasticsearch( cloud_idELASTIC_CLUSTER_ID, api_keyELASTIC_API_KEY,
) docx_files [beach-flip-flops.docx, classic-loafers.docx, sport-sneakers.docx]
docx_local_paths [f./data/{file_name} for file_name in docx_files] csv_file_client get_file_by_name(products.csv, directory_client)
docx_files_clients [] for docx_file_name in docx_files: docx_files_clients.append(get_file_by_name(docx_file_name, directory_client)) # We use these functions to extract data from the files:
csv_content get_csv_content(csv_file_client)
reader csv.DictReader(StringIO(csv_content))
docx_contents [] for docx_file_client in docx_files_clients: docx_contents.append(get_docx_content(docx_file_client)) print(CSV FILE CONTENT: , csv_content)
print(DOCX FILE CONTENT: , docx_contents) # The CSV tags are separated by commas (,). Well turn these tags into an array:
rows csv_content.splitlines()
reader csv.DictReader(rows)
modified_rows [] for row in reader: row[tags] row[tags].replace(, ).split(,) modified_rows.append(row) print(row[tags]) # We can now index the files into Elasticsearch
reader modified_rows
csv_actions [{_index: shoestic-products, _source: row} for row in reader] docx_actions [ { _index: shoestic-products-descriptions, _source: {title: docx_file_name, body: docx}, } for docx_file_name, docx in zip(docx_files, docx_contents)
] helpers.bulk(es_client, csv_actions)
print(CSV data indexed successfully.)
helpers.bulk(es_client, docx_actions)
print(DOCX data indexed successfully.)
现在运行脚本
python indexing.py 查询
在 Elasticsearch 中对文档进行索引后我们就可以测试语义查询了。在本例中我们将在某些产品tag中搜索唯一术语。我们将针对结构化数据运行关键字搜索针对非结构化数据运行语义搜索。 1. 关键字搜索
GET shoestic-products/_search
{ query: { term: { tags: summer } }
}
结果
_source: { product_id: P-118, product_name: Casual Sandals, amount: 128.22, tags: [ casual, summer ] } 2. 语义搜索
GET shoestic-products-descriptions/_search
{ _source: { excludes: [ *embeddings, *chunks ] }, query: { semantic: { field: super_body, query: summer } }
}
*我们排除了嵌入和块只是为了便于阅读。
结果
hits: { total: { value: 3, relation: eq }, max_score: 4.3853106, hits: [ { _index: shoestic-products-descriptions, _id: P2Hj6JIBF7lnCNFTDQEA, _score: 4.3853106, _source: { super_body: { inference: { inference_id: onelake-inference-endpoint, model_settings: { task_type: sparse_embedding } } }, title: beach-flip-flops.docx, body: Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun. } } ] }
如你所见当使用关键字搜索时我们会得到与其中一个标签的完全匹配相反当我们使用语义搜索时我们会得到与描述中的含义匹配的结果而无需完全匹配。 结论
OneLake 使使用来自不同 Microsoft 来源的数据变得更容易然后索引这些文档 Elasticsearch 允许我们使用高级搜索工具。在第一部分中我们学习了如何连接到 OneLake 并在 Elasticsearch 中索引文档。在第二部分中我们将使用 Elastic 连接器框架制作更强大的解决方案。敬请期待
想要获得 Elastic 认证了解下一次 Elasticsearch 工程师培训的时间
Elasticsearch 包含许多新功能可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息开始免费云试用或立即在你的本地机器上试用 Elastic。 原文Indexing OneLake data into Elasticsearch - Part 1 - Elasticsearch Labs