一、Spring-Intergration 简介
Spring-Intergration提供Spring编程模型的拓展,以支持EIP(nterprise Integration Patterns,企业集成模式)。在基于Spring的应用程序中支持轻量级的消息传递,支持通过适配器(Adapters)与外部系统集成。Spring-Intergration有以下目标:
·提供一个简单的模型来实现复杂的企业集成解决方案
·在基于Spring的应用程序中提供异步、消息驱动的行为
·激励Spring的现有用户直观、增量的使用Spring-Intergration
二、Spring-Intergration组成
Spring-Intergration 主要由Message、Message Channel、Message Endpoint几部分组成,接下载我们将分别对其进行简要介绍。
Message:
在Spring-Intergration中,Message是消息传输的载体,它由Header(消息头)和Payload(消息体)两部分组成。Payload中可以存放任何数据类型,Header中通常包含时间戳、信息ID、地址等常用信息。例如,当从一个文件创建Message时,文件名可以存放在Header中,具体的文件内容可以存放在Payload中。
图1.Message
Message Channel:
Message Channel(消息管道),生产者生产消息推送到channel,消费者从channel消费消息。因此,消息通道将消息传递组件解耦,并且还提供了一个方便的消息拦截和监视点。
图2.Message Channel
a)PublishSubscribeChannel:
PublishSubscribeChannel将发送给它的任何消息广播到其所有订阅者,因为它没有实现PollableChannel,因此没有receive()方法,PublishSubscribeChannel仅能用于发送。
b)QueueChannel:
QueueChannel的内部实现包装了一个队列。与PublishSubscribeChannel不同,QueueChannel具有点对点的概念。换句话说,即使该通道有多个使用者,其中只有一个使用者可以接收发送到该通道的任何消息。如果队列已达到容量,发送方将阻塞,直到队列中有可用的空间或者队列中的消息被超时处理。
c)PriorityChannel:
PriorityChannel是QueueChannel先入先出的一种替代实现,它允许根据优先级在通道内对消息进行排序。
d)RendezvousChannel:
RendezvousChannel支持“direct-handoff”场景.。发送方会一直阻塞,直到另一方调用通道的receive()方法执行。或者接收方阻塞,直到发送方发送消息。
e)DirectChannel:
DirectChannel具有点对点含义,与PublishSubscribeChannel的不同之处在于,它将每个消息发送给单个订阅者的MessageHandler进行处理。
f)ExecutorChannel:
Executor通道是一个点对点管道,与DirectChannel不同点在于 Executor将委托给TaskExecutor的实例来执行调度。
g)FluxMessageChannel:
不同于上述管道,FluxMessageChannel既没有实现SubscribableChannel,也没有实现PollableChannel,因此只有org.reactivestreams.Subscriber的实例可从该通道中消费Message。
h)ScopedChannel:
通过对管道添加scope属性, 增加ThreadLocalChannel的功能。
Message Endpoint:
Message Endpoint(消息终端),消息处理的真正节点,接收消息管道发来的消息,处理完成后发送到对应的消息管道。
Message Transformer:
消息转换器负责转换消息的内容或结构,并返回修改后的消息。最常见的转换器类型可能是将消息的Payload从一种格式转换为另一种格式(例如从XML转换为java.lang.String)。类似地,转换器可以添加、修改或删除消息的Header内容。
Message Filter:
消息筛选器确定是否应将消息传递到输出通道。可以对Payload的类型、属性值、是否存在报头或其他条件进行筛选。如果消息被接受,它将被发送到输出通道。如果没有,它将被丢弃(也有可能会抛出异常)。
Message Router:
消息路由器负责决定哪个或者哪些通道应该接收消息。通常,基于Payload或者Header中的消息进行决策。
图3. Message Router
Splitter:
拆分器接收来自其输入通道的消息,将该消息拆分为多个消息,并将每个消息发送到其输出通道。
Aggregator:
聚合器基本上是拆分器的镜像,它是一种接收多条消息并将它们组合成一条消息的消息终端。
Service Activator:
服务激活器是将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法能够返回值,则还可以提供输出消息通道。
图4. Service Activator
Channel Adapter:
通道适配器是将消息通道连接到其他系统或传输的终端。通道适配器可以是入站或出站。通常,通道适配器在从另一个系统接收或发送到另一个系统的任何对象或资源(文件、HTTP请求、JMS消息等)之间进行一些映射。根据传输的不同,通道适配器还可以填充或提取Header中的值。
图5. Channel Adapter
Endpoint Bean Names:
消费终端由消费者和消息处理程序两个bean组成。消费者有一个对消息处理程序的引用,并在消息到达时调用它。
三、Demo
HelloWorldApp:
HelloWorld应用程序演示了一个简单的消息流,如下所示:
helloWorldDemo.xml:定义inputChannel,outputChannel(设置容量大小为10),activator(接收helloService里的sayHello方法)。
HelloService.java:
HelloWorldApp.java:
输出结果:
PollerApp:
Poller应用程序每20秒打印两次当前系统时间。
Delay.xml:
PollerApp.java:
作者:王子浩