本文主要介绍 Active Object 模式,在 Java Design Patterns 网站上有对该模式进行介绍。这里主要是做个笔记,并添加一些扩展,以加深对该设计模式的理解。
Java Design Patterns 提供了各种 Java 设计模式的介绍、示例代码和用例说明。该网站旨在帮助 Java 开发人员了解和应用各种常见的设计模式,以提高代码的可读性、可维护性和可扩展性。
Java Design Patterns 网站提供了多种设计模式分类方式,包括创建型模式(Creational Patterns)、结构型模式(Structural Patterns)和行为型模式(Behavioral Patterns),以及其他一些常见的模式。
对于每个设计模式,该网站提供了详细的介绍、示例代码和用例说明,并且提供了一些常见的使用场景和注意事项。开发人员可以根据自己的需求选择适合自己的设计模式,并且可以参考示例代码和用例说明来理解和应用该模式。
此外,Java Design Patterns 网站还提供了一些其他资源,如设计模式的 UML 图、设计模式的优缺点、设计模式的比较等。这些资源可以帮助开发人员更好地理解和应用设计模式。
中文网站:https://java-design-patterns.com/zh/
Github 上源码仓库(非官方):https://github.com/iluwatar/java-design-patterns
目的
活动对象(Active Object)是一种设计模式,其主要目的是将并发和异步处理的问题从客户端代码中分离出来,从而提高系统的性能、可靠性和可维护性。活动对象模式是一种能够帮助开发人员处理多线程、异步和并发问题的设计模式。
在传统的并发编程模型中,客户端代码需要直接管理线程和锁等细节,这样会导致代码复杂度和维护成本的增加,同时也容易出现各种问题,如死锁、竞态条件等。活动对象模式通过引入活动对象来解决这些问题,活动对象将客户端代码发送的消息添加到内部的消息队列中,并使用单独的线程异步处理这些消息。这种模式可以提高系统的性能和可扩展性,同时使得客户端代码更加简单和易于维护。
活动对象模式的目的包括:
- 将并发和异步处理的问题从客户端代码中分离出来,从而使得客户端代码更加简单和易于维护。客户端代码只需要发送消息即可,不需要关心异步处理的细节,活动对象将并发和异步处理的问题封装起来,提供简单的接口供客户端使用。
- 提高系统的性能和可扩展性。活动对象使用单独的线程池异步处理消息,可以更好地利用系统资源,提高系统的性能和可扩展性。
- 提高系统的可靠性和健壮性。活动对象将消息添加到内部的消息队列中,避免了竞态条件和死锁等问题,从而提高了系统的可靠性和健壮性。
- 将多线程和异步处理的细节封装起来,使得客户端代码更加抽象和通用。客户端代码可以使用相同的接口来访问不同的服务,从而提高代码的复用性和可维护性。
解释
活动对象模式的核心思想是将并发和异步处理的问题从客户端代码中分离出来。具体来说,活动对象模式包含以下几个关键组件:
- 活动对象(Active Object):活动对象是一个封装了某种服务的对象,它将客户端代码发送的消息添加到内部的消息队列中,并使用单独的线程异步处理这些消息。活动对象通常包含一个消息队列和一个线程池,用于异步处理消息。
- 方法调用请求(Method Invocation Request):客户端代码向活动对象发送方法调用请求,包括方法名和参数列表等信息。活动对象将方法调用请求封装为一个消息对象,并添加到内部的消息队列中。
- 消息队列(Message Queue):消息队列是活动对象内部用于存储方法调用请求的队列。活动对象将客户端代码发送的消息添加到消息队列中,并使用单独的线程异步处理这些消息。
- 线程池(Thread Pool):线程池是活动对象用于异步处理消息的线程池。活动对象从消息队列中取出消息,并使用线程池中的线程异步处理这些消息。
程序示例
public abstract class ActiveCreature{
private final Logger logger = LoggerFactory.getLogger(ActiveCreature.class.getName());
private BlockingQueue<Runnable> requests;
private String name;
private Thread thread;
public ActiveCreature(String name) {
this.name = name;
this.requests = new LinkedBlockingQueue<Runnable>();
thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
requests.take().run();
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
}
);
thread.start();
}
public void eat() throws InterruptedException {
requests.put(new Runnable() {
@Override
public void run() {
logger.info("{} is eating!",name());
logger.info("{} has finished eating!",name());
}
}
);
}
public void roam() throws InterruptedException {
requests.put(new Runnable() {
@Override
public void run() {
logger.info("{} has started to roam and the wastelands.",name());
}
}
);
}
public String name() {
return this.name;
}
}
在该示例代码中,ActiveCreature
类封装了一个消息队列,用于异步处理客户端代码发送的消息。具体来说,该示例代码包含以下几个关键组件:
BlockingQueue<Runnable>
类型的requests
属性:该属性表示消息队列,用于存储客户端代码发送的消息。客户端代码可以通过eat()
和roam()
方法向消息队列中添加消息。Thread
类型的thread
属性:该属性表示活动对象的线程,用于异步处理消息队列中的消息。String
类型的name
属性:该属性表示活动对象的名称。ActiveCreature(String name)
构造方法:该方法用于创建一个活动对象,初始化消息队列和线程等属性。在该方法中,我们创建了一个新的线程,并使用requests.take().run()
从消息队列中取出消息并异步处理。eat()
和roam()
方法:这两个方法用于向消息队列中添加消息,表示活动对象正在吃和漫游。在这两个方法中,我们将一个Runnable
对象添加到消息队列中,并在其run()
方法中执行相应的操作,如输出日志等。name()
方法:该方法用于获取活动对象的名称。
在总体上,该示例代码实现了活动对象模式的基本功能,将并发和异步处理的问题从客户端代码中分离出来,并提供了简单的接口供客户端调用。客户端代码只需要调用 eat()
和 roam()
方法即可,不需要关心异步处理的细节,活动对象将并发和异步处理的问题封装起来,提供简单的接口供客户端使用。
需要注意的是,在该示例代码中,我们使用了阻塞队列 BlockingQueue<Runnable>
来实现消息队列,该队列提供了线程安全的添加和移除操作,保证了消息的有序性和正确性。同时,在活动对象的线程中使用了 requests.take().run()
操作来从消息队列中取出消息并异步处理,这种方式可以保证消息的有序性和正确性,并避免了竞态条件和死锁等问题。
我们可以看到,任何将扩展ActiveCreature的类都将具有自己的控制线程来执行和调用方法。
例如,兽人类:
public class Orc extends ActiveCreature {
public Orc(String name) {
super(name);
}
}
现在,我们可以创建多个生物,例如兽人,告诉他们吃东西和散步,然后他们将在自己的控制线程上执行它:
public static void main(String[] args) {
var app = new App();
app.run();
}
@Override
public void run() {
ActiveCreature creature;
try {
for (int i = 0;i < creatures;i++) {
creature = new Orc(Orc.class.getSimpleName().toString() + i);
creature.eat();
creature.roam();
}
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
Runtime.getRuntime().exit(1);
}
类图
举例
以下是一个简单的活动对象示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ActiveObject {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public Future<String> process(String message) {
// 创建一个异步任务,并将其提交到线程池中
return executor.submit(() -> {
// 模拟复杂的处理逻辑
Thread.sleep(1000);
// 返回处理结果
return "Processed message: " + message;
});
}
public void shutdown() {
// 关闭线程池
executor.shutdown();
}
}
在上面的示例代码中,ActiveObject
类封装了一个异步处理服务,客户端代码可以使用该服务异步处理消息。ActiveObject
类中的 process()
方法接收一个消息,并将其封装为一个异步任务,然后提交到线程池中异步处理。process()
方法返回一个 Future
对象,可以用于获取异步处理的结果。在该示例代码中,异步任务只是简单地模拟了处理逻辑,实际上可以根据需要编写更加复杂的异步处理逻辑。
下面是一个使用 ActiveObject
类的示例代码:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Client {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ActiveObject activeObject = new ActiveObject();
// 发送消息到活动对象
Future<String> future = activeObject.process("Hello, world!");
// 等待异步处理完成,并获取处理结果
String result = future.get();
System.out.println(result);
// 关闭活动对象
activeObject.shutdown();
}
}
在上面的示例代码中,客户端代码使用 ActiveObject
类异步处理了一条消息,并等待异步处理完成后获取处理结果。需要注意的是,在使用 ActiveObject
类时,客户端代码只需要发送消息即可,不需要关心异步处理的细节,从而使得客户端代码更加简单和易于维护。
下面是一个复杂的活动对象示例代码,该代码模拟了一个银行账户系统,支持存款、取款和查询余额等操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BankAccount {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private double balance;
public BankAccount(double initialBalance) {
this.balance = initialBalance;
}
public CompletableFuture<Double> deposit(double amount) {
CompletableFuture<Double> future = new CompletableFuture<>();
executor.submit(() -> {
balance += amount;
future.complete(balance);
});
return future;
}
public CompletableFuture<Double> withdraw(double amount) {
CompletableFuture<Double> future = new CompletableFuture<>();
executor.submit(() -> {
if (balance >= amount) {
balance -= amount;
future.complete(balance);
} else {
future.completeExceptionally(new InsufficientFundsException("Insufficient funds"));
}
});
return future;
}
public CompletableFuture<Double> getBalance() {
CompletableFuture<Double> future = new CompletableFuture<>();
executor.submit(() -> {
future.complete(balance);
});
return future;
}
public void shutdown() {
executor.shutdown();
}
}
class InsufficientFundsException extends RuntimeException {
public InsufficientFundsException(String message) {
super(message);
}
}
缺点
虽然活动对象模式具有许多优点,但也存在一些缺点,如下所述:
- 复杂性:活动对象模式需要使用异步处理和事件驱动机制,这增加了系统的复杂性。在设计、实现和测试时,需要考虑许多因素,如并发控制、锁定、死锁、线程池大小、任务队列大小等。
- 性能下降:在处理大量请求时,活动对象模式可能会导致性能下降。这是因为活动对象模式需要创建许多线程和任务,这会增加系统的负载和开销。此外,如果任务队列或线程池过大,会导致内存和CPU资源的浪费。
- 调试难度:由于活动对象模式使用异步处理和事件驱动机制,因此在调试时可能会出现难以预测的行为。例如,多个线程可能会同时访问共享资源,导致死锁或竞态条件,从而导致应用程序崩溃或出现其他问题。
- 状态管理:由于活动对象模式使用异步处理和事件驱动机制,因此在处理请求时需要管理对象的状态。这可能会导致状态同步和状态不一致的问题,从而影响系统的正确性和可靠性。
- 缺乏标准化:活动对象模式没有标准化的实现方式,因此在不同的应用程序和框架中可能会有不同的实现方式和限制。这使得活动对象模式在不同的环境中难以移植和重用。
总的来说,活动对象模式是一种强大的设计模式,可以用于实现异步处理、事件驱动、高性能、可伸缩和可靠的应用程序。但是,它也存在一些缺点,需要仔细考虑和权衡。在使用活动对象模式时,需要关注系统的复杂性、性能、调试难度、状态管理和标准化等问题。
应用
在开源框架中的应用
活动对象设计模式在许多开源框架中都得到了广泛应用,以下是几个常见的例子:
- Akka框架:Akka是一个轻量级的Actor模型框架,通过将并发和异步处理的问题从客户端代码中分离出来,提高了系统的性能、可靠性和可维护性。在Akka中,每个Actor都是一个活动对象,通过消息传递的方式进行通信和协作。Akka提供了丰富的API和工具,可以方便地创建和管理Actor,实现高性能和可扩展的系统。
- Vert.x框架:Vert.x是一个基于事件驱动的异步框架,提供了多种语言的API和工具,支持构建高性能和可扩展的应用程序。在Vert.x中,每个组件都是一个活动对象,可以通过Vert.x的事件总线进行通信和协作。Vert.x提供了丰富的异步API和工具,可以方便地处理并发和异步问题。
- RxJava框架:RxJava是一个基于响应式编程的异步框架,提供了丰富的操作符和工具,支持构建高性能和可维护的应用程序。在RxJava中,每个Observable都是一个活动对象,可以通过异步流的方式进行通信和协作。RxJava提供了丰富的操作符和工具,可以方便地处理并发和异步问题,并支持响应式编程的多种特性,如响应式流、背压控制等。
- Netty框架:Netty是一个基于事件驱动的异步网络通信框架,提供了丰富的API和工具,支持构建高性能和可扩展的网络应用程序。在Netty中,每个Channel都是一个活动对象,可以通过事件的方式进行通信和协作。Netty提供了丰富的异步API和工具,可以方便地处理网络通信和异步问题,并支持多种协议和编解码器。
- Spring框架:Spring是一个广泛使用的企业级Java框架,提供了丰富的API和工具,支持构建高性能和可维护的应用程序。在Spring中,可以使用异步处理、响应式编程和事件驱动等方式实现活动对象模式。Spring提供了丰富的异步API和工具,可以方便地处理异步和并发问题。
- JMS框架:JMS是Java消息服务的标准,提供了异步消息传递的方式,支持构建可靠、高性能和可扩展的消息系统。在JMS中,可以使用活动对象模式实现异步消息的处理和分发。JMS提供了丰富的API和工具,可以方便地处理异步消息的生产和消费。
- Apache Camel框架:Apache Camel是一个基于企业级集成模式的开源框架,提供了丰富的组件和工具,支持构建可扩展、高性能和可靠的应用程序。在Camel中,可以使用活动对象模式实现异步消息的处理和路由。Camel提供了丰富的组件和工具,可以方便地处理异步消息的路由和转换。
下面是一个使用Spring异步处理和事件驱动机制实现活动对象模式的示例代码,该示例代码使用了Spring Boot框架和Spring Reactor库:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Configuration
public class AppConfig {
@Bean
public MessageHandler messageHandler() {
return new MessageHandler();
}
}
@Component
public class MessageHandler {
@Autowired
private MessageRepository messageRepository;
public Mono<Void> handleMessage(String message) {
return Mono.fromCallable(() -> {
// 处理消息
System.out.println("Received message: " + message);
messageRepository.save(new Message(message));
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
public Flux<Message> getAllMessages() {
return Flux.defer(() -> Flux.fromIterable(messageRepository.findAll()))
.subscribeOn(Schedulers.boundedElastic());
}
}
@Entity
public class Message {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String content;
public Message() {}
public Message(String content) {
this.content = content;
}
public Long getId() {
return id;
}
}
在上述示例代码中,我们定义了一个 MessageHandler
类作为活动对象,用于异步处理消息的接收和存储。该类使用了Spring异步处理和事件驱动机制来实现活动对象模式,客户端代码只需要调用相应的方法即可,不需要关心异步处理的细节。
具体来说,该示例代码包含以下几个部分:
AppConfig
配置类:该类用于配置Spring Bean,定义了一个messageHandler()
方法,返回一个MessageHandler
对象。MessageHandler
活动对象类:该类包含了两个方法:handleMessage()
和getAllMessages()
。handleMessage()
方法用于处理消息,将消息存储到数据库中;getAllMessages()
方法用于获取所有的消息。这两个方法都使用了Spring的异步处理机制和事件驱动机制,使用了 Reactor 库中的Mono
和Flux
类。Message
JPA实体类:该类用于表示消息对象,使用了JPA注解。
在 handleMessage()
方法中,我们使用了 Mono.fromCallable()
方法来异步处理消息的接收和存储,将处理操作提交到线程池中执行,然后返回 Mono<Void>
对象,以便客户端代码可以等待处理操作完成。我们还使用了 subscribeOn()
方法来指定异步处理的线程池,以提高系统的性能和可伸缩性。
在 getAllMessages()
方法中,我们使用了 Flux.defer()
方法来异步获取所有的消息,将获取操作提交到线程池中执行,然后返回 Flux<Message>
对象,以便客户端代码可以异步获取消息。我们同样使用了 subscribeOn()
方法来指定异步处理的线程池。
通过使用Spring异步处理和事件驱动机制,我们可以实现高性能、可靠和可扩展的活动对象模式,提高系统的性能和可维护性。
下面是一个使用活动对象模式实现异步消息处理和分发的JMS示例代码,该示例代码使用了ActiveMQ作为JMS消息中间件:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JMSMessageHandler implements MessageListener {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Connection connection;
private Session session;
private Destination destination;
public JMSMessageHandler(String brokerUrl, String destinationName) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
}
public CompletableFuture<Void> send(String message) {
CompletableFuture<Void> future = new CompletableFuture<>();
executor.submit(() -> {
try {
TextMessage textMessage = session.createTextMessage(message);
MessageProducer producer = session.createProducer(destination);
producer.send(textMessage);
future.complete(null);
} catch (JMSException e) {
future.completeExceptionally(e);
}
});
return future;
}
public void onMessage(Messagemessage) {
CompletableFuture.runAsync(() -> {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
// 处理消息
System.out.println("Received message: " + text);
}
} catch (JMSException e) {
e.printStackTrace();
}
});
}
public void close() throws JMSException {
connection.close();
executor.shutdown();
}
}
在上述示例代码中,JMSMessageHandler
类表示一个JMS消息处理器,可以异步处理来自JMS队列的消息,并将处理结果发送回JMS队列。该类使用了活动对象模式来实现异步消息的处理和分发,客户端代码只需要发送消息即可,不需要关心异步处理的细节。
具体来说,该类包含以下几个方法:
JMSMessageHandler(String brokerUrl, String destinationName)
构造方法:该方法用于创建一个JMS消息处理器对象,连接到指定的JMS消息中间件并订阅指定的队列。send(String message)
方法:该方法用于发送消息到JMS队列中,客户端代码可以调用该方法将消息发送到指定的队列。onMessage(Message message)
方法:该方法是MessageListener
接口的回调方法,用于异步处理队列中的消息,并将处理结果发送回JMS队列。close()
方法:该方法用于关闭JMS连接和线程池,释放资源。
在 send()
方法中,我们使用了 CompletableFuture
对象来异步处理消息的发送,将发送操作提交到线程池中执行,然后返回 CompletableFuture
对象,以便客户端代码可以等待发送操作完成。
在 onMessage()
方法中,我们使用了 CompletableFuture.runAsync()
方法来异步处理消息的处理,将处理操作提交到线程池中执行,然后返回 CompletableFuture
对象,以便客户端代码可以等待处理操作完成。
通过使用活动对象模式和异步处理技术,我们可以实现高性能、可靠和可扩展的JMS消息处理器,提高系统的性能和可维护性。
在项目中的使用
在公司的项目中,用到过活动对象这个设计模式,只是之前并不清楚这个模式。使用场景是,发送飞书通知和拨打语音电话时,将请求添加到一个内部阻塞队列,然后单独启动一个线程去消费这个队列。
以下是拨打语音电话的代码:
/**
* Tencent Cloud Vms SendTtsVoice
* https://cloud.tencent.com/document/product/1128/51558
*/
@Slf4j
@AllArgsConstructor
public class VmsServiceImpl implements VmsService {
public static final String PREFIX_PHONE = "86";
private static final BlockingQueue<VoiceSenderRequest> queue = new LinkedBlockingQueue<>();
private TtsVoiceSender ttsVoiceSender;
private final AsyncVmsThread asyncVmsThread = new AsyncVmsThread();
@PostConstruct
public void init() {
asyncVmsThread.start();
}
@PreDestroy
public void destroy() {
asyncVmsThread.shutdown();
}
public void sendAsync(Collection<NoticeTarget> noticeUsers, Integer templateId, String[] params) {
if (templateId == null || CollectionUtils.isEmpty(noticeUsers)) {
return;
}
for (NoticeTarget noticeUser : noticeUsers) {
if (!noticeUser.getType().equals(NoticeTargetTypeEnum.PHONE)) {
continue;
}
try {
queue.put(new VoiceSenderRequest().setNoticeUser(noticeUser).setTemplateId(templateId).setParams(params));
} catch (InterruptedException e) {
throw new BusinessException("线程被中断");
}
}
}
public Boolean send(String phone, Integer templateId, String[] params) {
if (templateId == null || StringUtils.isBlank(phone)) {
throw new BusinessException("参数不能为空");
}
TtsVoiceSenderResult ttsVoiceSenderResult = null;
try {
ttsVoiceSenderResult = ttsVoiceSender.send(PREFIX_PHONE, phone, templateId, params, 2, "");
} catch (Exception e) {
throw new RuntimeException(e);
}
if (ttsVoiceSenderResult.result != 0) {
throw new BusinessException(ttsVoiceSenderResult.errMsg);
}
return ttsVoiceSenderResult.result == 0;
}
public class AsyncVmsThread extends Thread {
AtomicBoolean isRunning = new AtomicBoolean(true);
public AsyncVmsThread() {
super("vmsSender");
}
@Override
public void run() {
while (isRunning.get()) {
ThreadUtil.sleep(2000L);
VoiceSenderRequest voiceSenderRequest = null;
try {
voiceSenderRequest = queue.take();
send(voiceSenderRequest.getNoticeUser().getId(), voiceSenderRequest.getTemplateId(), voiceSenderRequest.getParams());
} catch (Exception e) {
log.warn("{}", String.format("发送语音电话给[%s]出现异常: %s", voiceSenderRequest.getNoticeUser(), e.getMessage()), e);
}
}
}
public void shutdown() {
isRunning.set(false);
}
}
@Data
@Accessors(chain = true)
public class VoiceSenderRequest {
private NoticeTarget noticeUser;
private Integer templateId;
private String[] params;
}
}
优化之后的代码:
@Slf4j
public abstract class ActiveObject {
private BlockingQueue<Runnable> requests;
private String name;
private Thread thread;
private volatile boolean isAcceptingRequests = true;
private volatile boolean isProcessingRequests = true;
public ActiveObject(String name) {
this(name, 16, null);
}
public ActiveObject(String name, int queueSize, Long sleepMillis) {
this.name = name;
this.requests = new LinkedBlockingQueue<>(queueSize);
thread = new Thread(() -> {
while (isProcessingRequests) {
try {
processRequest(requests.take());
if (sleepMillis != null) {
Thread.sleep(sleepMillis);
}
} catch (InterruptedException e) {
log.warn("Active Object thread interrupted, reason: {}", e.getMessage());
}
}
}, name);
thread.start();
}
private void processRequest(Runnable task) {
try {
task.run();
} catch (Exception e) {
log.error("Error processing request: {}", e.getMessage());
// 发送告警信息
}
}
public void shutdown() {
isAcceptingRequests = false;
while (!requests.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore exception
}
}
isProcessingRequests = false;
thread.interrupt();
}
public void run(Runnable runnable) {
if (isAcceptingRequests) {
requests.offer(runnable);
} else {
throw new IllegalStateException("Active object is no longer accepting requests");
}
}
public String name() {
return this.name;
}
}
该类具有构造函数,用于创建具有给定名称和指定大小阻塞请求队列的Active Object。Active Object在后台的单独线程上运行,该线程在循环中执行,直到被中断。它使用run
方法执行添加到请求队列中的请求。run
方法接受一个Runnable
对象,并将其添加到请求队列中。如果Active Object不再接受请求,则抛出IllegalStateException
。
shutdown
方法用于停止Active Object。它首先将isAcceptingRequests
标志设置为false,这将防止将任何进一步的请求添加到队列中。然后等待队列变为空,然后将isProcessingRequests
标志设置为false,并中断Active Object的线程。
这个类本身已经是一个很好的Active Object模式的实现了,但是如果需要更高的性能或更好的扩展性,还可以进行一些优化:
使用线程池:当前的实现中,每个Active Object都有一个单独的线程来处理请求。如果需要处理大量的Active Object,这将会产生很多线程,从而影响系统的性能和稳定性。可以使用线程池来管理线程,从而更好地控制线程的数量和资源的使用。
如果在使用ActiveObject时要执行耗时的任务,可以将任务放到一个单独的线程池中执行,以避免阻塞ActiveObject实例的请求处理线程。
@Slf4j public abstract class ActiveObject { private BlockingQueue<Runnable> requests; private String name; private ExecutorService executorService; private Thread thread; private volatile boolean isAcceptingRequests = true; private volatile boolean isProcessingRequests = true; public ActiveObject(String name) { this(name, 16, null); } public ActiveObject(String name, int queueSize, Long sleepMillis) { this.name = name; this.requests = new LinkedBlockingQueue<>(queueSize); this.executorService = Executors.newFixedThreadPool(5); thread = new Thread(() -> { while (isProcessingRequests) { try { processRequest(requests.take(), executorService); if (sleepMillis != null) { Thread.sleep(sleepMillis); } } catch (InterruptedException e) { log.warn("Active Object thread interrupted, reason: {}", e.getMessage()); } } }, name); thread.start(); } private void processRequest(Runnable task, ExecutorService executorService) { try { executorService.submit(task).get(); } catch (Exception e) { log.error("Error processing request: {}", e.getMessage()); // 发送告警信息 } } public void shutdown() { isAcceptingRequests = false; while (!requests.isEmpty()) { try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore exception } } isProcessingRequests = false; executorService.shutdown(); thread.interrupt(); } public void run(Runnable runnable) { if (isAcceptingRequests) { requests.offer(runnable); } else { throw new IllegalStateException("Active object is no longer accepting requests"); } } public String name() { return this.name; } }
在修改后的代码中,添加了一个私有变量executorService,用于存储一个线程池对象,其中线程池的大小为5。在构造方法中,创建了一个新的FixedThreadPoolExecutor实例,并将其作为executorService的值。该线程池会在ActiveObject实例中处理耗时任务,避免阻塞请求队列的处理。
在processRequest方法中,使用executorService.submit(task).get()来提交并执行任务。在执行任务时,使用了get()方法来同步获取任务的执行结果,以确保任务执行完成后再处理下一个请求。
在shutdown方法中,添加了executorService.shutdown()来关闭线程池。该方法会等待所有任务执行完成后关闭线程池,并防止新任务被提交。这样可以确保所有任务都被处理完毕后才关闭ActiveObject实例。
需要注意的是,在使用线程池时,要根据具体的业务需求和系统资源情况选择合适的线程池大小和类型,避免线程池过大或过小,从而影响系统性能或导致线程池拥堵。
优化请求的处理:当前实现中,每个请求都会在执行完毕后等待100毫秒。如果请求处理较快,这将浪费很多时间。可以根据实际情况优化请求的处理方式,例如设置一个最小执行时间,或者使用更高效的数据结构来管理请求队列。
在当前的实现中,使用了一个阻塞队列LinkedBlockingQueue来管理请求队列。这种数据结构的优点是可以保证线程安全,但是在高并发场景下可能会成为瓶颈,因为它是基于链表实现的,每次添加或删除元素时都需要进行同步操作,可能会影响性能。
如果需要更高效的请求队列管理方式,可以考虑使用无锁的并发队列,例如Disruptor或ConcurrentLinkedQueue。这些数据结构可以在高并发场景下提供更好的性能和可扩展性,但是需要更加复杂的实现和使用方法,需要根据具体的场景进行权衡和选择。如果使用Disruptor,需要进行更加细致的配置和调优,以便发挥最大的性能优势。如果使用ConcurrentLinkedQueue,需要考虑并发问题,例如使用CAS操作来保证线程安全。
如果使用ConcurrentLinkedQueue来管理请求队列,需要考虑并发问题,因为该数据结构是非阻塞的,多个线程可以同时对其进行操作,可能会导致并发问题,例如竞态条件和内存一致性问题。
为了保证线程安全,可以使用CAS(Compare and Swap)操作来实现原子性的元素插入和删除。CAS操作可以保证只有一个线程能够成功修改共享变量的值,其他线程需要重试或者等待。
例如,在ActiveObject类中,可以将请求队列声明为ConcurrentLinkedQueue类型,并使用CAS操作来实现元素的插入和删除:
public abstract class ActiveObject { private ConcurrentLinkedQueue<Runnable> requests; // ... public ActiveObject(String name, int queueSize) { this.name = name; this.requests = new ConcurrentLinkedQueue<>(); thread = new Thread(() -> { while (isProcessingRequests) { Runnable task = requests.poll(); if (task != null) { task.run(); } try { Thread.sleep(100); } catch (InterruptedException e) { log.warn("Active Object thread interrupted, reason: {}", e.getMessage()); } } }, name); thread.start; } }
在上面的代码中,使用了ConcurrentLinkedQueue来替换了原有的阻塞队列LinkedBlockingQueue,并使用了poll方法来获取队列中的下一个元素,而不是take方法,这样可以避免线程阻塞。然后使用CAS操作来实现元素的添加,使用了offer方法,并检查返回值是否为true,如果为false则表示CAS操作失败,需要重试或者等待。
需要注意的是,ConcurrentLinkedQueue并不能保证元素的顺序,因此在处理请求时需要注意顺序问题,不能保证先进先出的顺序。如果需要保证顺序,可以使用其他的数据结构,例如BlockingQueue。
上面的代码已经很不错了,但还有一些可以优化的地方:
- 使用工厂方法来创建ActiveObject实例
可以将ActiveObject类中的构造方法改为私有的,然后提供一个工厂方法来创建ActiveObject实例。这样可以将ActiveObject的创建逻辑与客户端代码分离,使得客户端代码更加简洁和易于维护。
- 将日志记录器作为静态变量
可以将日志记录器作为静态变量,以便在整个类中共享。这样可以避免在每个方法中都创建一个日志记录器,提高代码的可读性和性能。
- 使用线程安全的单例模式
可以使用线程安全的单例模式来确保ActiveObject实例的唯一性。这样可以避免在多个地方创建多个ActiveObject实例,从而导致系统资源浪费或数据不一致。
修改后的代码如下:
public abstract class ActiveObject {
private static final Logger log = LoggerFactory.getLogger(ActiveObject.class);
private static final Map<String, ActiveObject> INSTANCES = new ConcurrentHashMap<>();
private BlockingQueue<Runnable> requests;
private String name;
private ExecutorService executorService;
private Thread thread;
private volatile boolean isAcceptingRequests = true;
private volatile boolean isProcessingRequests = true;
private ActiveObject(String name, int queueSize, Long sleepMillis) {
this.name = name;
this.requests = new LinkedBlockingQueue<>(queueSize);
this.executorService = Executors.newFixedThreadPool(5);
thread = new Thread(() -> {
while (isProcessingRequests) {
try {
processRequest(requests.take(), executorService);
if (sleepMillis != null) {
Thread.sleep(sleepMillis);
}
} catch (InterruptedException e) {
log.warn("Active Object thread interrupted, reason: {}", e.getMessage());
}
}
}, name);
thread.start();
}
private void processRequest(Runnable task, ExecutorService executorService) {
try {
executorService.submit(task).get();
} catch (Exception e) {
log.error("Error processing request: {}", e.getMessage());
// 发送告警信息
}
}
public static synchronized ActiveObject getInstance(String name) {
return getInstance(name, 16, null);
}
publicvoid submit(Runnable task) {
if (isAcceptingRequests) {
try {
requests.put(task);
} catch (InterruptedException e) {
log.error("Error submitting request: {}", e.getMessage());
// 发送告警信息
}
} else {
log.warn("Active Object is not accepting requests");
// 发送告警信息
}
}
public static synchronized ActiveObject getInstance(String name, int queueSize, Long sleepMillis) {
ActiveObject instance = INSTANCES.get(name);
if (instance == null) {
instance = new ActiveObject(name, queueSize, sleepMillis);
INSTANCES.put(name, instance);
}
return instance;
}
public void shutdown() {
isAcceptingRequests = false;
while (!requests.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore exception
}
}
isProcessingRequests = false;
executorService.shutdown();
thread.interrupt();
}
}
上面的代码已经很不错了,但还有一些可以进一步优化的地方:
- 使用线程池来管理线程,而不是每次创建一个新的线程。这样可以减少线程的创建和销毁的开销,提高代码的性能。
使用Lambda表达式简化代码,使代码更加简洁和易于理解。例如,可以将线程中的while循环改为Lambda表达式,简化代码。
使用CompletableFuture来实现异步处理,将请求的处理过程和结果的返回分离开来。这样可以提高代码的可读性和可维护性,同时也可以提高代码的性能。
使用ThreadFactory来命名线程,以便更好地跟踪线程的执行情况以及排查问题。
使用CompletableFuture的exceptionally方法处理异常,以避免在处理请求时出现异常而导致整个线程停止。
public class ActiveObject {
private static final Logger log = LoggerFactory.getLogger(ActiveObject.class);
private static final Map<String, ActiveObject> INSTANCES = new ConcurrentHashMap<>();
private static final int THREAD_POOL_SIZE = 5;
private BlockingQueue<Runnable> requests;
private String name;
private ExecutorService executorService;
private CompletableFuture<Void> processingFuture;
private volatile boolean isAcceptingRequests = true;
private ActiveObject(String name, int queueSize, Long sleepMillis) {
this.name = name;
this.requests = new LinkedBlockingQueue<>(queueSize);
this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE, new ThreadFactory() {
private AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "ActiveObject-" + name + "-" + count.getAndIncrement());
}
});
processingFuture = CompletableFuture.runAsync(() -> {
while (isAcceptingRequests) {
try {
executorService.execute(requests.take());
if (sleepMillis != null) {
Thread.sleep(sleepMillis);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Active Object thread interrupted, reason: {}", e.getMessage());
} catch (Exception e) {
log.error("Error processing request: {}", e.getMessage());
// 发送告警信息
}
}
}).exceptionally(e -> {
log.error("Exception occurred in ActiveObject thread: {}", e.getMessage());
return null;
});
}
public void submit(Runnable task) {
requests.offer(task);
}
public void stop() {
isAcceptingRequests = false;
processingFuture.cancel(true);
executorService.shutdown();
}
public static synchronized ActiveObject getInstance(String name) {
return getInstance(name, 16, null);
}
public static synchronized ActiveObject getInstance(String name, int queueSize, Long sleepMillis) {
ActiveObject instance = INSTANCES.get(name);
if (instance == null) {
instance = new ActiveObject(name, queueSize, sleepMillis);
INSTANCES.put(name, instance);
}
return instance;
}
public String getName() {
return name;
}
}
总结
活动对象模式是一种支持异步处理和事件驱动的设计模式,适用于一些需要高性能、可伸缩和可靠的应用场景。以下是几个适用于活动对象模式的使用场景:
- 大规模并发处理:在大规模并发的情况下,使用传统的同步处理方式会导致系统性能下降和响应时间延长。使用活动对象模式可以将并发请求转换成异步事件,通过事件驱动机制实现高性能和可伸缩。
- 高吞吐量数据处理:在需要处理大量数据的情况下,使用活动对象模式可以利用多核CPU和异步处理技术,提高系统的处理能力和吞吐量。
- 异步消息传递:在需要异步处理消息的情况下,使用活动对象模式可以实现异步消息的处理和分发,提高系统的可靠性和可维护性。
- 分布式系统:在分布式系统中,使用活动对象模式可以实现异步消息传递和事件驱动,提高系统的可靠性和可伸缩性。同时,活动对象模式还可以通过分布式锁和分布式计算等技术实现分布式并发控制和计算,提高系统的性能和可靠性。
- UI和后台逻辑分离:在需要将UI和后台逻辑分离的情况下,使用活动对象模式可以实现UI和后台逻辑的解耦和异步处理,提高系统的可维护性和可扩展性。
- 异步IO操作:在需要进行异步IO操作的情况下,使用活动对象模式可以实现非阻塞IO和异步事件处理,提高系统的性能和响应时间。
总的来说,活动对象模式适用于一些需要异步处理、事件驱动、高性能、可伸缩和可靠的应用场景。通过使用活动对象模式,可以提高系统的性能和可维护性,同时降低系统的复杂度和成本。