AbortPolicyWithReport
dubbo搞了拒绝策略AbortPolicyWithReport、线程池EagerThreadPoolExecutor、线程工厂NameThreadFactory、任务队列TaskQueue、线程服务ExecuteService-ThreadlessExecutor,需要学会这些自定义的用法。
AbortPolicyWithReport在原来支持的AbortPolicy(该拒绝策略是默认的,策略是抛异常)基础上加了WithReport功能(打日志+dump线程堆栈+事件通知)。补充下jdk的四种拒绝策略:
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务 ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
类、属性、构造方法如下。继承ThreadPoolExecutor.AbortPolicy类,重写了 rejectedExecution方法。我们后面的SPI ThreadPool的那几中,都是用这个拒绝策略,会使用其构造方法,传入的name给传给NameThreadFactory的一致。从lastPrintTime到guard的属性都是给后面dumpJStack方法使用的。lastPrintTime和TEN_MINUTES_MILLS控制dump点的间隔,OS_WIN_PREFIX到DEFAULT_DATETIME_FORMAT是为了针对不同的系统设置不同的日期格式,guard信号量是控制并发的,多线程并发dumpJStack的时候只允许当前时刻一个线程进行dump。
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;
private static final String OS_WIN_PREFIX = "win";
private static final String OS_NAME_KEY = "os.name";
private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";
private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
rejectedExecution,当线程池发生拒绝操作的时候,会调用如下方法,这里是重写了父类AbortPolicy的rejectedExecution方法。直接看注释。
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
+ "%d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException(msg);
}
通知ThreadPoolExhaustedEvent事件 给 相关。
public void dispatchThreadPoolExhaustedEvent(String msg) {
EventDispatcher.getDefaultExtension().dispatch(new ThreadPoolExhaustedEvent(this, msg));
}
private void dumpJStack() {
long now = System.currentTimeMillis();
if (now - lastPrintTime < TEN_MINUTES_MILLS) {
return;
}
if (!guard.tryAcquire()) {
return;
}
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String os = System.getProperty(OS_NAME_KEY).toLowerCase();
if (os.contains(OS_WIN_PREFIX)) {
sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
} else {
sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
}
String dateStr = sdf.format(new Date());
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
} finally {
guard.release();
}
lastPrintTime = System.currentTimeMillis();
});
pool.shutdown();
}