将 SQL 关系型数据迁移到 MongoDB 的循序渐进指南
通过这份全面的循序渐进指南,了解如何将您的 SQL 关系型数据迁移到 MongoDB。探索将传统模式转换为高效 MongoDB 文档结构的最佳实践,包括必要的规划、嵌入和引用等模式设计策略、数据提取、转换技术以及加载到 MongoDB。本教程提供了实用的示例和可操作的建议,以实现向 NoSQL 数据库的平稳成功过渡。
从SQL关系型数据库迁移到MongoDB的分步指南
将SQL关系型数据迁移到MongoDB并非简单的表复制。难点在于决定哪些关系应成为嵌入文档,哪些应保持引用,以及您的应用程序将如何查询新的数据形态。
关系型模式通常跨表进行规范化。而MongoDB的文档模型在相关数据被一起读取时,往往最适合将它们存储在一起。本指南将逐步介绍规划、转换、加载、验证和应用程序更改,而不假设每个SQL表都应成为一个MongoDB集合。
理解核心差异:关系型模型与文档模型
在深入迁移过程之前,必须掌握概念上的差异:
- 关系型模型: 数据存储在具有预定义模式的表中。关系通过外键管理,需要JOIN操作来检索相关数据。规范化是一个关键原则。
- 文档模型(MongoDB): 数据存储在灵活的、类似JSON的文档中。文档可以具有不同的结构。相关数据可以嵌入到单个文档中(反规范化),也可以使用应用程序级连接或MongoDB的
$lookup聚合阶段进行引用。
这种数据建模的差异直接影响您设计MongoDB集合和文档的方式。
第一阶段:规划和模式设计
这是最关键阶段。精心设计的MongoDB模式是发挥其优势的关键。目标是根据应用程序的访问模式来建模数据,而不仅仅是直接翻译SQL表。
1. 分析应用程序的访问模式
- 识别读密集型与写密集型操作: 数据读取的频率如何?通常如何查询?哪些字段最常一起检索?
- 确定常见查询路径: SQL应用程序中最常见的
SELECT语句是什么?哪些表通常被连接? - 理解数据关系: 实体之间如何关联?这些是一对一、一对多还是多对多关系?
2. 选择反规范化策略
MongoDB的强大之处在于其嵌入相关数据的能力。考虑以下策略:
- 嵌入(反规范化): 最常用的方法。当关系是一对多或数据经常一起访问时,将文档或文档数组嵌入到父文档中。这减少了连接的需求。
- 示例: 与其有单独的
orders和order_items表,不如将order_items作为数组嵌入到order文档中。
- 示例: 与其有单独的
- 引用: 当嵌入会导致文档过大,或数据被独立访问时使用。存储相关文档的
_id,类似于外键,并执行应用程序级连接或使用MongoDB的$lookup。- 示例:
users集合和posts集合。帖子可能存储其作者的user_id。然后,您可以使用$lookup在获取帖子时检索作者的详细信息。
- 示例:
3. 设计MongoDB集合和文档
基于您的访问模式和反规范化策略,设计您的集合。一个好的起点是将SQL表映射到MongoDB集合。然后,决定哪些相关数据应嵌入,哪些应引用。
SQL模式示例:
-- Customers表
CREATE TABLE Customers (
CustomerID INT PRIMARY KEY,
FirstName VARCHAR(50),
LastName VARCHAR(50),
Email VARCHAR(100)
);
-- Orders表
CREATE TABLE Orders (
OrderID INT PRIMARY KEY,
CustomerID INT,
OrderDate DATE,
TotalAmount DECIMAL(10, 2),
FOREIGN KEY (CustomerID) REFERENCES Customers(CustomerID)
);
-- OrderItems表
CREATE TABLE OrderItems (
OrderItemID INT PRIMARY KEY,
OrderID INT,
ProductID INT,
Quantity INT,
Price DECIMAL(10, 2),
FOREIGN KEY (OrderID) REFERENCES Orders(OrderID)
);
MongoDB文档设计选项:
选项A:客户嵌入订单,如果客户订单数量可控且订单经常与客户一起查看:
{ "_id": ObjectId("..."), "customer_id": 1, "first_name": "John", "last_name": "Doe", "email": "[email protected]", "orders": [ { "order_id": 101, "order_date": ISODate("2023-10-26T00:00:00Z"), "total_amount": 50.00, "items": [ { "product_id": 1, "quantity": 2, "price": 25.00 }, { "product_id": 3, "quantity": 1, "price": 0.00 } ] } ] }选项B:使用引用的独立集合,如果订单数量众多或经常独立查询。
Customers集合:
{ "_id": ObjectId("..."), "customer_id": 1, "first_name": "John", "last_name": "Doe", "email": "[email protected]" }Orders集合:
{ "_id": ObjectId("..."), "order_id": 101, "customer_id": 1, "order_date": ISODate("2023-10-26T00:00:00Z"), "total_amount": 50.00, "items": [ { "product_id": 1, "quantity": 2, "price": 25.00 }, { "product_id": 3, "quantity": 1, "price": 0.00 } ] }
关于文档大小的考虑: MongoDB有文档大小限制(16MB)。避免嵌入可能超过此限制的过大数组。如果数组无限增长,考虑将其拆分为单独的集合。
第二阶段:数据提取和转换
一旦目标模式设计完成,您需要从SQL数据库中提取数据并将其转换为新的文档格式。
1. 从SQL提取数据
使用标准SQL查询来选择所需数据。您可以将这些数据导出为CSV或JSON等格式。
- 使用SQL客户端: 大多数SQL数据库工具(例如DBeaver、SQL Developer、pgAdmin)允许您将查询结果导出为CSV或JSON。
- 脚本编写: 编写脚本(Python、Node.js等)连接到您的SQL数据库,执行查询并获取数据。
2. 转换数据
这是您实现设计模式的地方。您需要编写代码或使用工具来:
- 分组相关记录: 例如,收集属于特定
Order的所有OrderItems。 - 重构数据: 将关系型行转换为嵌套的JSON文档。
- 处理数据类型: 确保数据类型与MongoDB兼容(例如日期、数字、字符串)。
使用Python的示例:
假设您已将Customers、Orders和OrderItems导出为CSV文件。
import pandas as pd
import json
from bson import ObjectId
# 从CSV文件加载数据(假设它们在同一目录下)
customers_df = pd.read_csv('customers.csv')
orders_df = pd.read_csv('orders.csv')
order_items_df = pd.read_csv('order_items.csv')
# --- 数据转换逻辑 ---
# 将DataFrame转换为字典以便于操作
customers_list = customers_df.to_dict('records')
orders_list = orders_df.to_dict('records')
order_items_list = order_items_df.to_dict('records')
# 创建订单和订单项的映射以便快速查找
orders_by_customer = {}
for order in orders_list:
customer_id = order['CustomerID']
if customer_id not in orders_by_customer:
orders_by_customer[customer_id] = []
orders_by_customer[customer_id].append(order)
order_items_by_order = {}
for item in order_items_list:
order_id = item['OrderID']
if order_id not in order_items_by_order:
order_items_by_order[order_id] = []
order_items_by_order[order_id].append(item)
# --- 构建MongoDB文档(选项A:客户嵌入订单)---
mongo_documents = []
for customer in customers_list:
mongo_doc = {
"_id": ObjectId(), # MongoDB自动生成_id,但您也可以根据需要映射
"customer_id": customer['CustomerID'],
"first_name": customer['FirstName'],
"last_name": customer['LastName'],
"email": customer['Email'],
"orders": []
}
customer_id = customer['CustomerID']
if customer_id in orders_by_customer:
for order in orders_by_customer[customer_id]:
order_doc = {
"order_id": order['OrderID'],
"order_date": order['OrderDate'], # 确保正确的日期格式
"total_amount": order['TotalAmount'],
"items": []
}
order_id = order['OrderID']
if order_id in order_items_by_order:
for item in order_items_by_order[order_id]:
order_doc['items'].append({
"product_id": item['ProductID'],
"quantity": item['Quantity'],
"price": item['Price']
})
mongo_doc['orders'].append(order_doc)
mongo_documents.append(mongo_doc)
# 现在'mongo_documents'是一个字典列表,准备插入到MongoDB
# print(json.dumps(mongo_documents[0], indent=2, default=str)) # 打印第一个文档为JSON
# 对于选项B(独立集合),您需要为每个集合创建列表:
# customers_mongo = [{'customer_id': c['CustomerID'], ...} for c in customers_list]
# orders_mongo = [{'order_id': o['OrderID'], 'customer_id': o['CustomerID'], ...} for o in orders_list]
# 保存为JSON以便导入(可选)
# with open('mongo_customer_data.json', 'w') as f:
# json.dump(mongo_documents, f, indent=2, default=str)
3. 转换工具
- 自定义脚本: 使用Pandas的Python、使用
csv-parser和mysql/pg等库的Node.js,对于复杂转换非常强大。 - ETL工具: Apache NiFi、Talend或AWS Glue等工具可以编排复杂的数据管道,包括SQL到MongoDB的迁移。
- 数据库迁移平台: 一些商业ETL和CDC工具可以将关系型数据源同步到MongoDB。在围绕工具进行规划之前,请检查您的确切SQL数据库和MongoDB目标是否支持连接器。
第三阶段:将数据加载到MongoDB
数据转换完成后,您可以将其加载到MongoDB实例中。
1. 连接到MongoDB
使用MongoDB Shell(mongosh)或MongoDB驱动程序(针对您的编程语言)连接到您的数据库。
2. 导入转换后的数据
使用
mongoimport: 如果您将转换后的数据导出为JSON文件,可以使用mongoimport:# 假设您的数据在mongo_customer_data.json中,并且您想导入到'customers'集合 mongoimport --db your_database_name --collection customers --file mongo_customer_data.json --jsonArray--jsonArray:如果您的JSON文件包含文档数组,请使用此标志。
使用MongoDB驱动程序: 如果您在编程语言中生成了数据结构(如Python中的
mongo_documents列表),可以直接插入它们:使用
pymongo的Python示例:from pymongo import MongoClient # 假设'mongo_documents'列表已从之前的Python脚本定义 client = MongoClient('mongodb://localhost:27017/') db = client['your_database_name'] customers_collection = db['customers'] # 插入转换后的文档 if mongo_documents: insert_result = customers_collection.insert_many(mongo_documents) print(f"已插入 {len(insert_result.inserted_ids)} 个文档。") else: print("没有文档可插入。") client.close()
3. 验证数据完整性
加载后,在MongoDB中运行查询以验证数据是否正确导入并符合预期。
// 示例:统计'customers'集合中的文档数
use your_database_name;
print(db.customers.countDocuments());
// 示例:查找特定客户并检查其嵌入的订单
db.customers.findOne({ "customer_id": 1 })
第四阶段:应用程序重构
这可以说是最耗时的阶段。您的应用程序代码需要更新以与MongoDB交互,而不是SQL。
- 更新数据库连接: 更改连接字符串和库。
- 重写查询: 使用所选驱动程序的API,将SQL查询替换为MongoDB查询语言。
- 调整数据访问层: 修改您的ORM或数据访问层以处理MongoDB文档。
- 利用MongoDB特性: 调整您的应用程序以利用灵活模式、聚合框架和地理空间查询等特性(如果适用)。
最佳实践和提示
- 从小处着手: 如果可能,先迁移一部分数据或不太关键的应用程序,以积累经验。
- 迭代模式设计: 您最初的MongoDB模式可能并不完美。准备好根据性能测试和应用程序反馈进行迭代和优化。
- 明智地创建索引: 与SQL一样,索引对于MongoDB的性能至关重要。识别您的查询模式并创建适当的索引。
- 监控性能: 持续监控MongoDB部署的性能瓶颈,并根据需要优化查询和模式。
- 考虑增量迁移: 对于大型数据库,考虑增量迁移策略,在最终切换之前,将近实时地将更改从SQL同步到MongoDB。
要点
最安全的SQL到MongoDB迁移始于访问模式,而不是表名。为一个重要工作流建模,转换一小部分数据,加载到MongoDB,验证计数和样本文档,然后围绕该形态更新应用程序代码,再扩展迁移范围。