推荐阅读:阿里P8架构师谈:工作1-5年的Java工程师,怎样提高核心竞争力 阿里架构师直言:“没有实战都是纸上谈兵”!Redis实战PDF 单例是最常见的一种设计模式, 一般用于全局对象管理, 比如xml配置读写之类的. 一般分为懒汉式, 饿汉式. 这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差 本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁. 后续调用getInstance已经是无锁状态. 只是写法上稍微繁琐点. 至于为什么要volatile关键字, 主要涉及到jdk指令重排, 详见之前的博文: Java内存模型与指令重排 该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法. 缺点在于无法响应事件来重新初始化INSTANCE. 缺点在于对象在一开始就直接初始化了. 该模式的核心思想是异步调用. 有点类似于异步的ajax请求. 当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果. 因此可以让调用者立刻返回一个凭证, 该方法放到另外线程执行,后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下 jdk中内置了Future模式的支持, 其接口如下: 注意其中两个耗时操作. 与上述FutureTask不同的是, RealData需要实现Callable接口. 另外Future本身还提供了一些额外的简单控制功能, 其API如下 生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。 在生产者-消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。 生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。 生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下 PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列. 消费者从缓冲队列中获取数据, 并执行计算. 生产消费者模式可以有效对数据解耦, 优化系统结构. 降低生产者和消费者线程相互之间的依赖与性能要求. 一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步, 如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentLinkedQueue. 严格来讲, 分而治之不算一种模式, 而是一种思想. 它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量. 我们主要讲两个场景, Master-Worker模式, ForkJoin线程池. Master-Worker模式 该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程. Master负责接收与分配任务, Worker负责处理任务. 当各个Worker处理完成后, 将结果返回给Master进行归纳与总结. 假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程. Master代码 Worker代码 主函数测试 该线程池是jdk7之后引入的一个并行执行任务的框架, 其核心思想也是将任务分割为子任务, 有可能的任务还是很大, 还需要进一步拆解, 最终得到足够小的任务. 将分割出来的子任务放入双端队列中, 然后几个启动线程从双端队列中获取任务执行. 子任务执行的结果放到一个队列里, 另起线程从队列中获取数据, 合并结果. 假设我们的场景需要计算从0到20000000L的累加求和. CountTask继承自RecursiveTask, 可以携带返回值. 每次分解大任务, 简单的将任务划分为100个等规模的小任务, 并使用fork()提交的任务. 在子任务中通过THRESHOLD设置子任务分解的阈值, 如果当前需要求和的总数大于THRESHOLD, 则子任务需要再次分解,如果子任务可以直接执行, 则进行求和操作, 返回结果. 最终等待所有的子任务执行完毕, 对所有结果求和. ForkJoin线程池使用一个无锁的栈来管理空闲线程, 如果一个工作线程暂时取不到可用的任务, 则可能被挂起. 挂起的线程将被压入由线程池维护的栈中, 待将来有任务可用时, 再从栈中唤醒这些线程.本文主要讲解几种常见并行模式, 具体目录结构如下图.
单例
懒汉式: 方法上加synchronized
public static synchronized Singleton getInstance() { if (single == null) { single = new Singleton(); } return single; }
懒汉式: 使用双检锁 + volatile
private volatile Singleton singleton = null; public static Singleton getInstance() { if (singleton == null) { synchronized (Singleton.class) { if (singleton == null) { singleton = new Singleton(); } } } return singleton; }
懒汉式: 使用静态内部类
public class Singleton { private static class LazyHolder { private static final Singleton INSTANCE = new Singleton(); } private Singleton (){} public static final Singleton getInstance() { return LazyHolder.INSTANCE; } }
饿汉式
public class Singleton1 { private Singleton1() {} private static final Singleton1 single = new Singleton1(); public static Singleton1 getInstance() { return single; } }
Future模式
通过FutureTask实现
public class FutureDemo1 { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<String> future = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { return new RealData().costTime(); } }); ExecutorService service = Executors.newCachedThreadPool(); service.submit(future); System.out.println("RealData方法调用完毕"); // 模拟主函数中其他耗时操作 doOtherThing(); // 获取RealData方法的结果 System.out.println(future.get()); } private static void doOtherThing() throws InterruptedException { Thread.sleep(2000L); } } class RealData { public String costTime() { try { // 模拟RealData耗时操作 Thread.sleep(1000L); return "result"; } catch (InterruptedException e) { e.printStackTrace(); } return "exception"; } }
通过Future实现
public class FutureDemo2 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newCachedThreadPool(); Future<String> future = service.submit(new RealData2()); System.out.println("RealData2方法调用完毕"); // 模拟主函数中其他耗时操作 doOtherThing(); // 获取RealData2方法的结果 System.out.println(future.get()); } private static void doOtherThing() throws InterruptedException { Thread.sleep(2000L); } } class RealData2 implements Callable<String>{ public String costTime() { try { // 模拟RealData耗时操作 Thread.sleep(1000L); return "result"; } catch (InterruptedException e) { e.printStackTrace(); } return "exception"; } @Override public String call() throws Exception { return costTime(); } }
// 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 是否已经取消 boolean isCancelled(); // 是否已经完成 boolean isDone(); // 取得返回对象 V get() throws InterruptedException, ExecutionException; // 取得返回对象, 并可以设置超时时间 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
生产消费者模式
生产者核心代码
while(isRunning) { Thread.sleep(r.nextInt(SLEEP_TIME)); data = new PCData(count.incrementAndGet); // 构造任务数据 System.out.println(data + " is put into queue"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { // 将数据放入队列缓冲区中 System.out.println("faild to put data : " + data); } }
消费者核心代码
while (true) { PCData data = queue.take(); // 提取任务 if (data != null) { // 获取数据, 执行计算操作 int re = data.getData() * 10; System.out.println("after cal, value is : " + re); Thread.sleep(r.nextInt(SLEEP_TIME)); } }
分而治之
public class MasterDemo { // 盛装任务的集合 private ConcurrentLinkedQueue<TaskDemo> workQueue = new ConcurrentLinkedQueue<TaskDemo>(); // 所有worker private HashMap<String, Thread> workers = new HashMap<>(); // 每一个worker并行执行任务的结果 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>(); public MasterDemo(WorkerDemo worker, int workerCount) { // 每个worker对象都需要持有queue的引用, 用于领任务与提交结果 worker.setResultMap(resultMap); worker.setWorkQueue(workQueue); for (int i = 0; i < workerCount; i++) { workers.put("子节点: " + i, new Thread(worker)); } } // 提交任务 public void submit(TaskDemo task) { workQueue.add(task); } // 启动所有的子任务 public void execute(){ for (Map.Entry<String, Thread> entry : workers.entrySet()) { entry.getValue().start(); } } // 判断所有的任务是否执行结束 public boolean isComplete() { for (Map.Entry<String, Thread> entry : workers.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } // 获取最终汇总的结果 public int getResult() { int result = 0; for (Map.Entry<String, Object> entry : resultMap.entrySet()) { result += Integer.parseInt(entry.getValue().toString()); } return result; } }
public class WorkerDemo implements Runnable{ private ConcurrentLinkedQueue<TaskDemo> workQueue; private ConcurrentHashMap<String, Object> resultMap; @Override public void run() { while (true) { TaskDemo input = this.workQueue.poll(); // 所有任务已经执行完毕 if (input == null) { break; } // 模拟对task进行处理, 返回结果 int result = input.getPrice(); this.resultMap.put(input.getId() + "", result); System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread().getName()); } } public ConcurrentLinkedQueue<TaskDemo> getWorkQueue() { return workQueue; } public void setWorkQueue(ConcurrentLinkedQueue<TaskDemo> workQueue) { this.workQueue = workQueue; } public ConcurrentHashMap<String, Object> getResultMap() { return resultMap; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } }
public class TaskDemo { private int id; private String name; private int price; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } }
MasterDemo master = new MasterDemo(new WorkerDemo(), 10); for (int i = 0; i < 100; i++) { TaskDemo task = new TaskDemo(); task.setId(i); task.setName("任务" + i); task.setPrice(new Random().nextInt(10000)); master.submit(task); } master.execute(); while (true) { if (master.isComplete()) { System.out.println("执行的结果为: " + master.getResult()); break; } }
ForkJoin线程池
public class CountTask extends RecursiveTask<Long>{ // 任务分解的阈值 private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start, long end) { this.start = start; this.end = end; } public Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; if (canCompute) { for (long i = start; i <= end; i++) { sum += i; } } else { // 分成100个小任务 long step = (start + end) / 100; ArrayList<CountTask> subTasks = new ArrayList<CountTask>(); long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) { lastOne = end; } CountTask subTask = new CountTask(pos, lastOne); pos += step + 1; // 将子任务推向线程池 subTasks.add(subTask); subTask.fork(); } for (CountTask task : subTasks) { // 对结果进行join sum += task.join(); } } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); // 累加求和 0 -> 20000000L CountTask task = new CountTask(0, 20000000L); ForkJoinTask<Long> result = pool.submit(task); System.out.println("sum result : " + result.get()); } }
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算