Java Stream基础操作
概述
jdk8之前,操作集合中的元素,一般都是通过for或者iterator去遍历,代码特别冗长。jdk8提供了Stream,让我对更加方便的操作集合
Stream构造方式
stream本身的
public static<T> Builder<T> builder();
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b);
public static<T> Stream<T> empty();
public static<T> Stream<T> generate(Supplier<T> s);
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f);
public static<T> Stream<T> of(T t);
public static<T> Stream<T> of(T... values);
Collection接口提供
default Stream<E> stream();
默认方法,也就是任意Collection的之类,都能通过调用stream方法获取Stream对象,常见的比如List.stream()
Stream中间操作
filter过滤操作
Stream<T> filter(Predicate<? super T> predicate);
- predicate函数时接口,可以使用lambda替代,也可以使用传统实现的方式来入参。如果有复杂的逻辑可以用传统是实现接口方式,并用predicate提供的接口来组合
List<String> demo = Arrays.asList("q", "w", "e");
Predicate<String> f1 = e -> e.equals("q");
Predicate<String> f2 = e -> e.equals("w");
demo.stream().filter(f1.or(f2)).forEach(System.out::println);
//或者
demo.stream().filter(e->e.equals("q")||e.equals("w")).forEach(System.out::println);
-------result--------
a
b
a
b
map映射转化
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
class User {
private Long id;
private String name;
public User(Long id, String name) {
this.id = id;
this.name = name;
}
public Long getId() {
return id;
}
public User setId(Long id) {
this.id = id;
return this;
}
public String getName() {
return name;
}
public User setName(String name) {
this.name = name;
return this;
}
}
static void mapTest(){
List<User> users = Arrays.asList(new User(1L, "11"), new User(2L, "22"));
users.stream().map(e->e.getId()).forEach(System.out::println);
}
---result---
1
2
peek数据处理
Stream<T> peek(Consumer<? super T> action);
- 和map的区别是,map有返回值,这个没有
public static void main(String[] args) {
List<User> demo = Arrays.asList(new User(1L,"11"), new User(2L,"22"), new User(3L,"33"));
// id平方,User 转为 Integer(id)
demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println);
-------result--------
1
4
9
flatMap
这个没有好的翻译,暂时不翻译。为了更好的了解这个操作的意义,我们假设张三有经理和财务两个角色,两个角色对应不同权限,那么张三的权限=经理权限+财务权限,相关类如下:
class Role{
private List<Private> private;//权限集合
}
class Private{
private String name;//权限名称
}
//经理角色
Private p1 = new Private().setName("发布任务");
Private p2 = new Private().setName("制定计划");
List<Private> provates = Arrays.asList(p1,p2);
Role manager = new Role().setPrivates(provates);
//财务角色
Private p3 = new Private().setName("做预算");
Private p4 = new Private().setName("发工资");
List<Private> provates2 = Arrays.asList(p3,p4);
Role caiwu = new Role().setPrivates(provates2);
//张三角色集合
List<Role> roles = Arrays.asList(manager,caiwu);
//计算张三所有权限
roles.stream().flatMap(e->e.getPrivates().stream()).forEach(e-> System.out.println(e.getName()));
---result----
发布任务
制定计划
做预算
发工资
由上面的demo,可以看出flatMap的作用是,将多个provates列表,压缩成一个
distinct 去重
Stream<T> distinct();
List<Integer> demo = Arrays.asList(1, 1, 2);
demo.stream().distinct().forEach(System.out::println);
-------result--------
1
2
如果是对象去重,需要重写对象的equals和hashcode方法
sorted 排序
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
List<Integer> demo = Arrays.asList(5, 1, 2);
//默认升序
demo.stream().sorted().forEach(System.out::println);
//降序
Comparator<Integer> comparator = Comparator.<Integer, Integer>comparing(item -> item).reversed();
demo.stream().sorted(comparator).forEach(System.out::println);
-------默认升序 result--------
1
2
5
-------降序 result--------
5
2
1
limit和skip
//截取前maxSize个元素
Stream<T> limit(long maxSize);
//跳过前n个流
Stream<T> skip(long n);
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
//跳过前两个,然后限制截取两个
demo.stream().skip(2).limit(2).forEach(System.out::println);
-------result--------
3
4
Stream终止操作
遍历消费
//遍历消费
void forEach(Consumer<? super T> action);
//顺序遍历消费,和forEach的区别是forEachOrdered在多线程parallelStream执行,保证顺序
void forEachOrdered(Consumer<? super T> action);
List<String> strings = Arrays.asList("AA", "BB", "CC", "DD");
strings.parallelStream().forEach(System.out::print);
System.out.println();
strings.parallelStream().forEachOrdered(System.out::print);
----------result----------
CCDDBBAA
AABBCCDD
转换成数组
//流转成Object数组
Object[] toArray();
//流转成A[]数组,指定类型A
<A> A[] toArray(IntFunction<A[]> generator)
List<String> strings = Arrays.asList("AA", "BB", "CC", "DD");
Object[] objects = strings.stream().toArray();
String[] strings1 = strings.stream().toArray(String[]::new);
最大最小
//获取最小值
Optional<T> min(Comparator<? super T> comparator)
//获取最大值
Optional<T> max(Comparator<? super T> comparator)
List<Integer> integers = Arrays.asList(1, 2, 3, 45, 6);
Optional<Integer> max = integers.stream().max(Comparator.comparing(i -> i));
Optional<Integer> min = integers.stream().min(Comparator.comparing(i -> i));
System.out.println("max="+max.get()+",min="+min.get());
--------result-------
max=45,min=1
合并
//两两合并
Optional<T> reduce(BinaryOperator<T> accumulator)
//两两合并,带初始值的
T reduce(T identity, BinaryOperator<T> accumulator)
//如果是stream的话与第二个完全一样,parallelStream会在合并一次
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
List<Integer> integers = Arrays.asList(1, 2, 3, 45, 6);
Optional<Integer> reduce = integers.stream().reduce((v1, v2) -> v1 + v2);
Integer reduce1 = integers.stream().reduce(1, (v1, v2) -> v1 + v2);
String reduce2 = integers.stream().reduce("初始值", (v1, v2) -> v1 + "第一次" + v2, (v3, v4) -> v3 + "第二次" + v4);
String reduce3 = integers.parallelStream().reduce("a", (v1, v2) -> v1 + "-" + v2, (v3, v4) -> v3 + "第二次" + v4);
System.out.println(reduce.get());
System.out.println(reduce1);
System.out.println(reduce2);
--------result--------
57
58
初始值第一次1第一次2第一次3第一次45第一次6
a-1第二次a-2第二次a-3第二次a-45第二次a-6
计数
//统计元素个数
long count()
聚合处理
/**
* supplier:返回结果类型的生产者
* accumulator:元素消费者(处理并加入R)
* combiner: 返回结果 R 怎么组合(多线程执行时,会产生多个返回值R,需要合并)
*/
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
/**
* collector一般是由 supplier、accumulator、combiner、finisher、characteristics组合成的聚合类
* Collectors 可提供一些内置的聚合类或者方法
*/
<R, A> R collect(Collector<? super T, A, R> collector);
具体可以看下面
Collector(聚合类)的操作
接口Collector和实现类CollectorImpl
//返回值类型的生产者
Supplier<A> supplier();
//流元素消费者
BiConsumer<A, T> accumulator();
//返回值合并器(多个线程操作时,会产生多个返回值,需要合并)
BinaryOperator<A> combiner();
//返回值转化器(最后一步处理,实际返回结果,一般原样返回)
Function<A, R> finisher();
//流的特性
Set<Characteristics> characteristics();
public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
Function<A, R> finisher, Characteristics... characteristics)
转换成List, Set
//流转化成List
public static <T> Collector<T, ?, List<T>> toList()
//流转化成Set
public static <T> Collector<T, ?, Set<T>> toSet()
List<Integer> demo = Arrays.asList(1, 2, 3);
List<Integer> col = demo.stream().collect(Collectors.toList());
Set<Integer> set = demo.stream().collect(Collectors.toSet());
转换成Map
//流转化成Map
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper)
/**
* mergeFunction:相同的key,值怎么合并
*/
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction)
/**
* mergeFunction:相同的key,值怎么合并
* mapSupplier:返回值Map的生产者
*/
public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier)
List<User> users = Arrays.asList(new User(1), new User(2), new User(3), new User(1));
//Map<Integer, Integer> collect = users.stream().collect(Collectors.toMap(e -> e.getId(), f -> f.getId()));
//System.out.println(collect);
Map<Integer, Integer> collect1 = users.stream().collect(Collectors.toMap(User::getId, User::getId, (v1,v2)->v1+v2, HashMap::new ));
System.out.println(collect1);
--------result------------
{1=2, 2=2, 3=3}
字符流聚合
//多个字符串拼接成一个字符串
public static Collector<CharSequence, ?, String> joining();
//多个字符串拼接成一个字符串(指定分隔符)
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter);
//多个字符串拼接,指定分隔符,前缀,后缀
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix)
List<String> integers = Arrays.asList("1","2","3");
String collect = integers.stream().collect(Collectors.joining());
String collect1 = integers.stream().collect(Collectors.joining("|"));
String collect2 = integers.stream().collect(Collectors.joining("|","[","]"));
System.out.println(collect);
System.out.println(collect1);
System.out.println(collect2);
--------result----------
123
1|2|3
[1|2|3]
映射、聚合流
/**
* mapper:映射处理器,相当于map功能
* downstream:映射处理后需要再次聚合处理
*/
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream);
List<String> integers = Arrays.asList("1","2","3");
List collect = integers.stream().collect(Collectors.mapping(e->e+"8", Collectors.toList()));
System.out.println(collect);
--------result--------
[18, 28, 38]
聚合、转换
/**
* downstream:聚合处理
* finisher:结果转换处理
*/
public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream,
Function<R, RR> finisher);
List<String> integers = Arrays.asList("1","2","3");
//聚合成List,最后提取数组的size作为返回值
Integer size =integers.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size));
System.out.println(size);
---------result---------
3
分组
/**
* classifier 指定T类型某一属性作为Key值分组
* 分组后,使用List作为每个流的容器
*/
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(
Function<? super T, ? extends K> classifier);
/**
* classifier: 流分组器
* downstream: 每组流的聚合处理器
*/
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(
Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream);
/**
* classifier: 流分组器
* mapFactory: 返回值map的工厂(Map的子类)
* downstream: 每组流的聚合处理器
*/
public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(
Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream);
///////////////////用法和上面一样
/**
* classifier: 分组器 ; 分组后,使用List作为每个流的容器
*/
public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(
Function<? super T, ? extends K> classifier);
/**
* classifier: 分组器
* downstream: 流的聚合处理器
*/
public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(
Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream)
/**
* classifier: 分组器
* mapFactory: 返回值类型map的生产工厂(ConcurrentMap的子类)
* downstream: 流的聚合处理器
*/
public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(
Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream);
List<Integer> integers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
HashMap<Integer, List<String>> collect = integers.stream().collect(Collectors.groupingBy(i -> i % 3,HashMap::new,Collectors.mapping(String::valueOf, Collectors.toList())));
System.out.println(collect);
-------------result----------
{0=[3, 6, 9], 1=[1, 4, 7, 10], 2=[2, 5, 8]}
流拆分
public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(
Predicate<? super T> predicate)
/**
* predicate: 二分器
* downstream: 流的聚合处理器
*/
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(
Predicate<? super T> predicate, Collector<? super T, A, D> downstream)
List<Integer> integers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
Map<Boolean, List<Integer>> collect = integers.stream().collect(Collectors.partitioningBy(i -> i % 3 == 0));
System.out.println(collect);
-----------result-----------
{false=[1, 2, 4, 5, 7, 8, 10], true=[3, 6, 9]}
聚合求平均值
// 返回Double类型
public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper)
// 返回Long 类型
public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper)
//返回Int 类型
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper)
List<Double> integers = Arrays.asList(1.1,2.2,3.3,4.4,5.1,6.1,7.1,8.1,9.1,10.1);
Double collect = integers.stream().collect(Collectors.averagingDouble(Double::doubleValue));
System.out.println(collect);
----------result-----------
5.659999999999999
聚合查找最大最小值
//最小值
public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator)
//最大值
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)
List<Double> integers = Arrays.asList(1.1,2.2,3.3,4.4,5.1,6.1,7.1,8.1,9.1,10.1);
Optional<Double> max = integers.stream().collect(Collectors.maxBy(Comparator.comparing(i -> i)));
Optional<Double> min = integers.stream().collect(Collectors.minBy(Comparator.comparing(i -> i)));
System.out.println("max="+max.get()+",min="+min.get());
------------result------------
max=10.1,min=1.1
聚合计算统计结果
//可以获得元素总个数,元素累计总和,最小值,最大值,平均值
//返回Int 类型
public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(
ToIntFunction<? super T> mapper)
//返回Double 类型
public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(
ToDoubleFunction<? super T> mapper)
//返回Long 类型
public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(
ToLongFunction<? super T> mapper)
List<Double> integers = Arrays.asList(1.1,2.2,3.3,4.4,5.1,6.1,7.1,8.1,9.1,10.1);
DoubleSummaryStatistics collect = integers.stream().collect(Collectors.summarizingDouble(Double::doubleValue));
System.out.println(collect);
--------result----------
DoubleSummaryStatistics{count=10, sum=56.600000, min=1.100000, average=5.660000, max=10.100000}
并行流
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
integers.parallelStream().forEach(e->{
System.out.println(e);
System.out.println(Thread.currentThread().getName());
});
--------reult---------
5
main
4
main
1
main
3
ForkJoinPool.commonPool-worker-3
6
7
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-4
2
ForkJoinPool.commonPool-worker-1
可以看出,并行流使用的是fork/join
,他使用的commonPool,所有共用一个。所以,我们一般自定义线程池来运行。
IntStream range = IntStream.range(1, 10);
ForkJoinPool forkJoinPol = new ForkJoinPool(5);
forkJoinPol.submit(()->{
range.parallel().forEach(f->{
System.out.println(f);
System.out.println(Thread.currentThread().getName());
});
}).get();
------------result------------
6
ForkJoinPool-1-worker-1
3
ForkJoinPool-1-worker-2
5
ForkJoinPool-1-worker-1
8
ForkJoinPool-1-worker-3
4
ForkJoinPool-1-worker-2
7
ForkJoinPool-1-worker-4
9
ForkJoinPool-1-worker-3
2
ForkJoinPool-1-worker-1
1
ForkJoinPool-1-worker-5
parallelStream的坑
- 默认情况下,使用通用的
fork/join
池完成任务,该池是所有并行流共享的。并且该池线程数和cpu核心数一样,对cpu密集型任务是有意义的,因为机器只能执行这么多线程;对于io密集型,瓶颈是io,会等待其他系统。所以,两个流使用通用的线程池,会因为相互等待尔阻塞 - 使用foreach会乱序,应该使用foreachOrdered
- 线程不安全,写代码的是否需要注意线程安全的问题
fork/join是什么
fork/join
框架是Java7提供的一个用于并行执行任务的框架,拆用‘分而治之’的思想,就是把一个大任务拆成若干个小任务,最后在汇总结果,具体过程,我在网上找了一张图:
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:
我们把一个任务分配给两个线程执行,线程1执行很快,线程2还没执行完,线程1就会去线程儿中窃取一个任务执行,加快速度。此时,他们访问同一队列,为了减少窃取任务线程和被窃取任务线程的竞争,通常拆用双端队列,被窃取任务线程永远从头部取任务,而窃取任务线程永远从尾部拿任务线程
ForkJoinPool
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器:
// 创建一个包含parallelism个并行线程的ForkJoinPool
public ForkJoinPool(int parallelism)
//以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool
public ForkJoinPool() :
创建实例后,可以调用public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
和public void execute(ForkJoinTask<?> task)
来执行任务。ForkJoinTask
是一个抽象类,代表可并行、合并的任务,它还有两个抽象子类:
- RecursiveAction代表没有返回值的任务
public class Demo1 extends RecursiveAction {
public int start;
public int end;
public Demo1(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if((end-start) >40){
//执行拆分
// 将大任务分解成两个小任务
int middle = (start + end) / 2;
Demo1 left = new Demo1(start, middle);
Demo1 right = new Demo1(middle, end);
left.fork();
right.fork();
}else {
for(int i= start; i<end;i++) {
System.out.println(Thread.currentThread().getName()+"i的值"+i);
}
}
}
public static void main(String[] args) throws InterruptedException {
// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交可分解的PrintTask任务
forkJoinPool.submit(new Demo1(0, 1000));
//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
// 关闭线程池
forkJoinPool.shutdown();
}
}
- RecursiveTask代表有返回值的任务
public class Demo1 extends RecursiveTask<Integer> {
public int start;
public int end;
public Demo1(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if((end-start) >40){
//执行拆分
// 将大任务分解成两个小任务
int middle = (start + end) / 2;
Demo1 left = new Demo1(start, middle);
Demo1 right = new Demo1(middle, end);
//并行执行任务
left.fork();
right.fork();
//累加两个任务
return left.join()+right.join();
}else {
for(int i= start; i<end;i++) {
sum+=i;
}
return sum;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
int sum = IntStream.range(0, 100).parallel().reduce( 0,Integer::sum );
System.out.println(sum);
// 提交可分解的PrintTask任务
ForkJoinTask<Integer> submit = forkJoinPool.submit(new Demo1(0, 100));
Integer integer = submit.get();
System.out.println(integer);
//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
// 关闭线程池
forkJoinPool.shutdown();
}
}
总结,fork/join
总体分两步,第一步fork把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小;第二步,分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据