Stream函数操作

Java Stream基础操作

概述

jdk8之前,操作集合中的元素,一般都是通过for或者iterator去遍历,代码特别冗长。jdk8提供了Stream,让我对更加方便的操作集合

Stream构造方式

stream本身的

public static<T> Builder<T> builder();
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b);
public static<T> Stream<T> empty();
public static<T> Stream<T> generate(Supplier<T> s);
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f);
public static<T> Stream<T> of(T t);
public static<T> Stream<T> of(T... values);

Collection接口提供

default Stream<E> stream();

默认方法,也就是任意Collection的之类,都能通过调用stream方法获取Stream对象,常见的比如List.stream()

Stream中间操作

filter过滤操作

Stream<T> filter(Predicate<? super T> predicate);
  • predicate函数时接口,可以使用lambda替代,也可以使用传统实现的方式来入参。如果有复杂的逻辑可以用传统是实现接口方式,并用predicate提供的接口来组合
List<String> demo = Arrays.asList("q", "w", "e");
Predicate<String> f1 = e -> e.equals("q");
Predicate<String> f2 = e -> e.equals("w");
demo.stream().filter(f1.or(f2)).forEach(System.out::println);
//或者
demo.stream().filter(e->e.equals("q")||e.equals("w")).forEach(System.out::println);
-------result--------
a
b
a
b

map映射转化

<R> Stream<R> map(Function<? super T, ? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
class User {

    private Long id;

    private String name;

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public User setId(Long id) {
        this.id = id;
        return this;
    }

    public String getName() {
        return name;
    }

    public User setName(String name) {
        this.name = name;
        return this;
    }
}

static void mapTest(){
    List<User> users = Arrays.asList(new User(1L, "11"), new User(2L, "22"));
    users.stream().map(e->e.getId()).forEach(System.out::println);
}
---result---
1
2

peek数据处理

Stream<T> peek(Consumer<? super T> action);
  • 和map的区别是,map有返回值,这个没有
public static void main(String[] args) {
    List<User> demo = Arrays.asList(new User(1L,"11"), new User(2L,"22"), new User(3L,"33"));
// id平方,User 转为 Integer(id)
demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println);
-------result--------
1
4
9

flatMap

这个没有好的翻译,暂时不翻译。为了更好的了解这个操作的意义,我们假设张三有经理和财务两个角色,两个角色对应不同权限,那么张三的权限=经理权限+财务权限,相关类如下:

class Role{
    private List<Private> private;//权限集合
}

class Private{
    private String name;//权限名称
}

//经理角色
Private p1 = new Private().setName("发布任务");
Private p2 = new Private().setName("制定计划");
List<Private> provates = Arrays.asList(p1,p2);
Role manager = new Role().setPrivates(provates);

//财务角色
Private p3 = new Private().setName("做预算");
Private p4 = new Private().setName("发工资");
List<Private> provates2 = Arrays.asList(p3,p4);
Role caiwu = new Role().setPrivates(provates2);
//张三角色集合
List<Role> roles = Arrays.asList(manager,caiwu);
//计算张三所有权限
roles.stream().flatMap(e->e.getPrivates().stream()).forEach(e-> System.out.println(e.getName()));
---result----
发布任务
制定计划
做预算
发工资

由上面的demo,可以看出flatMap的作用是,将多个provates列表,压缩成一个

distinct 去重

Stream<T> distinct();
List<Integer> demo = Arrays.asList(1, 1, 2);
demo.stream().distinct().forEach(System.out::println);
-------result--------
1
2

如果是对象去重,需要重写对象的equals和hashcode方法

sorted 排序

Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
List<Integer> demo = Arrays.asList(5, 1, 2);
//默认升序
demo.stream().sorted().forEach(System.out::println);
//降序
Comparator<Integer> comparator = Comparator.<Integer, Integer>comparing(item -> item).reversed();
demo.stream().sorted(comparator).forEach(System.out::println);
-------默认升序 result--------
1
2
5
-------降序 result--------
5
2
1

limit和skip

//截取前maxSize个元素
Stream<T> limit(long maxSize);
//跳过前n个流
Stream<T> skip(long n);
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);
//跳过前两个,然后限制截取两个
demo.stream().skip(2).limit(2).forEach(System.out::println);
-------result--------
3
4

Stream终止操作

遍历消费

//遍历消费
void forEach(Consumer<? super T> action);
//顺序遍历消费,和forEach的区别是forEachOrdered在多线程parallelStream执行,保证顺序
void forEachOrdered(Consumer<? super T> action);
List<String> strings = Arrays.asList("AA", "BB", "CC", "DD");
strings.parallelStream().forEach(System.out::print);
System.out.println();
strings.parallelStream().forEachOrdered(System.out::print);
----------result----------
CCDDBBAA
AABBCCDD

转换成数组

//流转成Object数组
Object[] toArray();
//流转成A[]数组,指定类型A
<A> A[] toArray(IntFunction<A[]> generator)
List<String> strings = Arrays.asList("AA", "BB", "CC", "DD");
Object[] objects = strings.stream().toArray();
String[] strings1 = strings.stream().toArray(String[]::new);

最大最小

//获取最小值
Optional<T> min(Comparator<? super T> comparator)
//获取最大值
Optional<T> max(Comparator<? super T> comparator)
List<Integer> integers = Arrays.asList(1, 2, 3, 45, 6);
Optional<Integer> max = integers.stream().max(Comparator.comparing(i -> i));
Optional<Integer> min = integers.stream().min(Comparator.comparing(i -> i));
System.out.println("max="+max.get()+",min="+min.get());
--------result-------
max=45,min=1

合并

//两两合并
Optional<T> reduce(BinaryOperator<T> accumulator)
//两两合并,带初始值的
T reduce(T identity, BinaryOperator<T> accumulator)
//如果是stream的话与第二个完全一样,parallelStream会在合并一次
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
List<Integer> integers = Arrays.asList(1, 2, 3, 45, 6);
Optional<Integer> reduce = integers.stream().reduce((v1, v2) -> v1 + v2);
Integer reduce1 = integers.stream().reduce(1, (v1, v2) -> v1 + v2);
String reduce2 = integers.stream().reduce("初始值", (v1, v2) -> v1 + "第一次" + v2, (v3, v4) -> v3 + "第二次" + v4);
String reduce3 = integers.parallelStream().reduce("a", (v1, v2) -> v1 + "-" + v2, (v3, v4) -> v3 + "第二次" + v4);
System.out.println(reduce.get());
System.out.println(reduce1);
System.out.println(reduce2);
--------result--------
57
58
初始值第一次1第一次2第一次3第一次45第一次6
a-1第二次a-2第二次a-3第二次a-45第二次a-6

计数

//统计元素个数
long count()

聚合处理

/**
 * supplier:返回结果类型的生产者
 * accumulator:元素消费者(处理并加入R)
 * combiner: 返回结果 R 怎么组合(多线程执行时,会产生多个返回值R,需要合并)
 */
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
/**
 * collector一般是由 supplier、accumulator、combiner、finisher、characteristics组合成的聚合类
 * Collectors 可提供一些内置的聚合类或者方法
 */
<R, A> R collect(Collector<? super T, A, R> collector);

具体可以看下面

Collector(聚合类)的操作

接口Collector和实现类CollectorImpl

//返回值类型的生产者
Supplier<A> supplier();
//流元素消费者
BiConsumer<A, T> accumulator();
//返回值合并器(多个线程操作时,会产生多个返回值,需要合并)
BinaryOperator<A> combiner();
//返回值转化器(最后一步处理,实际返回结果,一般原样返回)
Function<A, R> finisher();
//流的特性
Set<Characteristics> characteristics();

public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
    BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
    Function<A, R> finisher, Characteristics... characteristics)

转换成List, Set

//流转化成List
public static <T> Collector<T, ?, List<T>> toList()
//流转化成Set
public static <T> Collector<T, ?, Set<T>> toSet()
List<Integer> demo = Arrays.asList(1, 2, 3);
List<Integer> col = demo.stream().collect(Collectors.toList());
Set<Integer> set = demo.stream().collect(Collectors.toSet());

转换成Map

//流转化成Map
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
    Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper)
/**
 * mergeFunction:相同的key,值怎么合并
 */
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
    Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper,
    BinaryOperator<U> mergeFunction)
/**
 * mergeFunction:相同的key,值怎么合并
 * mapSupplier:返回值Map的生产者
 */
public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(
    Function<? super T, ? extends K> keyMapper,
    Function<? super T, ? extends U> valueMapper,
    BinaryOperator<U> mergeFunction,
    Supplier<M> mapSupplier)
List<User> users = Arrays.asList(new User(1), new User(2), new User(3), new User(1));
//Map<Integer, Integer> collect = users.stream().collect(Collectors.toMap(e -> e.getId(), f -> f.getId()));
//System.out.println(collect);
Map<Integer, Integer> collect1 = users.stream().collect(Collectors.toMap(User::getId, User::getId, (v1,v2)->v1+v2, HashMap::new ));
System.out.println(collect1);
--------result------------
{1=2, 2=2, 3=3}

字符流聚合

//多个字符串拼接成一个字符串
public static Collector<CharSequence, ?, String> joining();
//多个字符串拼接成一个字符串(指定分隔符)
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter);
//多个字符串拼接,指定分隔符,前缀,后缀
public static Collector<CharSequence, ?, String> joining(CharSequence 		delimiter,CharSequence prefix,CharSequence suffix)
List<String> integers = Arrays.asList("1","2","3");
String collect = integers.stream().collect(Collectors.joining());
String collect1 = integers.stream().collect(Collectors.joining("|"));
String collect2 = integers.stream().collect(Collectors.joining("|","[","]"));
System.out.println(collect);
System.out.println(collect1);
System.out.println(collect2);
--------result----------
123
1|2|3
[1|2|3]

映射、聚合流

/**
 * mapper:映射处理器,相当于map功能
 * downstream:映射处理后需要再次聚合处理
 */
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream);
List<String> integers = Arrays.asList("1","2","3");
List collect = integers.stream().collect(Collectors.mapping(e->e+"8", Collectors.toList()));
System.out.println(collect);
--------result--------
[18, 28, 38]

聚合、转换

/**
 * downstream:聚合处理
 * finisher:结果转换处理
 */
public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream,
        Function<R, RR> finisher); 
List<String> integers = Arrays.asList("1","2","3");
//聚合成List,最后提取数组的size作为返回值
Integer size =integers.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size));
System.out.println(size);
---------result---------
3

分组

/**
 * classifier 指定T类型某一属性作为Key值分组
 * 分组后,使用List作为每个流的容器
 */
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(
        Function<? super T, ? extends K> classifier);           
/**
 * classifier: 流分组器
 * downstream: 每组流的聚合处理器
 */
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(
        Function<? super T, ? extends K> classifier, 
        Collector<? super T, A, D> downstream);
/**
 * classifier: 流分组器
 * mapFactory: 返回值map的工厂(Map的子类)
 * downstream: 每组流的聚合处理器
 */
public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(
        Function<? super T, ? extends K> classifier,
        Supplier<M> mapFactory,
        Collector<? super T, A, D> downstream);

///////////////////用法和上面一样
/**
 * classifier: 分组器 ; 分组后,使用List作为每个流的容器
 */
public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(
        Function<? super T, ? extends K> classifier);
/**
 * classifier: 分组器
 * downstream: 流的聚合处理器
 */
public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(
        Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream)
/**
 * classifier: 分组器
 * mapFactory: 返回值类型map的生产工厂(ConcurrentMap的子类)
 * downstream: 流的聚合处理器
 */
public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(
        Function<? super T, ? extends K> classifier, 
        Supplier<M> mapFactory,
        Collector<? super T, A, D> downstream);
List<Integer> integers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
HashMap<Integer, List<String>> collect = integers.stream().collect(Collectors.groupingBy(i -> i % 3,HashMap::new,Collectors.mapping(String::valueOf, Collectors.toList())));
System.out.println(collect);
-------------result----------
{0=[3, 6, 9], 1=[1, 4, 7, 10], 2=[2, 5, 8]}

流拆分

public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(
        Predicate<? super T> predicate)
/**
 * predicate: 二分器
 * downstream: 流的聚合处理器
 */
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(
        Predicate<? super T> predicate, Collector<? super T, A, D> downstream)
List<Integer> integers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        Map<Boolean, List<Integer>> collect = integers.stream().collect(Collectors.partitioningBy(i -> i % 3 == 0));
        System.out.println(collect);
-----------result-----------
{false=[1, 2, 4, 5, 7, 8, 10], true=[3, 6, 9]}

聚合求平均值

// 返回Double类型
public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper)
// 返回Long 类型
public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper)
//返回Int 类型
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper)
List<Double> integers = Arrays.asList(1.1,2.2,3.3,4.4,5.1,6.1,7.1,8.1,9.1,10.1);
Double collect = integers.stream().collect(Collectors.averagingDouble(Double::doubleValue));
System.out.println(collect);
----------result-----------
5.659999999999999

聚合查找最大最小值

//最小值
public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator) 
//最大值
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)  
List<Double> integers = Arrays.asList(1.1,2.2,3.3,4.4,5.1,6.1,7.1,8.1,9.1,10.1);
Optional<Double> max = integers.stream().collect(Collectors.maxBy(Comparator.comparing(i -> i)));
Optional<Double> min = integers.stream().collect(Collectors.minBy(Comparator.comparing(i -> i)));
System.out.println("max="+max.get()+",min="+min.get());
------------result------------
max=10.1,min=1.1

聚合计算统计结果

//可以获得元素总个数,元素累计总和,最小值,最大值,平均值
//返回Int 类型
public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(
        ToIntFunction<? super T> mapper)
//返回Double 类型
public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(
        ToDoubleFunction<? super T> mapper)
//返回Long 类型
public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(
        ToLongFunction<? super T> mapper)
List<Double> integers = Arrays.asList(1.1,2.2,3.3,4.4,5.1,6.1,7.1,8.1,9.1,10.1);
DoubleSummaryStatistics collect = integers.stream().collect(Collectors.summarizingDouble(Double::doubleValue));
System.out.println(collect);
--------result----------
DoubleSummaryStatistics{count=10, sum=56.600000, min=1.100000, average=5.660000, max=10.100000}

并行流

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);

integers.parallelStream().forEach(e->{
System.out.println(e);
System.out.println(Thread.currentThread().getName());
});
--------reult---------
5
main
4
main
1
main
3
ForkJoinPool.commonPool-worker-3
6
7
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-4
2
ForkJoinPool.commonPool-worker-1

可以看出,并行流使用的是fork/join,他使用的commonPool,所有共用一个。所以,我们一般自定义线程池来运行。

IntStream range = IntStream.range(1, 10);
ForkJoinPool forkJoinPol = new ForkJoinPool(5);
forkJoinPol.submit(()->{
range.parallel().forEach(f->{
System.out.println(f);
System.out.println(Thread.currentThread().getName());
});
}).get();
------------result------------
6
ForkJoinPool-1-worker-1
3
ForkJoinPool-1-worker-2
5
ForkJoinPool-1-worker-1
8
ForkJoinPool-1-worker-3
4
ForkJoinPool-1-worker-2
7
ForkJoinPool-1-worker-4
9
ForkJoinPool-1-worker-3
2
ForkJoinPool-1-worker-1
1
ForkJoinPool-1-worker-5

parallelStream的坑

  1. 默认情况下,使用通用的fork/join池完成任务,该池是所有并行流共享的。并且该池线程数和cpu核心数一样,对cpu密集型任务是有意义的,因为机器只能执行这么多线程;对于io密集型,瓶颈是io,会等待其他系统。所以,两个流使用通用的线程池,会因为相互等待尔阻塞
  2. 使用foreach会乱序,应该使用foreachOrdered
  3. 线程不安全,写代码的是否需要注意线程安全的问题

fork/join是什么

fork/join框架是Java7提供的一个用于并行执行任务的框架,拆用‘分而治之’的思想,就是把一个大任务拆成若干个小任务,最后在汇总结果,具体过程,我在网上找了一张图:

fork join

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:

fork join

我们把一个任务分配给两个线程执行,线程1执行很快,线程2还没执行完,线程1就会去线程儿中窃取一个任务执行,加快速度。此时,他们访问同一队列,为了减少窃取任务线程和被窃取任务线程的竞争,通常拆用双端队列,被窃取任务线程永远从头部取任务,而窃取任务线程永远从尾部拿任务线程

ForkJoinPool

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器:

// 创建一个包含parallelism个并行线程的ForkJoinPool
public ForkJoinPool(int parallelism)

//以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool
public ForkJoinPool() :

创建实例后,可以调用public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)public void execute(ForkJoinTask<?> task)来执行任务。ForkJoinTask是一个抽象类,代表可并行、合并的任务,它还有两个抽象子类:

  • RecursiveAction代表没有返回值的任务
public class Demo1 extends RecursiveAction {

    public int start;
    public int end;

    public Demo1(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if((end-start) >40){
            //执行拆分
            // 将大任务分解成两个小任务
            int middle = (start + end) / 2;
            Demo1 left = new Demo1(start, middle);
            Demo1 right = new Demo1(middle, end);
            left.fork();
            right.fork();
        }else {
            for(int i= start; i<end;i++) {
                System.out.println(Thread.currentThread().getName()+"i的值"+i);
            }
        }
    }


    public static void main(String[] args) throws InterruptedException {
        // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 提交可分解的PrintTask任务
        forkJoinPool.submit(new Demo1(0, 1000));

        //阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
        forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);

        // 关闭线程池
        forkJoinPool.shutdown();
    }
}
  • RecursiveTask代表有返回值的任务
public class Demo1 extends RecursiveTask<Integer> {

    public int start;
    public int end;

    public Demo1(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if((end-start) >40){
            //执行拆分
            // 将大任务分解成两个小任务
            int middle = (start + end) / 2;
            Demo1 left = new Demo1(start, middle);
            Demo1 right = new Demo1(middle, end);
            //并行执行任务
            left.fork();
            right.fork();
            //累加两个任务
            return left.join()+right.join();
        }else {

            for(int i= start; i<end;i++) {
                sum+=i;
            }
            return sum;
        }
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        int sum = IntStream.range(0, 100).parallel().reduce( 0,Integer::sum );
        System.out.println(sum);


        // 提交可分解的PrintTask任务
        ForkJoinTask<Integer> submit = forkJoinPool.submit(new Demo1(0, 100));
        Integer integer = submit.get();
        System.out.println(integer);

        //阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
        forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);

        // 关闭线程池
        forkJoinPool.shutdown();
    }
}

总结,fork/join总体分两步,第一步fork把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小;第二步,分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据