分布式系统设计 - 从零到精通
课程目标
学完这套课程,你将能够:
- 理解分布式系统的核心概念和挑战
- 掌握分布式系统设计原则和模式
- 熟练使用分布式协调、一致性协议
- 能够设计高可用的分布式系统
- 掌握分布式系统的治理和监控
- 面试时自信回答各种分布式问题
第一部分:分布式基础概念
1.1 什么是分布式系统?
定义:
分布式系统是由多个计算机(节点)组成的系统,这些节点通过网络通信,共同完成一个任务,对用户来说表现为一个统一的系统。
类比理解:
单机系统 = 一个人干所有活
分布式系统 = 一个团队分工合作
为什么需要分布式?
| 原因 | 说明 | 举例 |
|---|---|---|
| 性能 | 单机性能受限,多机并行处理 | 大数据计算 |
| 容量 | 单机存储空间有限 | 海量数据存储 |
| 可用性 | 单机故障导致服务不可用 | 高可用服务 |
| 成本 | 高配单机成本高 | 便宜机器集群 |
面试回答:
"分布式系统是指多台计算机协同工作的系统。需要分布式的原因:1)单机性能有限,多机并行提升性能;2)单机存储容量有限,分布式存储扩容;3)单机故障导致服务不可用,分布式提供高可用;4)多台廉价机器比单台高性能机器成本更低。但分布式也带来了复杂性:网络分区、数据一致性、分布式事务等。"
1.2 分布式系统的挑战(面试必问!)
挑战一:网络不可靠
现象:
- 网络延迟:请求响应慢
- 网络分区:部分节点不可达
- 丢包:消息丢失
- 乱序:消息到达顺序不一致
解决方案:
- 超时重试
- 心跳检测
- 消息确认
- 幂等性设计
// 超时重试示例
public Result callRemoteService(Request request) {
int maxRetries = 3;
int timeout = 3000;
for (int i = 0; i < maxRetries; i++) {
try {
return restTemplate.postForEntity(
url, request, Result.class
).getBody();
} catch (Exception e) {
if (i == maxRetries - 1) {
throw e;
}
Thread.sleep(1000);
}
}
throw new RuntimeException("重试失败");
}
挑战二:时钟不同步
现象:
- 各服务器时钟不一致
- 影响事件顺序判断
解决方案:
- NTP同步时钟
- 使用逻辑时钟(Lamport Clock)
- 使用向量时钟(Vector Clock)
- 使用全局唯一ID(雪花算法)
// 雪花算法生成全局唯一ID
public class SnowflakeIdGenerator {
private final long workerId;
private final long datacenterId;
private long sequence = 0;
private long lastTimestamp = -1L;
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
if (timestamp < lastTimestamp) {
throw new RuntimeException("时钟回拨");
}
if (timestamp == lastTimestamp) {
sequence = (sequence + 1) & 4095;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0;
}
lastTimestamp = timestamp;
return ((timestamp - 1288834974657L) << 22)
udi | (datacenterId << 17)
| (workerId << 12)
| sequence;
}
}
挑战三:节点故障
现象:
- 节点宕机
- 节点响应慢
- 节点进程崩溃
解决方案:
- 故障检测(心跳)
- 故障转移(主备切换)
- 故障恢复(自动重启)
- 熔断降级(快速失败)
挑战四:数据一致性
现象:
- 多节点数据不一致
- 读到过期数据
解决方案:
- 强一致性协议(Paxos、Raft)
- 最终一致性
- 读写一致性级别
面试回答:
"分布式系统面临四大核心挑战:1)网络不可靠,通过超时重试、心跳检测解决;2)时钟不同步,使用NTP同步或逻辑时钟;3)节点故障,通过故障检测、故障转移处理;4)数据一致性,根据业务场景选择一致性级别。这些挑战是分布式系统的固有特性,需要在设计时充分考虑。"
1.3 CAP理论(面试必背!)
CAP三个特性:
| 特性 | 说明 | 含义 |
|---|---|---|
| C(Consistency) | 一致性 | 所有节点同时看到相同的数据 |
| A(Availability) | 可用性 | 每个请求都能得到响应 |
| P(Partition tolerance) | 分区容错 | 系统在网络分区时仍能运行 |
核心结论:
在分布式系统中,P(分区容错)是必须保证的,因此只能在C和A之间做权衡。
分区发生时:
├─ 选择CP:保证一致性,牺牲可用性
│ - 网络分区时,部分节点不可用
│ - 例子:数据库主节点故障,从节点只读
│
└─ 选择AP:保证可用性,牺牲一致性
- 网络分区时,允许数据不一致
- 例子:Redis主从复制,从节点可读旧数据
CAP权衡示例:
CP系统:
- MySQL(主从复制)
- HBase
- Zookeeper
- 适合:金融、交易系统
AP系统:
- Redis(主从复制)
- Cassandra
- DynamoDB
- 适合:社交、内容系统
CA系统(非真正分布式):
- 单机数据库
- 本地缓存
面试回答模板:
"CAP理论指出分布式系统最多同时满足两个特性。由于网络分区必然存在,P是必须保证的,因此只能在C和A之间选择。
CP系统保证强一致性但牺牲可用性,如MySQL、HBase,适合金融交易场景;
AP系统保证高可用但接受短暂不一致,如Redis、Cassandra,适合社交内容场景。实际设计中,我们通常追求BASE理论:基本可用、软状态、最终一致性,在性能和一致性间取得平衡。"
1.4 BASE理论
BASE vs CAP:
| 理论 | 关注点 | 目标 |
|---|---|---|
| CAP | 理论极限 | 描述不可能三角 |
| BASE | 实践指导 | 指导系统设计 |
BASE三个特性:
| 特性 | 说明 | 举例 |
|---|---|---|
| BA(Basically Available) | 基本可用 | 系统故障时核心功能可用 |
| S(Soft state) | 软状态 | 允许数据短暂不一致 |
| E(Eventually consistent) | 最终一致性 | 一段时间后数据一致 |
最终一致性的实现:
1. 读后写一致性
// 读取版本号
User user = userRepository.findById(userId);
int version = user.getVersion();
// 写入时带上版本号
user.setName(newName);
user.setVersion(version + 1);
userRepository.update(user);
// 数据库乐观锁检查版本号
// UPDATE user SET name=?, version=version+1
// WHERE id=? AND version=?
2. 写后读一致性
// 写入后立即刷新缓存
userRepository.update(user);
redisTemplate.delete("user:" + userId); // 删除缓存
// 读取时重新加载
User user = redisTemplate.opsForValue().get("user:" + userId);
if (user == null) {
user = userRepository.findById(userId);
redisTemplate.opsForValue().set("user:" + userId, user);
}
3. 会话一致性
// 同一会话内保证一致性
public User getUserInSession(Long userId, String sessionId) {
String key = "user:" + userId + ":" + sessionId;
User user = redisTemplate.opsForValue().get(key);
if (user == null) {
user = userRepository.findById(userId);
redisTemplate.opsForValue().set(key, user, 1800);
}
return user;
}
第二部分:分布式共识算法
2.1 Paxos算法(理论基石)
核心角色:
- Proposer(提议者): 提出提案
- Acceptor(接受者): 接受提案
- Learner(学习者): 学习最终结果
两阶段提交:
阶段一:Prepare(准备)
Proposer:
1. 生成提案编号 n
2. 向多数派Acceptors发送Prepare(n)
Acceptors:
1. 收到Prepare(n)
2. 如果n > 之前接受的提案编号
- 承诺不再接受编号 < n 的提案
- 返回之前接受的最高编号提案
3. 否则拒绝
阶段二:Accept(接受)
Proposer:
1. 收到多数派响应
2. 选择响应中最大编号提案的值v
3. 向多数派Acceptors发送Accept(n, v)
Acceptors:
1. 收到Accept(n, v)
2. 如果n >= 承诺的编号
- 接受提案(n, v)
- 返回Accept
3. 否则拒绝
面试回答:
"Paxos是分布式共识算法的理论基础,保证在少数节点故障时仍能达成一致。它有两阶段:Prepare阶段Proposer获取承诺,Accept阶段Proposer提交提案。Paxos保证安全性(不会选出不一致的值)和活性(只要有多数派存活就能选出值)。实际实现中通常使用Multi-Paxos优化性能。"
2.2 Raft算法(工程实现)
为什么Raft比Paxos流行?
- 易理解:设计时优先考虑可理解性
- 易实现:代码实现更简单
- 功能完整:包含领导选举、日志复制
Raft三个核心组件:
1. 领导选举(Leader Election)
节点状态:
- Follower(跟随者): 正常状态
- Candidate(候选者): 选举状态
- Leader(领导者): 领导状态
选举过程:
1. Follower在election timeout后未收到Leader心跳
2. Follower转为Candidate,term++,投票给自己
3. 向其他节点发送RequestVote RPC
4. 获得多数派投票后成为Leader
5. Leader定期发送心跳维持领导地位
// Raft选举伪代码
func (rf *Raft) RequestVote(args *RequestVoteArgs) *RequestVoteReply {
reply := &RequestVoteReply{Term: rf.currentTerm, VoteGranted: false}
if args.Term < rf.currentTerm {
return reply
}
if args.Term > rf.currentTerm ||
(rf.votedFor == nil || rf.votedFor == args.CandidateId) {
rf.votedFor = args.CandidateId
rf.currentTerm = args.Term
reply.VoteGranted = true
}
return reply
}
2. 日志复制(Log Replication)
过程:
1. Leader接收客户端请求
2. Leader追加日志到本地日志
3. Leader并行向Follower发送AppendEntries RPC
4. 收到多数派响应后,提交日志
5. 通知客户端请求成功
日志结构:
Index | Term | Command
------|------|--------
1 | 1 | SET x=1
2 | 1 | SET y=2
3 | 2 | SET z=3 ← commitIndex
4 | 2 | SET w=4
3. 安全性(Safety)
日志匹配特性:
- 如果两个日志在某个index的term相同,则该index之前的日志都相同
- 如果某条日志在某个term已被提交,则该日志在更高term的Leader中也存在
选举限制:
- Candidate请求投票时,必须包含自己最新的日志
- 只有日志不比Follower旧的Candidate才能获得投票
面试回答:
"Raft是工程上广泛使用的共识算法,核心有三个组件:1)领导选举,通过投票选出Leader;2)日志复制,Leader将日志复制到多数派Follower后提交;3)安全性,保证已提交的日志不会丢失。相比Paxos,Raft更易理解和实现。Etcd、Consul都使用Raft实现。"
2.3 Gossip协议(最终一致性)
原理:
- 节点随机选择其他节点交换信息
- 信息通过多轮传播最终达到一致
传播模式:
Round 1: A → B, A → C
Round 2: B → D, B → E, C → F, C → G
Round 3: D → H, E → I, ...
应用场景:
- Cassandra(反熵)
- Redis Cluster(节点发现)
- Consul(服务发现)
Gossip实现:
public class GossipNode {
private final String nodeId;
private final Map<String, String> data = new ConcurrentHashMap<>();
private final List<String> peers;
// 定期向随机节点传播数据
@Scheduled(fixedDelay = 1000)
public void gossip() {
String randomPeer = selectRandomPeer();
exchangeData(randomPeer);
}
// 接收其他节点的数据
public void receiveData(Map<String, String> remoteData) {
remoteData.forEach((key, value) -> {
// 版本号比较,保留新数据
if (isNewer(key, value)) {
data.put(key, value);
}
});
}
}
第三部分:分布式协调服务
3.1 Zookeeper(经典方案)
核心概念:
1. ZNode(数据节点)
Zookeeper树形结构
/
├── /services
│ ├── /services/order
│ └── /services/user/user
├── /config
│ └── /config/app.properties
└── /locks
└── /locks/order/lock-0000000000
ZNode类型:
- 持久节点(Persistent): 创建后一直存在
- 临时节点(Ephemeral): 会话结束自动删除
- 顺序节点(Sequential): 自动递增后缀
- 临时顺序节点: 用于分布式锁
2. Watcher(监听器)
// 监听节点变化
zk.exists("/config/app.properties", event -> {
if (event.getType() == Event.EventType.NodeDataChanged) {
// 重新加载配置
reloadConfig();
}
});
3. ACL(权限控制)
CREATE: 创建子节点
READ: 读取节点数据
WRITE: 写入节点数据
DELETE: 删除节点
ADMIN: 设置ACL
3.2 应用场景
场景1:服务发现
// 服务注册
public void registerService(String serviceName, String address) {
String path = "/services/" + serviceName + "/" + address;
// 创建临时节点,会话关闭自动删除
zk.create(path, address.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
// 服务发现
public List<String> discoverServices(String serviceName) {
String path = "/services/" + serviceName;
return zk.getChildren(path, event -> {
// 监听服务列表变化
if (event.getType() == Event.EventType.NodeChildrenChanged) {
// 更新服务列表
updateServiceList();
}
});
}
场景2:分布式锁
public class DistributedLock {
private final String lockPath;
private final String currentPath;
private final ZooKeeper zk;
public boolean tryLock(long timeout) {
// 创建临时顺序节点
currentPath = zk.create(lockPath + "/lock-",
data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有锁节点
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);
// 检查自己是否是最小的
String smallest = children.get(0);
String currentNode = currentPath.substring(lockPath.length() + 1);
if (currentNode.equals(smallest)) {
return true; // 获取到锁
}
// 不是最小的,监听前一个节点
int index = children.indexOf(currentNode);
String previousNode = children.get(index - 1);
// 监听前一个节点
final CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(lockPath + "/" + previousNode,
event -> latch.countDown());
if (stat != null) {
latch.await(timeout, TimeUnit.MILLISECONDS);
}
return true;
}
public void unlock() {
zk.delete(currentPath, -1);
}
}
场景3:配置中心
public class ConfigCenter {
private final Map<String, String> config = new ConcurrentHashMap<>();
private final ZooKeeper zk;
public void init() {
// 读取所有配置
loadConfigs();
// 监听配置变化
watchConfigChanges();
}
private void loadConfigs() {
List<String> children = zk.getChildren("/config", false);
for (String child : children) {
String path = "/config/" + child;
byte[] data = zk.getData(path, false, null);
config.put(child, new String(data));
}
}
private void watchConfigChanges() {
zk.getChildren("/config", event -> {
if (event.getType() == Event.EventType.NodeChildrenChanged ||
event.getType() == Event.EventType.NodeDataChanged) {
loadConfigs(); // 重新加载配置
}
});
}
public String getConfig(String key) {
return config.get(key);
}
}
面试回答:
"Zookeeper是经典的分布式协调服务,核心应用场景包括:1)服务发现,服务启动时创建临时节点注册,客户端监听节点变化;2)分布式锁,利用临时顺序节点和Watcher实现公平锁;3)配置中心,配置变更通知所有客户端。Zookeeper通过ZAB协议保证数据一致性,基于Raft实现。"
3.3 Etcd(现代方案)
为什么Etcd比Zookeeper好?
- 简单:API更简洁
- 高性能:使用Raft+gRPC
- 云原生:Kubernetes使用
- 接口友好:HTTP API
Etcd操作:
# 写入key
etcdctl put /config/app "value1"
# 读取key
etcdctl get /config/app
# 监听key变化
etcdctl watch /config/app
# 租约(分布式锁)
etcdctl lock mylock
# 事务(原子操作)
etcdctl txn \
--help
Java客户端:
public class EtcdService {
private final Client client;
public EtcdClient(String endpoints) {
this.client = Client.builder()
.endpoints(endpoints.split(","))
.build();
}
// 服务注册
public void registerService(String service, String instance) {
Lease lease = client.getLeaseClient().grant(10).get();
long leaseId = lease.getID();
ByteString key = ByteString.copyFromUtf8(
"/services/" + service + "/" + instance
);
ByteString value = ByteString.copyFromUtf8(instance);
client.getKVClient().put(
PutRequest.newBuilder()
.setKey(key)
.setValue(value)
.setLease(leaseId)
.build()
).get();
// 定期续约
keepAlive(leaseId);
}
// 服务发现
public List<String> discoverServices(String service) {
ByteString prefix = ByteString.copyFromUtf8(
"/services/" + service + "/"
);
GetResponse response = client.getKVClient()
.get(GetRequest.newBuilder()
.setPrefix(prefix)
.build())
.get();
return response.getKvsList().stream()
.map(kv -> kv.getValue().toStringUtf8())
.collect(Collectors.toList());
}
}
3.4 Consul(一站式方案)
Consul功能:
- 服务发现
- 健康检查
- KV存储
- 多数据中心
服务注册:
// 服务注册
reg := &consul.AgentServiceRegistration{
ID: "order-service-1",
Name: "order-service",
Address: "192.168.1.10",
Port: 8080,
Check: &consul.AgentServiceCheck{
HTTP: "http://192.168.1.10:8080/health",
Interval: "10s",
Timeout: "5s",
DeregisterCriticalServiceAfter: "30s",
},
}
consulClient.Agent().ServiceRegister(reg)
服务发现:
// 服务发现
services, _, _ := consulClient.Health().Service("order-service", "", true, nil)
for _, service := range services {
fmt.Printf("Service: %s:%d\n",
service.Service.Address,
service.Service.Port)
}
第四部分:分布式事务
4.1 两阶段提交(2PC)
角色:
- 协调者(Coordinator): 协调事务
- 参与者(Participants): 执行事务
过程:
阶段一:准备(Prepare)
1. 协调者向所有参与者发送Prepare
2. 参与者执行本地事务,但不提交
3. 参与者返回Yes(可以提交)或No(不能提交)
阶段二:提交(Commit)
如果所有参与者都返回Yes:
协调者发送Commit
参与者提交本地事务
如果有参与者返回No或超时:
协调者发送Rollback
参与者回滚本地事务
缺点:
- 同步阻塞: 参与者锁定资源直到提交
- 单点故障: 协调者故障导致事务卡住
- 数据不一致: 参与者故障导致部分提交
代码示例:
public void twoPhaseCommit(List<Participant> participants) {
Coordinator coordinator = new Coordinator(participants);
// 阶段一:准备
boolean allPrepared = true;
for (Participant p : participants) {
if (!p.prepare()) {
allPrepared = false;
break;
}
}
// 阶段二:提交或回滚
if (allPrepared) {
for (Participant p : participants) {
p.commit();
}
} else {
for (Participant p : participants) {
p.rollback();
}
}
}
4.2 三阶段提交(3PC)
相比2PC的改进:
- 增加预提交阶段
- 参与者可以超时自行决定
- 减少阻塞时间
过程:
阶段一:CanCommit
协调者询问参与者是否可以提交
参与者返回Yes或No
阶段二:PreCommit
如果CanCommit都是Yes
协调者发送PreCommit
参与者预提交但不提交
否则
协调者发送Abort
参与者回滚
阶段三:DoCommit
如果PreCommit都成功
协调者发送DoCommit
参与者提交
否则
协调者发送Abort
参与者回滚
4.3 TCC(Try-Confirm-Cancel)
三阶段:
- Try: 预留资源
- Confirm: 确认提交
- Cancel: 取消回滚
示例:电商下单
// 库存服务
public interface StockService {
// Try:冻结库存
boolean tryDeduct(Long productId, int count);
// Confirm:确认扣减
boolean confirmDeduct(Long productId, int count);
// Cancel:释放库存
boolean cancelDeduct(Long productId, int count);
}
// 订单服务
public interface OrderService {
// Try:创建预订单
String tryCreateOrder(Order order);
// Confirm:确认订单
boolean confirmOrder(String orderId);
// Cancel:取消订单
boolean cancelOrder(String orderId);
}
// TCC事务管理器
public class TccTransactionManager {
public void executeOrder(Order order) {
String txId = generateTxId();
// Try阶段
boolean stockReserved = stockService.tryDeduct(
order.getProductId(), order.getCount()
);
if (!stockReserved) {
throw new BusinessException("库存不足");
}
String orderId = orderService.tryCreateOrder(order);
// Confirm阶段
try {
stockService.confirmDeduct(order.getProductId(), order.getCount());
orderService.confirmOrder(orderId);
} catch (Exception e) {
// Cancel阶段
stockService.cancelDeduct(order.getProductId(), order.getCount());
orderService.cancelOrder(orderId);
throw e;
}
}
}
TCC框架推荐:
- Hmily: 蚂蚁TCC框架
- ByteTCC: Spring Cloud TCC
- Seata: 阿里分布式事务框架
4.4 Saga(长事务)
原理:
- 将长事务拆分成多个本地事务
- 每个本地事务都有补偿操作
- 失败时反向执行补偿
示例:酒店预订流程
1. 预订酒店(本地事务)
2. 预订机票(本地事务)
3. 预订租车(本地事务)
如果第2步失败:
执行第1步的补偿:取消酒店预订
Saga实现:
public class HotelBookingSaga {
private final List<SagaStep> steps = new ArrayList<>();
public HotelBookingSaga() {
// 定义步骤和补偿
steps.add(new SagaStep(
() -> bookHotel(), // 执行
() -> cancelHotel() // 补偿
));
steps.add(new SagaStep(
() -> bookFlight(),
() -> cancelFlight()
));
steps.add(new SagaStep(
() -> bookCar(),
() -> cancelCar()
));
}
public void execute() {
int successIndex = -1;
try {
// 执行所有步骤
for (int i = 0; i < steps.size(); i++) {
steps.get(i).execute();
successIndex = i;
}
} catch (Exception e) {
// 反向执行补偿
for (int i = successIndex; i >= 0; i--) {
try {
steps.get(i).compensate();
} catch (Exception ex) {
log.error("补偿失败", ex);
}
}
throw e;
}
}
}
4.5 本地消息表(最终一致性)
原理:
- 业务和消息在同一本地事务
- 定时任务扫描并投递消息
- 保证消息不丢失
表设计:
CREATE TABLE local_messages (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
topic VARCHAR(100) NOT NULL,
message JSON NOT NULL,
status VARCHAR(20) DEFAULT 'PENDING',
retry_count INT DEFAULT 0,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status (status, create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
实现:
@Service
@Transactional
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private LocalMessageRepository messageRepository;
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 保存消息(同一事务)
LocalMessage message = LocalMessage.builder()
.topic("order-created")
.message(order)
.status("PENDING")
.build();
messageRepository.save(message);
}
}
// 定时任务投递消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> messages = messageRepository
.findByStatusAndCreateTimeBefore(
"PENDING",
LocalDateTime.now().minusMinutes(5)
);
for (LocalMessage msg : messages) {
try {
// 发送到MQ
rabbitTemplate.convertAndSend(
msg.getTopic(),
msg.getMessage()
);
// 更新状态
msg.setStatus("SENT");
messageRepository.save(msg);
} catch (Exception e) {
// 增加重试次数
msg.setRetryCount(msg.getRetryCount() + 1);
if (msg.getRetryCount() > 3) {
msg.setStatus("FAILED");
}
messageRepository.save(msg);
}
}
}
面试回答:
"分布式事务实现方案:1)2PC/3PC,强一致性但性能差;2)TCC,Try预留资源、Confirm确认、Cancel补偿,适合短事务;3)Saga,长事务拆分成多个本地事务,每个有补偿;4)本地消息表,业务和消息在同一事务,定时任务投递,保证最终一致性。实际项目中,短事务用TCC,长事务用Saga,对实时性要求不高用本地消息表。"
第五部分:分布式服务治理
5.1 服务注册与发现
流程:
1. 服务启动时向注册中心注册
2. 客户端从注册中心获取服务列表
3. 客户端选择实例调用
4. 服务下线时从注册中心注销
方案对比:
| 方案 | 优点 | 缺点 |
|---|---|---|
| Zookeeper | 成熟稳定 | CP模式,注册中心不可用时影响服务 |
| Eureka | AP模式,高可用 | 维护停止,学习曲线陡 |
| Consul | 功能全面,支持多数据中心 | 资源占用较高 |
| Nacos | 动态配置+服务发现 | 阿里系,学习成本 |
| Etcd | 简单,K8s使用 | 功能相对少 |
Nacos服务注册:
# application.yml
spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: dev
group: DEFAULT_GROUP
service: order-service
metadata:
version: 1.0.0
region: cn-beijing
服务发现调用:
@RestController
public class OrderController {
@Autowired
private DiscoveryClient discoveryClient;
@Autowired
private LoadBalancerClient loadBalancer;
@GetMapping("/call-user")
public String callUser() {
// 服务发现
ServiceInstance instance = loadBalancer.choose("user-service");
// 调用服务
String url = String.format("http://%s:%s/api/user",
instance.getHost(),
instance.getPort()
);
return restTemplate.getForObject(url, String.class);
}
}
5.2 负载均衡
算法:
算法1:轮询(Round Robin)
public class RoundRobinLoadBalancer {
private final AtomicInteger index = new AtomicInteger(0);
public ServiceInstance choose(List<ServiceInstance> instances) {
int pos = index.getAndIncrement() % instances.size();
return instances.get(pos);
}
}
算法2:随机(Random)
public class RandomLoadBalancer {
private final Random random = new Random();
public ServiceInstance choose(List<ServiceInstance> instances) {
int pos = random.nextInt(instances.size());
return instances.get(pos);
}
}
算法3:加权轮询(Weighted Round Robin)
public class WeightedRoundRobinLoadBalancer {
private final AtomicInteger counter = new AtomicInteger(0);
public ServiceInstance choose(List<ServiceInstance> instances) {
int totalWeight = instances.stream()
.mapToInt(ServiceInstance::getWeight)
.sum();
int pos = counter.getAndIncrement() % totalWeight;
int sum = 0;
for (ServiceInstance instance : instances) {
sum += instance.getWeight();
if (pos < sum) {
return instance;
}
}
return instances.get(0);
}
}
算法4:最少连接(Least Connections)
public class LeastConnectionsLoadBalancer {
public ServiceInstance choose(List<ServiceInstance> instances) {
return instances.stream()
.min(Comparator.comparingInt(ServiceInstance::getActiveConnections))
.orElse(null);
}
}
算法5:一致性哈希(Consistent Hash)
public class ConsistentHashLoadBalancer {
private final TreeMap<Long, ServiceInstance> ring = new TreeMap<>();
private final int virtualNodes = 150;
public ConsistentHashLoadBalancer(List<ServiceInstance> instances) {
for (ServiceInstance instance : instances) {
for (int i = 0; i < virtualNodes; i++) {
long hash = hash(instance.getAddress() + ":" + i);
ring.put(hash, instance);
}
}
}
public ServiceInstance choose(String key) {
long hash = hash(key);
Map.Entry<Long, ServiceInstance> entry = ring.ceilingEntry(hash);
if (entry == null) {
entry = ring.firstEntry();
}
return entry.getValue();
}
private long hash(String key) {
// 使用FNV哈希
long hash = 2166136261L;
for (byte b : key.getBytes()) {
hash = (hash ^ b) * 16777619L;
}
return hash;
}
}
面试回答:
"负载均衡算法包括:1)轮询,简单但可能不均衡;2)随机,适合无状态服务;3)加权轮询,按配置权重分配,适合不同性能实例;4)最少连接,选择连接数最少的实例,适合长连接;5)一致性哈希,相同请求路由到相同实例,适合有状态服务。Spring Cloud默认使用轮询,可以切换。"
5.3 服务熔断与降级
Hystrix断路器:
@RestController
public class UserController {
@HystrixCommand(
fallbackMethod = "getUserFallback",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000"),
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "20"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"),
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "5000")
}
)
@GetMapping("/user/{id}")
public User getUser(@PathVariable Long id) {
return restTemplate.getForObject(
"http://user-service/api/user/" + id,
User.class
);
}
// 降级方法
public User getUserFallback(@PathVariable Long id) {
return User.builder()
.id(id)
.name("默认用户")
.build();
}
}
Sentinel熔断:
@RestController
public class UserController {
@SentinelResource(
value = "getUser",
blockHandler = "getUserBlock",
fallback = "getUserFallback"
)
@GetMapping("/user/{id}")
public User getUser(@PathVariable Long id) {
return restTemplate.getForObject(
"http://user-service/api/user/" + id,
User.class
);
}
// 降级方法
public User getUserFallback(Long id) {
return User.builder()
.id(id)
.name("默认用户")
.build();
}
// 限流处理
public User getUserBlock(Long id, BlockException ex) {
return User.builder()
.id(id)
.name("限流降级")
.build();
}
}
5.4 服务限流
Sentinel限流:
@RestController
public class OrderController {
@GetMapping("/order/create")
@SentinelResource(
value = "createOrder",
blockHandler = "createOrderBlock"
)
public Result createOrder(OrderRequest request) {
return orderService.createOrder(request);
}
public Result createOrderBlock(OrderRequest request, BlockException ex) {
return Result.error("系统繁忙,请稍后重试");
}
}
动态配置规则:
@PostConstruct
public void initFlowRules() {
List<FlowRule> rules = new ArrayList<>();
// QPS限流
FlowRule rule1 = new FlowRule();
rule1.setResource("createOrder");
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setCount(1000);
rule1.setLimitApp("default");
rule1.setStrategy(RuleConstant.STRATEGY_DIRECT);
rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
rules.add(rule1);
// 并发限流
FlowRule rule2 = new FlowRule();
rule2.setResource("createOrder");
rule2.setGrade(RuleConstant.FLOW_GRADE_THREAD);
rule2.setCount(100);
rules.add(rule2);
FlowRuleManager.loadRules(rules);
}
5.5 服务网格(Service Mesh)
Istio架构:
应用程序
↓
Sidecar代理(Envoy)
↓
服务网格
↓
其他服务
核心功能:
- 流量管理(路由)
- 安全(TLS mTLS)
- 可观测性(追踪、监控、日志)
- 策略(限流、熔断)
Istio配置示例:
# VirtualService - 流量路由
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: reviews
spec:
hosts:
- reviews
http:
- match:
- headers:
end-user:
exact: jason
route:
- destination:
host: reviews
subset: v2
- route:
- destination:
host: reviews
subset: v1
# DestinationRule - 服务子集
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: reviews
spec:
host: reviews
subsets:
- name: v1
labels:
version: v1
- name: v2
labels:
version: v2
# Gateway - 入口网关
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: bookinfo-gateway
spec:
selector:
istio: ingressgateway
servers:
- port:
number: 443
name: https
protocol: HTTPS
hosts:
- bookinfo.com
tls:
mode: SIMPLE
credentialName: bookinfo-credential
面试回答:
"Service Mesh是服务间通信的基础设施层,通过Sidecar代理处理流量。Istio是主流实现,提供流量管理、安全、可观测性等功能。使用Istio可以将业务逻辑和基础设施功能解耦,运维人员通过配置管理流量,开发人员专注业务。但Istio增加了系统复杂度,适合微服务规模较大的场景。"
第六部分:分布式追踪
6.1 分布式追踪原理
目的:
- 追踪请求在微服务间的调用链
- 定位性能瓶颈
- 分析故障根因
核心概念:
- TraceId: 全局唯一追踪ID
- SpanId: 调用链上的每一步
- ParentSpanId: 父调用ID
调用链结构:
TraceId: abc123
├─ Span1: Gateway (ParentSpanId: null)
│ ├─ Span2: OrderService (ParentSpanId: Span1)
│ │ ├─ Span3: UserService (ParentSpanId: Span2)
│ │ └─ Span4: ProductService (ParentSpanId: Span2)
│ └─ Span5: PaymentService (ParentSpanId: Span1)
└─ Span6: Response
6.2 SkyWalking(推荐)
架构:
Agent (采集数据)
↓
OAP Server (处理分析)
↓
Elasticsearch (存储)
↓
UI (可视化)
Agent配置:
# skywalking-agent/config/agent.conf
agent.service_name=order-service
agent.collector.backend_service=localhost:11800
agent.logging.level=DEBUG
Java应用启动参数:
java -javaagent:/path/to/skywalking-agent.jar \
-Dskywalking.agent.service_name=order-service \
-Dskywalking.collector.backend_service=localhost:11800 \
-jar order-service.jar
查询追踪:
- 通过TraceId查询完整调用链
- 查看每个Span的耗时
- 定位慢SQL、慢HTTP调用
6.3 Zipkin + Sleuth
Spring Cloud Sleuth集成:
# application.yml
spring:
application:
name: order-service
sleuth:
zipkin:
base-url: http://localhost:9411
sampler:
probability: 1.0 # 100%采样
zipkin:
base-url: http://localhost:9411
手动创建Span:
@RestController
public class OrderController {
@Autowired
private Tracer tracer;
@GetMapping("/order/{id}")
public Order getOrder(@PathVariable Long id) {
// 创建自定义Span
Span span = tracer.nextSpan().name("get-order-db");
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span.start())) {
span.tag("order-id", String.valueOf(id));
// 查询数据库
Order order = orderRepository.findById(id);
span.tag("db-time", String.valueOf(System.currentTimeMillis()));
return order;
} finally {
span.end();
}
}
}
面试回答:
"分布式追踪用于追踪请求在微服务间的调用链。我们使用SkyWalking实现,它包含Agent采集、OAP处理、ES存储、UI展示四个组件。Agent无侵入采集调用数据,通过TraceId和SpanId关联完整调用链,可以定位性能瓶颈和故障根因。SkyWalking还提供服务拓扑、APM指标等功能。"
第七部分:实战案例
案例1:分布式ID生成器
需求:
- 全局唯一
- 趋势递增(对数据库友好)
- 高性能
- 分布式环境可用
方案对比:
| 方案 | 优点 | 缺点 |
|---|---|---|
| UUID | 简单 | 无序,占用空间大 |
| 数据库自增 | 简单 | 性能差,单点 |
| Redis自增 | 性能好 | 依赖Redis |
| Snowflake | 完美 | 时钟回拨问题 |
Snowflake实现:
public class SnowflakeIdGenerator {
private final long workerId;
private final long datacenterId;
private long sequence = 0;
private long lastTimestamp = -1L;
// 各部分位数
private static final long WORKER_ID_BITS = 5L;
private static final long DATACENTER_ID_BITS = 5L;
private static final long SEQUENCE_BITS = 12L;
// 最大值
private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
private static final long MAX_DATACENTER_ID = ~(-1L << DATACENTER_ID_BITS);
// 偏移量
private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
private static final long DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;
// 起始时间戳
private static final long EPOCH = 1288834974657L;
public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > MAX_WORKER_ID || workerId < 0) {
throw new IllegalArgumentException("Worker ID invalid");
}
if (datacenterId > MAX_DATACENTER_ID || datacenterId < 0) {
throw new IllegalArgumentException("Datacenter ID invalid");
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
// 时钟回拨处理
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
// 小幅回拨,等待
try {
Thread.sleep(offset << 1);
timestamp = System.currentTimeMillis();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
// 大幅回拨,抛出异常
throw new RuntimeException("Clock moved backwards");
}
}
if (timestamp == lastTimestamp) {
// 同一毫秒,序列号递增
sequence = (sequence + 1) & ~(-1L << SEQUENCE_BITS);
if (sequence == 0) {
// 序列号溢出,等待下一毫秒
timestamp = tilNextMillis(lastTimestamp);
}
} else {
// 新的毫秒,序列号重置
sequence = 0;
}
lastTimestamp = timestamp;
// 组合ID
return ((timestamp - EPOCH) << TIMESTAMP_SHIFT)
| (datacenterId << DATACENTER_ID_SHIFT)
| (workerId << WORKER_ID_SHIFT)
| sequence;
}
private long tilNextMillis(long lastTimestamp) {
long timestamp = System.currentTimeMillis();
while (timestamp <= lastTimestamp) {
timestamp = System.currentTimeMillis();
}
return timestamp;
}
}
ID结构:
0 | 0000000000 0000000000 0000000000 0000000000 0 | 00000 | 00000 | 000000000000
↑ ────────────────────── 时间戳 ────────────────── ↑ 机房 │ 机器 │ 序列
(41位, 69年) (5位) (5位) (12位)
面试回答:
"分布式ID生成器使用Snowflake算法,ID由时间戳、机房ID、机器ID、序列号组成,共64位。时间戳41位可用69年,机器ID各5位支持32个机房和32台机器,序列号12位单机每毫秒可产生4096个ID。Snowflake生成的ID趋势递增,对数据库友好。时钟回拨时小幅回拨等待,大幅回拨抛出异常。"
案例2:分布式计数器
需求:
- 高并发计数
- 支持增量
- 数据持久化
方案:Redis INCR
@Service
public class DistributedCounter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 自增
*/
public long increment(String key) {
return redisTemplate.opsForValue().increment(key);
}
/**
* 自增指定值
*/
public long incrementBy(String key, long delta) {
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 获取当前值
*/
public long get(String key) {
Object value = redisTemplate.opsForValue().get(key);
return value != null ? Long.parseLong(value.toString()) : 0L;
}
}
支持过期时间:
/**
* 自增并设置过期时间
*/
public long incrementWithExpire(String key, long expireSeconds) {
LuaScript script = new DefaultRedisScript<>(
"local current = redis.call('get', KEYS[1]) " +
"if current == false then " +
" redis.call('set', KEYS[1], 1) " +
" redis.call('expire', KEYS[1], ARGV[1]) " +
" return 1 " +
"else " +
" local next = redis.call('incr', KEYS[1]) " +
" redis.call('expire', KEYS[1], ARGV[1]) " +
" return next " +
"end",
Long.class
);
return redisTemplate.execute(
script,
Collections.singletonList(key),
String.valueOf(expireSeconds)
);
}
案例3:分布式配置中心
需求:
- 配置热更新
- 多环境支持
- 配置版本管理
- 灰度发布
基于Nacos实现:
# bootstrap.yml
spring:
application:
name: order-service
cloud:
nacos:
config:
server-addr: localhost:8848
namespace: dev
group: DEFAULT_GROUP
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: common.yaml
refresh: true
监听配置变更:
@Component
public class ConfigListener {
@NacosConfigListener(dataId = "order-service.yaml")
public void onConfigChange(String newConfig) {
log.info("配置变更: {}", newConfig);
// 处理配置变更
}
}
自定义配置源:
public class DatabaseConfigSource implements ConfigSource {
@Autowired
private ConfigRepository configRepository;
@Override
public String getProperty(String key) {
ConfigEntity config = configRepository.findByKey(key);
return config != null ? config.getValue() : null;
}
@Override
public Map<String, String> getAllProperties() {
return configRepository.findAll().stream()
.collect(Collectors.toMap(
ConfigEntity::getKey,
ConfigEntity::getValue
));
}
}
第八部分:面试题库
面试题1:什么是CAP理论?
回答模板:
"CAP理论指出分布式系统最多同时满足两个特性:C(一致性)、A(可用性)、P(分区容错)。由于网络分区必然存在,P是必须保证的,因此只能在C和A之间选择。
CP系统保证强一致性但牺牲可用性,如MySQL、HBase,适合金融交易;
AP系统保证高可用但接受短暂不一致,如Redis、Cassandra,适合社交内容;
CA系统不是真正的分布式,如单机数据库。实际设计中,我们通常追求BASE理论:基本可用、软状态、最终一致性。"
面试题2:分布式锁怎么实现?
回答模板:
"分布式锁实现方案:
方案1:Redis SETNX + 过期时间
boolean locked = redisTemplate.opsForValue() .setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS);方案2:Redisson(推荐)
Redisson实现了Redis分布式锁,支持可重入、自动续约(看门狗)、公平锁。方案3:Zookeeper临时有序节点
创建临时有序节点,最小的节点获取锁。Redis方案性能好,Zookeeper方案可靠性高。实际项目中使用Redisson。"
面试题3:分布式事务如何实现?
回答模板:
"分布式事务实现方案:
1)2PC/3PC: 强一致性但性能差,协调者单点问题。
2)TCC: Try预留资源、Confirm确认、Cancel补偿,适合短事务,业务需实现三阶段接口。
3)Saga: 长事务拆分成多个本地事务,每个有补偿,失败时反向补偿。
4)本地消息表: 业务和消息在同一事务,定时任务投递,保证最终一致性。
实际项目中,短事务用TCC,长事务用Saga,对实时性要求不高用本地消息表。"
面试题4:如何实现服务发现?
回答模板:
"服务发现包括服务注册和服务发现两个部分:
服务注册: 服务启动时向注册中心注册,发送服务名、地址、端口、元数据等信息;服务下线时从注册中心注销。
服务发现: 客户端从注册中心获取服务列表,通过负载均衡算法选择实例调用。
实现方案:
- Nacos:支持AP和CP模式,动态配置+服务发现
- Eureka:AP模式,自我保护机制
- Consul:支持多数据中心,健康检查
- Zookeeper:CP模式,稳定成熟
实际项目中使用Nacos,支持动态配置和服务发现。"
面试题5:什么是分布式追踪?
回答模板:
"分布式追踪用于追踪请求在微服务间的调用链,帮助定位性能瓶颈和故障根因。
核心概念:
- TraceId:全局唯一追踪ID,贯穿整个调用链
- SpanId:调用链上的每一步
- ParentSpanId:父调用ID
实现方案:
- SkyWalking:无侵入Agent,服务拓扑、APM指标
- Zipkin:Spring Cloud Sleuth集成
- Jaeger:Uber开源
实际项目中使用SkyWalking,通过Agent无侵入采集数据,UI展示调用链和性能指标。"
面试题6:Paxos和Raft的区别?
回答模板:
"Paxos和Raft都是分布式共识算法:
Paxos:
- 理论基础,描述了达成一致的过程
- Multi-Paxos用于实际实现
- 理解困难,实现复杂
Raft:
- 工程实现,优先考虑可理解性
- 分为领导选举、日志复制、安全性三个部分
- 易理解和实现
Raft是Paxos的简化版本,Etcd、Consul都使用Raft实现。"
面试题7:如何处理网络分区?
回答模板:
"网络分区处理根据系统类型不同:
CP系统(如Zookeeper、Etcd):
- 网络分区时,少数派节点停止服务
- 多数派节点继续服务
- 恢复后数据同步
AP系统(如Redis、Cassandra):
- 网络分区时,所有节点继续服务
- 可能出现数据不一致
- 恢复后冲突解决(如Last-Write-Win)
实际项目中,核心服务选择CP,非核心服务选择AP。"
面试题8:分布式ID生成方案?
回答模板:
"分布式ID生成方案:
1)UUID: 简单但无序,占用空间大,不适合做数据库主键。
2)数据库自增: 简单但性能差,单点问题。
3)Redis自增: 性能好但依赖Redis,可能丢失。
4)Snowflake: ID由时间戳、机房ID、机器ID、序列号组成,趋势递增,高性能。
5)号码段模式: 从数据库预分配号段,性能好,可能不连续。
实际项目中使用Snowflake,生成的ID趋势递增对数据库友好。"
面试题9:什么是最终一致性?
回答模板:
"最终一致性是指系统保证在没有新更新的情况下,经过一段时间后,所有副本最终会达到一致。
实现方式:
1)读后写一致性:读取版本号,写入时带上版本号
2)写后读一致性:写入后立即刷新缓存
3)会话一致性:同一会话内保证读一致性适用场景:
- 社交点赞、评论
- 用户浏览历史
- 日志数据
实际项目中,对于不需要强一致性的场景使用最终一致性,提升系统性能。"
面试题10:如何选择注册中心?
回答模板:
"注册中心选型考虑以下因素:
1)Zookeeper: 成熟稳定,CP模式,适合对一致性要求高的场景。
2)Eureka: Spring Cloud原生,AP模式,自我保护机制,但维护停止。
3)Consul: 功能全面,支持多数据中心,健康检查,但资源占用较高。
4)Nacos: 阿里开源,支持AP和CP模式,动态配置+服务发现,社区活跃。
实际项目中选择Nacos,支持动态配置和服务发现,社区活跃,文档完善。"
第九部分:快速记忆卡片
核心概念速记
【CAP理论】
C(一致性)、A(可用性)、P(分区容错)
三选二,P必须选,CP或AP
【BASE理论】
基本可用、软状态、最终一致性
【Raft三个组件】
领导选举、日志复制、安全性
【分布式锁方案】
Redis SETNX、Redisson、Zookeeper
【分布式事务方案】
2PC/3PC、TCC、Saga、本地消息表
【分布式ID方案】
UUID、数据库自增、Redis、Snowflake
技术选型决策树
需要共识算法?
├─ 理论学习 → Paxos
└─ 工程实现 → Raft
需要注册中心?
├─ Spring Cloud → Eureka/Nacos
├─ 一致性优先 → Zookeeper
└─ 功能全面 → Consul
需要分布式锁?
├─ 性能优先 → Redis/Redisson
└─ 可靠性优先 → Zookeeper
需要分布式事务?
├─ 强一致性 → 2PC
├─ 短事务 → TCC
├─ 长事务 → Saga
└─ 最终一致 → 本地消息表
第十部分:学习路线图
阶段1:基础理解(1周)
- [ ] 理解分布式系统定义和挑战
- [ ] 掌握CAP和BASE理论
- [ ] 理解网络不可靠、时钟不同步
- [ ] 掌握雪花算法生成分布式ID
阶段2:共识算法(1周)
- [ ] 理解Paxos算法原理
- [ ] 掌握Raft算法三组件
- [ ] 了解Gossip协议
阶段3:分布式协调(1周)
- [ ] Zookeeper基础和应用
- [ ] Etcd基础和应用
- [ ] Consul基础和应用
阶段4:分布式事务(1周)
- [ ] 理解2PC/3PC
- [ ] 掌握TCC实现
- [ ] 掌握Saga实现
- [ ] 掌握本地消息表
阶段5:服务治理(1周)
- [ ] 服务注册与发现
- [ ] 负载均衡算法
- [ ] 服务熔断降级
- [ ] 服务限流
阶段6:分布式追踪(1周)
- [ ] 理解分布式追踪原理
- [ ] 掌握SkyWalking使用
- [ ] 了解Zipkin和Sleuth
阶段7:实战项目(2周)
- [ ] 分布式ID生成器
- [ ] 分布式计数器
- [ ] 分布式配置中心
- [ ] 分布式锁实现
阶段8:面试准备(1周)
- [ ] 背诵10个核心面试题回答
- [ ] 练习设计3个完整方案
- [ ] 模拟面试
结语
记住几个关键点:
- 核心思想: 分工协作、容错设计、一致性保证
- 设计原则: 根据业务场景选择一致性级别
- 架构原则: CP还是AP,根据需求权衡
- 实现方案: 理论+工程,选择成熟方案
面试时,先说理论,再说方案,最后说实现。你不需要做过大型分布式系统,只要理解原理和设计思路,就能给出令人信服的答案!
最重要的是:自信! 分布式系统设计是工程经验的总结,核心是理解权衡和各种方案的适用场景。