Java8新特性

Java 8新特性

lambda表达式

也可称为闭包

闭包又称词法闭包

解释一:闭包是引用了自由变量的函数,这个被引用的变量将和这个函数一同存在。

解释二:闭包是函数和相关引用环境组成的实体。

1
2
(parameters)->expression
(parameters)->{ statements;}

主要特征

  • 可选类型声明:不需要声明参数类型,编译器统一识别参数值
  • 可选的参数圆括号:一个参数无需定义圆括号,多个参数需要定义圆括号
  • 可选大括号:主题包含了一个语句,就不需要使用大括号
  • 可选的返回关键字:主体只有一个表达式返回值则编译器自动返回值,大括号需要指定表达式返回一个数值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//lambda表达式例子
//不需要参数,返回值为5
()->5

//接收一个参数,返回其两倍
x -> 2*x

//返回x,y差值
(x,y)->x-y

//接受2个int整数,返回和
(int x, int y)->x+y

//接受一个String对象
(String s)->System.out.print(s)

注意事项:

  • lambda表达式主要用来执行定义行内执行的方法类型接口。
  • 免去了使用匿名方法的麻烦,简单强大的函数式编程。

lambda表达式中的局部变量具有final语义(即是final类型,或者必须不被后面代码修改)

lambda表达式中的this

lambda表达式和內部匿名类的区别就内部匿名类中的this指向的是内部匿名类本身,而lambda表达式中的类指的是当前类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package jdk8.lambda;

/**
* lambda表达式的this
*
*/
public class ThisDemo {

private String name = "ThisDemo";

public void test() {
// 匿名类实现
new Thread(new Runnable() {

private String name = "Runnable";

@Override
public void run() {
System.out.println("这里的this指向匿名类:" + this.name);
}
}).start();

// lambda实现
new Thread(() -> {
System.out.println("这里的this指向当前的ThisDemo类:" + this.name);
}).start();
}

public static void main(String[] args) {
ThisDemo demo = new ThisDemo();
demo.test();
}
}

Java中this实现原理

lambda表达式中,会把lambda表达式在本类中生产一个lambda$+数字的方法。关键点:该方法不一定是static方法,是否是static方法,取决于lambda表达式里面是否引用了this。

1
2
3
4
5
6
7
8
9
10
11
// lambda实现
// 下面会自动生成lambda$0方法,
new Thread(() -> {
System.out.println("这里的this指向当前的ThisDemo类:" + this.name);
}).start();

// lambda实现
// 下面会自动生成lambda$1方法,
new Thread(() -> {
System.out.println("这里没有引用this,生成的lambda1方法是static的");
}).start();

用javap -s -p 类名可以看出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
PS D:\CodeHub\Java8\out\production\Java8\com\jdk8\lambda>  javap -s -p '.\ThisDemo.class'
Compiled from "ThisDemo.java"
public class com.jdk8.lambda.ThisDemo {
java.lang.String name;
descriptor: Ljava/lang/String;
public com.jdk8.lambda.ThisDemo();
descriptor: ()V

public void test();
descriptor: ()V

public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V

private static void lambda$test$1();
descriptor: ()V

private void lambda$test$0();
descriptor: ()V
}

Lambda实例方法的方法引用

方法引用有多种,静态方法的方法引用很好理解,但是实例对象的方法引用有点难以理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class MethodDemo {

/**
* 静态方法
*/
public static int staticMethod(int i){
return i*2;
}

/**
* 实例方法
*/
public int normalMethod(int i){
return i*2;
}

public static void main(String[] args) {
IntUnaryOperator methodReferency = MethodDemo::staticMethod;
System.out.println(methodReferency.applyAsInt(11));

MethodDemo demo = new MethodDemo();

IntUnaryOperator normalReference = demo::normalMethod;
System.out.println(normalReference.applyAsInt(2));

}
}

java里面在默认把this作为参数,放到实例方法的第一个参数

Lambda表达式实现惰性求值

惰性求值在Lambda表达式中非常重要,也十分有用。

1
2
3
4
// 打印日志前需要先判断日志级别
if (logger.isLoggable(Level.FINE)) {
logger.fine("打印一些日志:" + this);
}

为什么要判断日志级别,

因为不判断执行代码,发现虽然日志没有打印,但toString方法还是执行了,属于多余浪费的开销。

每一个日志打印都加判断,看着很别扭,现在有了lambda表达式之后,可以使用lambda的惰性求值,就可以去掉if判断,如下

1
2
// 使用lambda表达式的惰性求值,不需要判断日志级别
logger.fine(() -> "打印一些日志:" + this);

lambda惰性求值底层机制

这个现象很好理解,简单讲解一下。就是没有使用表达式的时候,相当于

1
2
String msg = "打印一些日志:" + this
logger.fine(msg);

虽然最后没有打印,但字符串拼接的工作还是执行了。而使用了lambda表达式之后,字符串的拼接放到一个函数里面,fine日志需要打印的时候才去调用这个方法才真正执行!从而实现了惰性求值。

后面我们学习的jdk8的stream流编程里面,没有调用最终操作的时候,中间操作的方法都不会执行,这也是惰性求值。

lambda表达式作用域

访问局部变量

我们可以直接在lambda表达式中访问外部的局部变量:

1
2
3
4
final int num = 1;
Converter<Integer, String> StringConverter =
(from)->String.valueOf(from + num);
stringConverter.convert(2);

但是和匿名对象不同的是,这里的变量num可以不用声明为final,该代码同样正确

1
2
3
4
5
int num = 1;
Converter<Integer, String> stringConverter =
(from) -> String.valueOf(from + num);

stringConverter.convert(2);

但是这里的num必须不可以被后面的代码修改

访问字段和静态变量

和局部变量相比,我们对lambda表达式中的实例字段和静态变量都有读写访问权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class lambda4{
static int outerStaticNum;
int outerNum;

void testScopes(){
Converter<Integer, String> StringConverter1 =
(from)->{
outerNum = 23;
return String.ValueOf(from);
}
Converter<Integer, String> stringConverter2 = (from) -> {
outerStaticNum = 72;
return String.valueOf(from);
};
}
}

访问默认接口方法

无法在lambda表达式访问默认方法,下面代码无法编译

1
2


Java8–foreach使用

foreach:新的简洁而有趣的迭代集合的方法;

基础知识

1
public interface Collection<E> extends Iterable<E>

Collection接口实现了iterable接口,而iterable接口在java8开始具有一个新的api

1
void forEach(Consumer<? super T> action) //对Iterable的每个元素执行特定的操作,直到所有元素被处理或者引发异常

迭代和打印字符串的for循环版本

1
2
3
for(String name : names){
System.out.println(name);
}

我们可以使用forEach写这个

1
2
3
names.forEach(name->{
System.out.println(name);
})

使用forEach方法

使用forEach迭代集合并且对每个元素执行特定操作,要执行的操作包含在实现Consumer接口的类中 ,并且作为参数传递给forEach。

消费者接口是一个功能接口,接受输入并且不返回任何结果。

Consumer接口定义

1
2
3
4
@FunctionalInterface
public interface Consumer{
void accept(T t);
}

任何实现,例如只是打印字符串的消费者

1
2
3
4
5
Consumer<String> printConsumer = new Consumer<String>(){
public void accept(String name){
System.out.println(name);
}
}

可以作为参数传递给forEach:

1
names.forEach(printConsumer);

我们可以使用方法引用语法而不是普通的lambda语法

1
names.forEach(System.out::println)

forEach在集合里的使用

迭代集合

正如我们已经看到的迭代列表的元素

1
2
List<String> names = Arrays.asList("Larry", "Steve", "James");
names.forEach(System.out::println);

同样对于一组:

1
2
3
Set<String> uniqueNames = new HashSet<>(Arrays.asList("Larry", "Steve", "James"));

uniqueNames.forEach(System.out::println);

或者让我们说一个队列也是一个集合

1
2
3
Queue<String> namesQueue = new ArrayDeque<>(Arrays.asList("Larry", "Steve", "James"));

namesQueue.forEach(System.out::println);

迭代Map

迭代Map - 使用Map的forEach

Map没有实现Iterable接口,但它提供了自己的forEach 变体,它接受BiConsumer。*

1
2
3
4
5
Map<Integer, String> namesMap = new HashMap<>();
namesMap.put(1, "Larry");
namesMap.put(2, "Steve");
namesMap.put(3, "James");
namesMap.forEach((key, value) -> System.out.println(key + " " + value));

4.3.迭代一个Map - 通过迭代entrySet

1
namesMap.entrySet().forEach(entry -> System.out.println(entry.getKey() + " " + entry.getValue()));

Stream流编程

Stream内部迭代和外部迭代

首先分为中间操作最终操作,在最终操作没有被调用的情况下,所有的中级操作都不会执行。

简单来说,返回Stream流就是中间操作,可以继续链式调用下去,不返回stream就是最终操作。

最终操作分为短路操作和非短路操作,

中间操作又分为有状态操作和无状态操作。状态就是和其他数据有关系。

Stream运行机制

通过下面代码理解流的运行机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package stream;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
* 验证stream运行机制
*
* 1. 所有操作是链式调用, 一个元素只迭代一次
* 2. 每一个中间操作返回一个新的流. 流里面有一个属性sourceStage
* 指向同一个 地方,就是Head
* 3. Head->nextStage->nextStage->... -> null
* 4. 有状态操作会把无状态操作阶段,单独处理
* 5. 并行环境下, 有状态的中间操作不一定能并行操作.
*
* 6. parallel/ sequetial 这2个操作也是中间操作(也是返回stream)
* 但是他们不创建流, 他们只修改 Head的并行标志
*
*/
public class RunStream {

public static void main(String[] args) {
Random random = new Random();
// 随机产生数据
Stream<Integer> stream = Stream.generate(() -> random.nextInt())
// 产生500个 ( 无限流需要短路操作. )
.limit(500)
// 第1个无状态操作
.peek(s -> print("peek: " + s))
// 第2个无状态操作
.filter(s -> {
print("filter: " + s);
return s > 1000000;
})
// 有状态操作
.sorted((i1, i2) -> {
print("排序: " + i1 + ", " + i2);
return i1.compareTo(i2);
})
// 又一个无状态操作
.peek(s -> {
print("peek2: " + s);
}).parallel();

// 终止操作
stream.count();
}

/**
* 打印日志并sleep 5 毫秒
*
* @param s
*/
public static void print(String s) {
// System.out.println(s);
// 带线程名(测试并行情况)
System.out.println(Thread.currentThread().getName() + " > " + s);
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
}

JDK9的响应式流

叫做reactive Stream,也就是flow,是一个发布订阅模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
* 带 process 的 flow demo
*/

/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);

// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}

public class FlowDemo2 {

public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();

// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);

// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<String>() {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(String item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}

};

// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);

// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);

// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();

// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}

}

响应式基石Reactor

Spring webflux基于reactor实现响应式,那么reactor是什么,

reactor = jdk8Stream + jdk9flow响应流

通过代码理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.publisher.Flux;

public class ReactorDemo {

public static void main(String[] args) {
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1个元素
// Flux 0-N个元素
String[] strs = { "1", "2", "3" };

// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}

};

// 这里就是jdk8的stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最终操作
// 这里就是jdk9的reactive stream
.subscribe(subscriber);
}
}

Spring5中的webflux

webflux的关键在于自己编写代码返回(Flux|Mono),Spring框架用来负责处理订阅。Spring提供两种开发模式来编写响应式代码,使用MVC注解或者使用 router function模式。

异步servlet

学习异步servlet的关键在于了解同步servlet阻塞了什么,为什么需要异步servlet,异步servlet支持高吞吐量的原理是什么?

servlet容器,没处理一次请求都会占用一个线程,同步servlet里面业务代码处理多久,servlet容器的线程阻塞多久。如果servlet线程用完,就无法再处理请求了。

在异步servlet中,servlet容器的线程不会傻傻的等待业务代码处理完毕,而是直接返回,给业务代码一个回调函数,业务代码处理完了在通知。这样就可以用少量线程处理更高的请求,从而实现高吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@WebServlet(asyncSupported=true,urlPatterns = {"/AsyncServlet"})
public class AsyncServlet extends HttpServlet{
private static final long serialVersionUID = 1L;

/**
* @see HttpServlet#HttpServlet()
*/
public AsyncServlet() {
super();
}

/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();

// 开启异步
AsyncContext asyncContext = request.startAsync();

// 执行业务代码
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
asyncContext.getRequest(), asyncContext.getResponse()));

System.out.println("async use:" + (System.currentTimeMillis() - t1));
}

private void doSomeThing(AsyncContext asyncContext,
ServletRequest servletRequest, ServletResponse servletResponse) {

// 模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}

//
try {
servletResponse.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}

// 业务代码处理完毕, 通知结束
asyncContext.complete();
}

/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
}

SSE(server-sent event)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@WebServlet("/SSE")
public class SSE extends HttpServlet{
/**
* @see HttpServlet#HttpServlet()
*/
public SSE() {
super();
}

protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException{
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");

for(int i = 0; i<5; i++){
response.getWriter().write("event:me\n");
response.getWriter().flush();
}

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
}

同步异步和水平垂直扩展

打个比喻,假设现在有一堆的任务完成不了,当前的生产力无法完成,需要扩展的话,那么简单来说,水平扩展就是加人,垂直扩展就是加班l。人不够,加人就是水平扩展,那么加人肯定是比较简单的,一个人做不完就2个,2个不够就10个,总有够的一天。加人容易解决,但是成本也高啊!所以你是老板的话,你更加乐意是让员工垂直扩展-加班,人还是一个人,活多了!加班和加人不一样,是有极限的,一个人加的班是由一定限度的。这里的加班不一定是工作时间的加长,更加多的是工作能力的提升,个人的成长。怎么样才能加强工作能力?就是改变工作模式,把自己的处理任务模式优化一下,让相同的资源能处理更加多的任务,如把小任务集中处理,任务管理的统筹学等,这些都能提高效率。

Webflux为什么支持高吞吐量

响应式编程里面,tomcat容器的线程根本不需要阻塞的等待!等任务处理完了,会通知tomcat容器,这就是异步!