Skip to content

Rx 响应式示例

本节展示如何使用 System.Reactive 处理变更事件。

基本订阅

csharp
using Apq.ChangeBubbling.Messaging;
using System.Reactive.Linq;

// Enable the Rx stream.
ChangeMessenger.EnableRxStream = true;

// Subscribe to every change and print its kind together with the node name.
var allChanges = ChangeMessenger.AsObservable();
allChanges.Subscribe(evt => Console.WriteLine($"[{evt.Kind}] {evt.NodeName}"));

过滤事件

csharp
// Handle collection-add events only.
var additions = ChangeMessenger.AsObservable()
    .Where(evt => evt.Kind == NodeChangeKind.CollectionAdd);
additions.Subscribe(evt => Console.WriteLine($"Added: {evt.NewValue}"));

// Handle changes whose node name is "Users" only.
var userNodeChanges = ChangeMessenger.AsObservable()
    .Where(evt => evt.NodeName == "Users");
userNodeChanges.Subscribe(evt => Console.WriteLine($"User change: {evt.Kind}"));

// Handle changes to the "Name" property only, printing the old and new values.
var nameChanges = ChangeMessenger.AsObservable()
    .Where(evt => evt.PropertyName == "Name");
nameChanges.Subscribe(evt => Console.WriteLine($"Name changed: {evt.OldValue} -> {evt.NewValue}"));

节流与防抖

csharp
// Throttling: handle at most one event per 100 ms.
// Rx.NET naming differs from most other Rx ports: rate limiting is done with Sample,
// which forwards only the latest event seen in each 100 ms interval.
ChangeMessenger.AsObservable()
    .Sample(TimeSpan.FromMilliseconds(100))
    .Subscribe(c => SaveChanges(c));

// Debouncing: handle an event only after 300 ms have passed without a newer event.
// System.Reactive has no Debounce operator; its Throttle operator provides the
// debounce behavior (an element is dropped if another one follows within the duration).
ChangeMessenger.AsObservable()
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Subscribe(c => Search(c.NewValue?.ToString()));

缓冲批处理

csharp
// Collect changes into one-second batches and skip batches that contain no changes.
var timedBatches = ChangeMessenger.AsObservable()
    .Buffer(TimeSpan.FromSeconds(1))
    .Where(items => items.Count > 0);

timedBatches.Subscribe(items =>
{
    Console.WriteLine($"Processing {items.Count} changes");
    foreach (var item in items)
    {
        Process(item);
    }
});

// Collect changes into batches of 10 events and hand each batch to ProcessBatch.
var countedBatches = ChangeMessenger.AsObservable().Buffer(10);
countedBatches.Subscribe(items => ProcessBatch(items));

转换数据

csharp
// Project each change into a ChangeDto, then pass every DTO to SendToServer.
var dtos = ChangeMessenger.AsObservable().Select(evt => new ChangeDto
{
    Type = evt.Kind.ToString(),
    Node = evt.NodeName,
    Path = evt.Path,
    Value = evt.NewValue
});

dtos.Subscribe(payload => SendToServer(payload));

组合流

csharp
// Build one filtered stream per node name.
var fromUsers = ChangeMessenger.AsObservable().Where(evt => evt.NodeName == "Users");
var fromSettings = ChangeMessenger.AsObservable().Where(evt => evt.NodeName == "Settings");

// Merge both streams into one and log every change that arrives on either of them.
var merged = fromUsers.Merge(fromSettings);
merged.Subscribe(evt => LogChange(evt));

完整示例

csharp
using Apq.ChangeBubbling.Nodes;
using Apq.ChangeBubbling.Messaging;
using System.Reactive.Linq;

/// <summary>
/// Complete sample: enables the Rx stream, subscribes to add events, remove events and
/// per-second statistics, mutates a list node, and releases the subscriptions before exiting.
/// </summary>
public class Program
{
    /// <summary>
    /// Runs the sample and blocks until a line is read from the console.
    /// </summary>
    public static void Main()
    {
        // Enable the Rx stream.
        ChangeMessenger.EnableRxStream = true;

        // Create the node.
        // NOTE(review): User is not declared in this sample; it is assumed to be a type
        // with a settable Name property that is defined elsewhere.
        var users = new ListBubblingNode<User>("Users");

        // Subscribe to add events. The returned handle is kept so it can be disposed later.
        var addedSubscription = ChangeMessenger.AsObservable()
            .Where(c => c.Kind == NodeChangeKind.CollectionAdd)
            .Subscribe(c => Console.WriteLine($"User added: {c.NewValue}"));

        // Subscribe to remove events.
        var removedSubscription = ChangeMessenger.AsObservable()
            .Where(c => c.Kind == NodeChangeKind.CollectionRemove)
            .Subscribe(c => Console.WriteLine($"User removed: {c.OldValue}"));

        // Per-second statistics; batches without any change are skipped.
        var statsSubscription = ChangeMessenger.AsObservable()
            .Buffer(TimeSpan.FromSeconds(1))
            .Where(batch => batch.Count > 0)
            .Subscribe(batch =>
            {
                Console.WriteLine($"[Stats] {batch.Count} changes in last second");
            });

        // Mutate the data.
        users.Add(new User { Name = "Alice" });
        users.Add(new User { Name = "Bob" });
        users.Add(new User { Name = "Charlie" });
        users.RemoveAt(0);

        // Block until Enter is pressed, which gives the one-second buffer window time to elapse.
        Console.ReadLine();

        // Release the subscriptions explicitly instead of leaving them attached.
        statsSubscription.Dispose();
        removedSubscription.Dispose();
        addedSubscription.Dispose();
    }
}

基于 MIT 许可发布