前言
无言。
继续复盘。
复盘
消息队列选型?kafka和rabbitmq对比?
|
RabbitMq |
RocketMq |
Kafka |
| 开发语言 |
erlang |
Java |
Java |
| 单机吞吐 |
万级 |
万级 |
十万级 |
| 延时 |
微秒 |
毫秒 |
毫秒 |
| 消息重复 |
可控制 |
|
可能会有重复 |
| 持久化 |
内存,文件 |
磁盘 |
|
| 事务 |
不支持 |
支持 |
|
| 优点 |
性能较好,支持amqp |
阿里开源,性能非常好,在阿里内部大规模应用。支持多种模式,集群消费,广播消费等。 |
高吞吐量,低延时,稳定性高,消息有序 |
| 缺点 |
erlang语言开发,不利于扩展 |
阿里开源的东西,说不定什么时候社区会停止维护。 |
社区更新较慢,不支持延迟,重试等。 |
如果需要日志采集追求高吞吐量,那么采用kafka;Rabbitmq使用简单,但是不利于二次开发。Rocketmq背靠阿里,成也如此败也如此,阿里的开源贡献很大,但是很多项目稳定后社区经常陷入停滞,不过阿里内部既然在使用,那么说明它的性能和可靠性有保证。
xxljob原理
xxl-job是一个分布式的定时任务调度平台。主要分为admin和executor
xxl-job其实也是在quartz的基础上实现的,但是修改了任务调度的模式,并且任务调度采用注册和RPC调用方式来实现。2.1.0版本前核心调度模块都是基于quartz框架,2.1.0版本开始自研调度组件,移除quartz依赖 ,使用时间轮调度。
xxl_job_info表是记录定时任务的db表,里面有个trigger_next_time(Long)字段,表示下一次触发的时间点任务时间被修改 / 每一次任务触发后,可以根据cronb表达式计算下一次触发时间戳:Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date()))
定时执行任务逻辑:
- 定时任务
scheduleThread:不断从db把5秒内要执行的任务读出,立即触发 / 放到时间轮等待触发,并更新trigger_next_time.
- 获取当前时间
now
- 轮询
db,找出trigger_next_time在距now 5秒内的任务,对到达now时间后的任务(超出now 5秒外)直接跳过不执行(调度过期,如果有调度过期策略则触发执行)或者重置trigger_next_time;对到达now时间后的任务(超出now 5秒内),开线程执行触发逻辑,若任务下一次触发时间是在5秒内,则放到时间轮内(Map<Integer, List>秒数(1-60) => 任务id列表),重置trigger_next_time;对未到达now时间的任务,直接放到时间轮内并重置trigger_next_time。
- 定时任务
ringThread:时间轮实现到点触发任务。时间轮数据结构:Map<Integer, List<Integer>> key是秒数(1-60) ,value是任务id列表。
- 获取当前时间秒数
- 从时间轮内移出当前秒数前2个秒数(避免处理耗时太长,跨过刻度,向前校验一个刻度)的任务列表id,一一触发任务;
如何避免集群中的多个服务器同时调度任务?
当xxl-job应用本身集群部署(实现高可用HA)时,通过mysql悲观锁实现分布式锁(for update语句)
setAutoCommit(false)关闭隐式自动提交事务,启动事务
select lock for update(显式排他锁,其他事务无法进入&无法实现for update)
- 读
db任务信息 -> 拉任务到内存时间轮 -> 更新db任务信息
commit提交事务,同时会释放for update的排他锁(悲观锁)
es优势
前一篇应该说过了,用作全文搜索,相较于mysql会快很多。es是document格式的存储,mysql是行格式的,所以es并不需要显式定义字段。mysql由于其索引实现(innodb为例)导致在数据量大到一定级别后会出现性能衰减;而es只要内存足够就没太大问题。插入速度上如果正确的配置mysql其性能并不低,当然相对于正常状态es而言还是差了一个到多个量级(es>mongo>mysql)。查询速度这个主要看索引和数量,在需要复杂关联查询的时候建议优先考虑mysql。资源开销上,当数据量上去了后如果为了维持性能的话,es的占用内存是十分夸张的。
去掉redis广播怎么通知各服务
从广播推模式改为存在数据库中,各个服务扫表实现。
编程题:两个线程交替打印AB。三个线程交替打印ABC。
被CSDN坑了一把。
相互唤醒
这种写法有的问题是最后会卡住,其实改造下wait的条件就行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class Main { public static void main(String[] args) { Print print = new Print(10); new Thread(print::printA).start(); new Thread(print::printB).start(); }
public static class Print { private boolean flag = false;
private int count; private int countA; private int countB;
public Print(int count) { this.count = count; }
public synchronized void printA() {
while (!flag && countA++ < count) { this.notify(); System.out.println("A"); flag = true; try { this.wait(); }catch (Exception ignored) {
}
} }
public synchronized void printB() { while (flag && countB++ < count) { this.notify(); System.out.println("B"); flag = false; try { this.wait(); }catch (Exception ignored) {
} } }
} }
|
lock版,本质上也是synchronized和wait/notify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| Object lock = new Object(); new Thread(() -> { for (int i = 0; i < 10; i++) { synchronized (lock) { lock.notify(); System.out.println("A"); try { lock.wait(); }catch (Exception ignored) {
} } } }).start();
new Thread(() -> { for (int i = 0; i < 10; i++) { synchronized (lock) { lock.notify(); System.out.println("B"); try { lock.wait(); }catch (Exception ignored) {
} } } }).start();
|
lock版 三线程
在网上看到这种写法,不建议,易读性十分不好。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| public static class PrintABCWithLock {
public static void main(String[] args) throws Exception { Object lockA = new Object(); Object lockB = new Object(); Object lockC = new Object();
Thread a = new Thread(() -> { for (int i = 0; i < 10; i++) { synchronized (lockC) { synchronized (lockA) {
System.out.println("A"); lockA.notifyAll(); } try { lockC.wait(); } catch (Exception ignored) {
} } } });
Thread b = new Thread(() -> { for (int i = 0; i < 10; i++) { synchronized (lockA) { synchronized (lockB) { System.out.println("B");
lockB.notifyAll(); } try { lockA.wait(); } catch (Exception ignored) {
} } }
});
Thread c = new Thread(() -> { for (int i = 0; i < 10; i++) { synchronized (lockB) { synchronized (lockC) { System.out.println("C"); lockC.notifyAll(); } try { lockB.wait(); } catch (Exception ignored) {
} } } });
a.start(); Thread.sleep(100); b.start(); Thread.sleep(100); c.start(); a.join(); } }
|
真正的锁版。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| public static class PrintWithLock { private static int state = 0; private static final Lock lock = new ReentrantLock();
private static final Condition A = lock.newCondition(); private static final Condition B = lock.newCondition(); private static final Condition C = lock.newCondition();
public static void main(String[] args) throws Exception { new Thread(() -> { for (int i = 0; i < 10; i++) { lock.lock(); try { while (state % 3 != 0) { A.await(); } System.out.println("A"); state++; B.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }).start();
new Thread(() -> { for (int i = 0; i < 10; i++) { lock.lock(); try { while (state % 3 != 1) { B.await(); } System.out.println("B"); state++; C.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }).start();
new Thread(() -> { for (int i = 0; i < 10; i++) { lock.lock(); try { while (state % 3 != 2) { C.await(); } System.out.println("C"); state++; A.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }).start(); } }
|
去掉不需要的condition
和atomicInteger差不多。内部都在循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public static class PrintWithLock { private static int state = 0; private static final Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception { new Thread(() -> { for (int i = 0; i < 10;) { lock.lock(); try { while (state % 3 == 0) { System.out.println("A"); state++; i++; } } finally { lock.unlock(); } } }).start();
new Thread(() -> { for (int i = 0; i < 10;) { lock.lock(); try { while (state % 3 == 1) { System.out.println("B"); state++; i++; }
} finally { lock.unlock(); } } }).start();
new Thread(() -> { for (int i = 0; i < 10;) { lock.lock(); try { while (state % 3 == 2) { System.out.println("C"); state++; i++; }
} finally { lock.unlock(); } } }).start(); } }
|
AtomicInteger版本
比较费cpu,因为在循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public static class PrintWithCas { private static final AtomicInteger state = new AtomicInteger(0);
public static void main(String[] args) { new Thread(() -> { for (int i = 0; i < 10; ) { if (state.get() % 3 == 0) { System.out.println("A"); state.compareAndSet(state.get(), state.get() + 1); i++; } } }).start();
new Thread(() -> { for (int i = 0; i < 10; ) { if (state.get() % 3 == 1) { System.out.println("B"); state.compareAndSet(state.get(), state.get() + 1); i++; } } }).start();
new Thread(() -> { for (int i = 0; i < 10; ) { if (state.get() % 3 == 2) { System.out.println("C"); state.compareAndSet(state.get(), state.get() + 1); i++; } } }).start(); } }
|
信号量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public static class PrintWithSemaphore { private static final Semaphore A = new Semaphore(1); private static final Semaphore B = new Semaphore(0); private static final Semaphore C = new Semaphore(0);
public static void main(String[] args) { new Thread(() -> { try { for (int i = 0; i < 10; i++) { A.acquire(); System.out.println("A"); B.release(); } } catch (InterruptedException e) { e.printStackTrace(); }
}).start();
new Thread(() -> { try { for (int i = 0; i < 10; i++) { B.acquire(); System.out.println("B"); C.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { for (int i = 0; i < 10; i++) { C.acquire(); System.out.println("C"); A.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|