Netty的基本使用

服务端定义

开启服务端端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyServer {
public static void main(String[] args) {
EventLoopGroup boseGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// handler 只针对boseGroup
// childHandler 经过boseGroup后再经过workerGroup
serverBootstrap.group(boseGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();


}catch (Exception e) {
e.printStackTrace();
}finally {
boseGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

定义服务端initialize

1
2
3
4
5
6
7
8
9
10
11
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyHttpServerHandler());
}
}

定义服务端handler

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyHttpServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + ", " + msg);
ctx.channel().writeAndFlush("from server "+ UUID.randomUUID());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端定义

绑定客户端端口和ip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyClient {
public static void main(String[] args) {
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors).channel(NioSocketChannel.class)
.handler(new MyClientInitializer());
ChannelFuture localhost = bootstrap.connect("localhost", 8899).sync();
localhost.channel().closeFuture().sync();

}catch (Exception e) {
e.printStackTrace();
}finally {
eventExecutors.shutdownGracefully();
}
}
}

定义客户端initialize

1
2
3
4
5
6
7
8
9
10
11
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}

定义客户端handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client ouput: " + msg);
ctx.channel().writeAndFlush("from client:"+ LocalDateTime.now());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("来自于客户端的问候!");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

HashMap和Hashtable的比较

共同点

结构

都是数组 + 链表的形式

存值方式

都是根据key的hash计算,再将(key,value)组装为Entry或Node

区别

put区别

1、HashMap允许key和value都为null,而Hashtable不允许

2、HashMap是线程不安全的,Hashtable是线程安全的。

3、HashMap存值,在链表长度 > 8 并且 数组长度 < 64,会进行扩容,如果数组长度 > 64,则将链表转换为红黑树。

Hashtable存值只有在table的size达到临界值时才会进行扩容

4、HashMap在链表存值时尾部插入,而Hashtable是头部插入

HashMap的底层原理(一)

组成结构

数据 + 链表的形式+红黑树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// node 数组
transient Node<K,V>[] table;
// Node结构
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;

Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}

其他参数

负载因子

1
2
3
4
/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

链表最大长度:

1
static final int TREEIFY_THRESHOLD = 8;

链表转换为红黑树的重要参数

1
2
// 链表长度 > 8 并且数组长度小于64那么进行扩容,否则转为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;

链表长度的界限

1
static final int TREEIFY_THRESHOLD = 8;

使用

1
2
3
4
5
6
// 定义HashMap,初始大小可不写
HashMap<String, String> map = new HashMap<>(5);
// 存放值
map.put("test","name2");
// 取值
String test = map.get("test");

原理解析

简括流程

  • 计算key的hash,高16位于低16位进行异或运算

    1
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
  • 判断数组是否为空,为空时初始化。采用初次扩容。初始化的数组大小为16,扩容界限为16x0.75=12

    1
    2
    3
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
    n = (tab = resize()).length;
  • 使用hash&(n-1)获取key存放的下标位置,初始化Node,存放进Node [i]

    1
    2
    if ((p = tab[i = (n - 1) & hash]) == null)
    tab[i] = newNode(hash, key, value, null);
  • key的hash值与初始化的第一个Node的hash一致,并且key的值也一致,直接覆盖

    1
    2
    3
    4
    Node<K,V> e; K k;
    if (p.hash == hash &&
    ((k = p.key) == key || (key != null && key.equals(k))))
    e = p;
  • 如果是红黑树,直接存放进红黑树内

    1
    e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
  • 遍历Node链表,直到next == null,创建新的Node节点赋值给next。如果链表大小大于8个,则进行扩容或转红黑树操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    for (int binCount = 0; ; ++binCount) {
    if ((e = p.next) == null) {
    p.next = newNode(hash, key, value, null);
    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
    treeifyBin(tab, hash);
    break;
    }
    if (e.hash == hash &&
    ((k = e.key) == key || (key != null && key.equals(k))))
    break;
    p = e;
    }
  • 判断map大小是否大于扩容大小,是的话进行扩容

    1
    2
    3
    4
    // size:hashMao大小
    // threshold:threshold = newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    if (++size > threshold)
    resize();
  • 覆盖旧值操作

    1
    2
    3
    4
    5
    6
    7
    if (e != null) { // existing mapping for key
    V oldValue = e.value;
    if (!onlyIfAbsent || oldValue == null)
    e.value = value;
    afterNodeAccess(e);
    return oldValue;
    }

详细代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// 扩容操作 
final Node<K,V>[] resize() {
// 数组
Node<K,V>[] oldTab = table;
// 旧数组的大小
int oldCap = (oldTab == null) ? 0 : oldTab.length;
// 扩容界限
int oldThr = threshold;
// 扩容后的大小,扩容后的界限
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 成倍的扩容,16 = 010000 << 1 --> 100000 = 32
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
// 初始化时的扩容操作
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
// 扩容后的数据迁移
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
// 扩容数据的低位链表,低位不变
Node<K,V> loHead = null, loTail = null;
// 扩容数据的高位链表,高位迁移
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
/**
* oldCap 16: 10000
* 假设e.hash = 9
* 第一次存放的位置:01001 & 01111 = 01001 = 9
* 为了保证扩容后用同样的操作取下标结果不会改变,也就是通过(n - 1) & hash还会取到e
* 扩容后:newCap = 32 = 100000
* 001001 & 011111 = 01001 = 9
* 所以对e这个数据不改变下标位置
* 01001 &
* 10000 = 0
**/
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
/**
* oldCap 16: 10000
* 假设e.hash = 20
* 第一次存放的位置:10100 & 01111 = 00100 = 4
* 为了保证扩容后用同样的操作取下标结果不会改变,也就是通过(n - 1) & hash还会取到e
* 扩容后:newCap = 32 = 100000
* 010100 & 011111 = 010100 = 20
* 所以对e这个数据要改变下标位置为20
* 10100 &
* 10000 = 10000 != 0
* 所以要对该数据进行迁移,迁移坐标为:4 + 16(扩容的大小) = 20
**/
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

一条http请求的处理过程

http请求

client发送请求

DNS解析请求得域名 —> ip地址

TCP/IP,UDP/IP四层传输协议

应用层

发送http请求

传输层

组装TCP头信息 —> 【TCP头信息】【http请求报文】

网络层

组装IP信息 —> 【ip信息】【TCP头信息】【http请求报文】

网络接口层

组装MAC信息 —> 【MAC信息】【ip信息】【TCP头信息】【http请求报文】

网络接口层发送数据

发送这样的数据:[01010110101010]

目标服务器处理数据

redis数据结构 :

string:

set key,value ,set key value ex time

map

所有的键值对的键和值的长度都小于等于64byte,哈希对象保存的键值对数量都小于512:ziplist,hashtable 对象

list

quicklist 消息队列

set

inset:如果元素类型都是整型,并且元素个数不小于512,hashtab,点赞,互相关注

zset

元素数量小于128,所有member的长度小于64字节使用ziplist,否则升级为skiplist,热点排序 ,有一个score

redis常用命令

list

lpush:左插入数据

lpushx:做插入数据, 前提条件是key存在并且key为列表

rpush:右插入

rpushx:右插入, 前提条件是key存在并且key为列表

rpop:右弹出

lpop:左弹出

lrange:弹出一个范围内的数据

blpop:左阻塞式的弹出

brpop:右阻塞式的弹出

set

1、set:sadd添加,spop删除,smembers 查看所有元素,sinter交集,sunion合集,sdiff差集

map

zset

string

redis热点数据:

1、客户端统计的方式

2、代理层:TwemProxy或 Codis

3、服务端统计:monitor监控,但是只能监控到一个节点

4、机器层面:抓包的形式

redis查看内存使用信息:

info memory

Redis主从复制

master node 第一次执行全量复制,通过bgsave命令在本地生成一份RDB文件,将RDB快照文件发送给slave节点,slave node 会先清空自己的旧数据,然后从RDB文件中加载数据。

在生成rdb文件中,master node 如果接收到新的写命令怎么办?

master会把所有的新的写命令缓存在内存中,在slave node 保存了RDB文件之后,再讲新的命令复制给slave node

主从复制的不足:

1、当RDB文件过大时同步非常耗时

2、在一主多从情况下,当master node 挂了后。对外服务就不可用了,单点问题并没有解决。如果每次都手动把之前的服务器切换为主服务器,这就比较费力。

可用性保证之sentinel

sentinel是一个监听,监听redis集群中的master和slave,如果master一定时间内没有给sentinel回复消息,则将master标记为下线,然后把某一个slave标记为master,应用每一次都从这个监控服务器拿到master的地址。

sentinel通过info命令来获取到redis集群中的节点的信息。

服务下线:当sentinel监听到master节点下线后,会进行故障转移

故障转移:故障转移的第一步就是在sentinel集群中选举出一个leader节点,然后让这个leader指定出一个master节点。

sentinel集群选举算法

类似raft算法。raft算法:首先会生成一个150-300ms的时间,默认最小时间的为leader,如果时间一样则再次生成时间。

1、master客观下线会触发选举,而不是过了时间才会

2、leader并不会把自己成为leader的信息发送给其他sentinel节点。其他sentinel等待leader从slave选举出master后,检测新的master节点正常运行后,就会去掉客观下线的标识,从而不需要进入故障转移流程。

sentienl根据什么来指定master节点:

1、断开连接时间:如果与哨兵连接断开时间过长,超过了某个阈值,就直接失去了选举权,

2、优先级:如果拥有了选举权,那就判断谁的优先级更高,数值越小优先级越高

3、复制数量:如果优先级也一样,就判断谁从master节点中复制的数据最多。

4、进程id:如果复制数量一样,就选择进程id最小的那个。

哨兵机制的不足:

主从切换中会丢失数据,因为只有一个master,salve节点 的数据使用RDB文件复制的。

只能单点写,没有解决扩容的问题

如果数据量过大,我们就需要多个master-slave的group

redis数据的分片

客户端实现

客户端根据key进行hash计算,来进行分片。

ShardedJedis

代理层

将分片逻辑提取出来,运行一个独立的代理服务,客户端连接到这个代理服务,代理服务做请求转发。

Twemproxy,Codis

服务层

Redis Cluster

JAVA

  1. 多个线程同时读写,读线程的数量量远远⼤大于写线程,你认为应该如何解决 并发的问题?你会选择加什么样的锁?
  2. JAVA的AQS是否了了解,它是⼲干嘛的?
  3. 除了了synchronized关键字之外,你是怎么来保障线程安全的?
  4. 什么时候需要加volatile关键字?它能保证线程安全吗?
  5. 线程池内的线程如果全部忙,提交⼀一个新的任务,会发⽣生什么?队列列全部 塞满了了之后,还是忙,再提交会发⽣生什什么?
  6. Tomcat本身的参数你⼀一般会怎么调整?
  7. synchronized关键字锁住的是什么东⻄西?在字节码中是怎么表示的?在内 存中的对象上表现为什么?
  8. wait/notify/notifyAll⽅方法需不不需要被包含在synchronized块中?这是为什 么?
  9. ExecutorService你⼀一般是怎么⽤用的?是每个service放⼀一个还是⼀一个项⽬目
    ⾥里里⾯面放⼀一个?有什么好处?

    Spring

  10. 你有没有⽤用过Spring的AOP? 是⽤用来干嘛的? ⼤大概会怎么使⽤用?
  11. 如果一个接口有2个不同的实现, 那么怎么来Autowire⼀一个指定的实现?
  12. Spring的声明式事务 @Transaction注解⼀一般写在什么位置? 抛出了了异常会⾃自动回滚吗?有没有办法控制不不触发回滚?
  13. 如果想在某个Bean⽣生成并装配完毕后执⾏行行⾃自⼰己的逻辑,可以什么⽅方式实现?
  14. SpringBoot没有放到web容器器⾥里里为什么能跑HTTP服务?
  15. SpringBoot中如果你想使⽤用⾃自定义的配置⽂文件⽽而不不仅仅是
    application.properties,应该怎么弄弄?
  16. SpringMVC中RequestMapping可以指定GET, POST方法么?怎么指定?
  17. SpringMVC如果希望把输出的Object(例例如XXResult或者XXResponse)这种包装为JSON输出, 应该怎么处理理?
  18. 怎样拦截SpringMVC的异常,然后做⾃自定义的处理理,比如打⽇日志或者包装成JSON

    MySQL

  19. 如果有很多数据插⼊入MYSQL 你会选择什么⽅方式?
  20. 如果查询很慢,你会想到的第⼀一个⽅方式是什么?索引是⼲干嘛的?
  21. 如果建了了⼀一个单列列索引,查询的时候查出2列列,会⽤用到这个单列列索引吗?
  22. 如果建了了⼀一个包含多个列列的索引,查询的时候只⽤用了了第⼀一列列,能不不能⽤用上这个索引?查三列列呢?
  23. 接上题,如果where条件后⾯面带有⼀一个 i + 5 < 100 会使⽤用到这个索引吗?
  24. 怎么看是否⽤用到了了某个索引?
  25. like %aaa%会使⽤用索引吗? like aaa%呢?
  26. drop、truncate、delete的区别?
  27. 平时你们是怎么监控数据库的? 慢SQL是怎么排查的?
  28. 你们数据库是否⽀支持emoji表情,如果不不⽀支持,如何操作?
  29. 你们的数据库单表数据量量是多少?⼀一般多⼤大的时候开始出现查询性能急剧下降?
  30. 查询死掉了了,想要找出执⾏行行的查询进程⽤用什什么命令?找出来之后⼀一般你 会⼲干嘛?
  31. 读写分离是怎么做的?你认为中间件会怎么来操作?这样操作跟事务有 什什么关系?
  32. 分库分表有没有做过?线上的迁移过程是怎么样的?如何确定数据是正确的?

    JVM

  33. 你知道哪些或者你们线上使⽤用什什么GC策略略? 它有什什么优势,适⽤用于什什么场景?
  34. JAVA类加载器器包括⼏几种?它们之间的⽗父⼦子关系是怎么样的?双亲委派机制是什什么意思?有什什么好处?
  35. 如何⾃自定义⼀一个类加载器器?你使⽤用过哪些或者你在什什么场景下需要⼀一个⾃自定义的类加载器器吗?
  36. 堆内存设置的参数是什什么?
  37. Perm Space中保存什什么数据? 会引起OutOfMemory吗?
  38. 做gc时,⼀一个对象在内存各个Space中被移动的顺序是什什么?
  39. 你有没有遇到过OutOfMemory问题?你是怎么来处理理这个问题的?处理理过程中有哪些收获?
  40. 1.8之后Perm Space有哪些变动? MetaSpace⼤大⼩小默认是⽆无限的么? 还是你们会通过什什么⽅方式来指定⼤大⼩小?
  41. Jstack是⼲干什什么的? Jstat呢? 如果线上程序周期性地出现卡顿,你怀疑可能是gc导致的,你会怎么来排查这个问题?线程⽇日志⼀一般你会看其中的什什么 部分?
  42. StackOverFlow异常有没有遇到过?⼀一般你猜测会在什什么情况下被触发?如何指定⼀一个线程的堆栈⼤大⼩小?⼀一般你们写多少?

    Linux命令

  43. ⽇日志特别⼤大只想看最后100⾏行行怎么弄弄? 如果想⼀一直看⽇日志的持续输出,⽤用什什么命令?
  44. 如果⽇日志⼀一边输出,⼀一边想实时看到有没有某个关键字应该怎么弄弄?
  45. grep如果忽略略⼤大⼩小写应该怎么弄弄? 正则表达式呢?
  46. vim往下⼀一⾏行行是什什么键?往下30⾏行行呢? 跳到⽂文件末尾⼀一⾏行行是什什么? 跳回来是什什么? 向后搜索是什什么?
  47. 如果有个⽂文本⽂文件,按空格作为列列的分隔符,如果想统计第三列列⾥里里⾯面的每个单词的出现次数应该怎么弄弄?
  48. 如果把上⾯面的出现次数排个序应该怎么弄弄? 想按照数字本身的顺序⽽而不不是字符串串的顺序排列列怎么弄弄?
  49. Linux环境变量量是以什什么作为分隔符的?环境变量量通过什什么命令设置?
  50. 给某个⽂文件权设置限⽐比如设置为644 是⽤用什什么命令?这个6是什什么意思?
  51. Linux下⾯面如果想看某个进程的资源占⽤用情况是怎么看的?系统load⼤大概 指的什什么意思?你们线上系统load⼀一般多少?如果⼀一个4核机器器,你认为多少load是⽐比较正常的?top命令⾥里里⾯面按⼀一下1会发⽣生什什么?
  52. top命令⾥里里⾯面,有时候所有进程的CPU使⽤用率加起来超过100%是怎么回 事?
  53. 还有哪些查看系统性能或者供你发现问题的命令?你⼀一般是看哪个参数?
  54. 想看某个进程打开了了哪些⽹网络连接是什什么命令?⾥里里⾯面连接的状态你⽐比较关⼼心哪⼏几种?

– 偏题

  1. 有没有做过Linux系统参数⽅方⾯面的优化,⼤大概优化过什什么?
  2. 系统参数⾥里里⾯面有个叫做backlog的可以⽤用来⼲干什什么?
  3. 查看⽹网络连接发现好多TIME_WAIT 可能是什什么原因?对你的应⽤用会有什什么影响?你会选择什什么样的⽅方式来减少这些TIME_WAIT
  4. 可否介绍⼀一下TCP三次握⼿手的过程,如果现在有个⽹网络程序,你⽤用第三⽅方 的library来发送数据,你怀疑这个library发送的数据有问题,那么怎么来验 证?tcpdump导出的⽂文件你⼀一般是怎么分析的?
  5. KeepAlive是⽤用来⼲干什什么的?这样的好处是什什么?

    Redis

– 开发

  1. 缓存穿透可以介绍⼀一下么?你认为应该如何解决这个问题
  2. 你是怎么触发缓存更更新的?(⽐比如设置超时时间(被动⽅方式), ⽐比如更更新的时候主动update)?如果是被动的⽅方式如何控制多个⼊入⼝口同时触发某个缓存更更新?
  3. 你们⽤用Redis来做什什么?为什什么不不⽤用其他的KV存储例例如Memcached,
    Cassandra等?
  4. 你们⽤用什什么Redis客户端? Redis⾼高性能的原因⼤大概可以讲⼀一些?
  5. 你熟悉哪些Redis的数据结构? zset是⼲干什什么的? 和set有什什么区别?
  6. Redis的hash, 存储和获取的具体命令叫什什么名字?
  7. LPOP和BLPOP的区别?
  8. Redis的有⼀一些包含SCAN关键字的命令是⼲干嘛的? SCAN返回的数据量量是固定的吗?
  9. Redis中的Lua有没有使⽤用过? 可以⽤用来做什什么? 为什什么可以这么⽤用?
  10. Redis的Pipeline是⽤用来⼲干什什么的?

– 运维
11. Redis持久化⼤大概有⼏几种⽅方式? aof和rdb的区别是什什么? AOF有什什么优缺点吗?
12. Redis Replication的⼤大致流程是什什么? bgsave这个命令的执⾏行行过程?
– 偏题
13. 如果有很多 KV数据要存储到Redis, 但是内存不不⾜足, 通过什什么⽅方式可以缩减内存? 为什什么这样可以缩⼩小内存?
14. Redis中List, HashTable都⽤用到了了ZipList, 为什什么会选择它?

监控、稳定性

  1. 业务⽇日志是通过什什么⽅方式来收集的?
  2. 线上机器器如何监控?采⽤用什什么开源产品或者⾃自研的产品?它是分钟级的还 是秒级的?
  3. 如果让你来想办法收集⼀一个JAVA后端应⽤用的性能数据,你会在意哪些⽅方
    ⾯面? 你会选择什什么样的⼯工具、思路路来收集?
  4. ⼀一般你调⽤用第三⽅方的时候会不不会监控调⽤用情况?

线程池的基本使用

线程池的参数:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,// 核心线程数
int maximumPoolSize,// 最大线程数
long keepAliveTime,// 超过核心线程数的其他线程存活时间
TimeUnit unit,// 超过核心线程数的其他线程存活时间单位
BlockingQueue<Runnable> workQueue,// 阻塞队列
ThreadFactory threadFactory,// 线程创建工厂
RejectedExecutionHandler handler) // 拒绝策略

阻塞队列:

类名 说明
ArrayBlockingQueue 初始化是设置queue的大小
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。
DelayQueue 优先级队列实现的无界阻塞队列
LinkedTransferQueue 链表实现的无界阻塞队列
LinkedBlockingDeque 链表实现的双向阻塞队列

阻塞队列的操作:

增加:
API 说明
add 往Queue里添加数据,如果队列满了,抛出异常
offer 往Queue里添加数据,添加成功return true,添加失败return false
enqueue(不能直接使用) 添加数据的具体操作,添加完成会signal()唤醒阻塞点线程
put 添加数据,阻塞式添加。
取出:
API 说明
take 阻塞式取值
dequeue(不可直接使用) 取值核心逻辑
poll 非阻塞时取值
删除:

remove删除数据。

spring 的事务传播性

常量名称 常量解释
PROPAGATION_REQUIRED spring中的默认事务传播机制。支持当前事务,如果当前没有事务,就新建一个事务。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,则将当前事务挂起。新建的事务和被挂起的事务没有关联,是两个独立的事务,外层事务失败回滚后,不会影响到内层的事务。内层事务失败抛出异常后,外层事务捕获,也可以不处理回滚操作。
PROPAGITION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式运行
PROPAGITION_MANDATORY 支持当前事务,如果当前没有事务,则抛出异常。
PROPAGITION_NOT_SUPPORTED 以非事务方式运行,如果当前存在事务,则将当前事务挂起
PROPAGITION_NEVER 以非事务方式运行,如果当前存在事务,则抛出异常。
PROPAGITION_NESTED 如果一个活动事务存在,则运行在一个嵌套事务中。如果没有活动事务,则按REQUIRED运行。它使用了一个单独的事务,这个事务拥有多个可以回滚的保存点。内部事务的回滚不会对外部事物造成影响。他只对DataSourceTransationManager事务管理器起效。

REQUIREDREQUIRES_NEW : 两个方法必须在不同的类中才会生效,才会创建新的事务。否则只会创建一个。

乐观锁

认为同一时间访问共享资源的线程不会冲突,所以采用共享资源的状态对资源进行修改。CAS操作也是乐观锁的体现。适合读多写少的操作。效率较高。

乐观锁的实现方式:

1、使用version方式,版本号控制对数据的操作

2、JDK的CAS操作就是乐观锁的一种实现。但是CAS可能会出现“ABA”的问题。

“ABA”问题:

CAS操作表示compareAndSet,提供了三个参数,旧值V,预期值E,更新值U,当V和E相等时,才会对数据进行操作。但是操作时可能出现了其他线程对数据操作的情况:A --> B -->A,其他线程将数据改为B后又改回成A。所以前面的线程会对数据操作成功。类似于数据库的脏读问题 。

如何解决“ABA”问题

采用version版本号控制,操作一次版本号version增加1,后续操作时带上参数version。版本号一直才会对数据操作。

悲观锁

认为同一时间一定会有多个线程访问同一静态资源,所以采用对共享资源加锁的方式保证数据的安全性,如数据库操作的for update操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SingletonDemo {
private static SingletonDemo singletonDemo = new SingletonDemo();
private static int count1;
private static int count2 = 0;

public SingletonDemo() {
count1++;
count2++;
System.out.println("count1:" + count1);// 1
System.out.println("count2:" + count2);// 2

}
public static void main(String[] args) {
System.out.println("count1:" + count1);// 3
System.out.println("count2:" + count2);// 4
}
}

分析:

类的加载过程为:转载 – 链接 – 初始化

  • 装载:根据类全路径进行加载
  • 链接包含三个步骤:验证 - 准备 - 解析。
    • 验证:
      • 文件格式验证
      • 元数据验证
      • 字节码验证
      • 符号引用验证
    • 准备:为类的静态变量分配内存,并将其初始化为默认值。
    • 解析:把类中的符号引用转换为直接引用

初始化阶段会初始化类中的:

  • 静态成员变量singletonDemo,初始化时,count1count2默认为0,初始化后count1count2都是1,所以1和2位置输出均为1。
  • 静态变量count1count2,初始化前count1count2都是1,初始化后count2被赋值为0。所以3和4的位置输出1,0
0%