Benchmarking Java Streams

In my earlier article, I took a more in-depth look at the Java ExecutorService interface and its implementations, with some focus on the Fork/Join framework and ThreadPerTaskExecutor. Today, I would like to take a step further and check how well they behave when put under stress. In short, I am going to run benchmarks, a lot of benchmarks.

All the code from below, and more, is available in a dedicated GitHub repository.

Logic Under Benchmark

I would like to start this text with a walk through the logic that will be the base for the benchmarks, as it is split into two main categories:

  1. Based on the classic stream
  2. Based on the Fork/Join approach

Classic Stream Logic

public static Map<Ip, Integer> groupByIncomingIp(Stream<String> requests,
        LocalDateTime upperTimeBound, LocalDateTime lowerTimeBound) {
    return requests
        .map(line -> line.split(","))
        .filter(words -> words.length == 3)
        .map(words -> new Request(words[1], LocalDateTime.parse(words[2])))
        .filter(request -> request.timestamp().isBefore(upperTimeBound) &&
                request.timestamp().isAfter(lowerTimeBound))
        .map(i -> new Ip(i.ip()))
        .collect(groupingBy(i -> i, summingInt(i -> 1)));
}

In theory, the purpose of this piece of code is to transform a list of strings, do some filtering and grouping along the way, and return a map. The supplied strings are in the following format:

1,192.168.1.1,2023-10-29T17:33:33.647641574

Each line represents the event of registering an IP address attempting to access a particular server. The output maps an IP address to the number of access attempts in a particular interval, expressed by lower and upper time boundaries.
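The pipeline above relies on two small value types, Request and Ip, whose definitions are not shown in the snippet. A minimal sketch of what they might look like, assuming plain Java records (the exact definitions live in the linked repository):

import java.time.LocalDateTime;

// Assumed shapes of the value types used by groupByIncomingIp.
// Records provide equals()/hashCode(), which groupingBy relies on when Ip is used as a map key.
public record Request(String ip, LocalDateTime timestamp) {}

public record Ip(String ip) {}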

Fork/Join Logic

@Override
public Map<Ip, Integer> compute() {
    if (data.size() >= THRESHOLD) {
        Map<Ip, Integer> output = new HashMap<>();
        ForkJoinTask
            .invokeAll(createSubTasks())
            .forEach(task -> task
                     .join()
                     .forEach((k, v) -> updateOutput(k, v, output))
                    );
        return output;
    }
    return process();
}

private void updateOutput(Ip k, Integer v, Map<Ip, Integer> output) {
    Integer currentValue = output.get(k);
    if (currentValue == null) {
        output.put(k, v);
    } else {
        output.replace(k, currentValue + v);
    }
}

private List<ForkJoinDefinition> createSubTasks() {
    int size = data.size();
    int middle = size / 2;
    return List.of(
        new ForkJoinDefinition(new ArrayList<>(data.subList(0, middle)), now),
        new ForkJoinDefinition(new ArrayList<>(data.subList(middle, size)), now)
    );
}

private Map<Ip, Integer> process() {
    return groupByIncomingIp(data.stream(), upperTimeBound, lowerTimeBound);
}

The only impactful difference here is that I split the dataset into smaller batches until a certain threshold is reached. By default, the threshold is set to 20. After this operation, I start to perform the computations. The computations are the same as in the classic stream approach described above: I am using the groupByIncomingIp method.
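For context, the compute() method shown above lives in a RecursiveTask subclass. Below is a minimal sketch of the surrounding class; the field names follow the snippets above, while the constructor body and the way the time bounds are derived from now are assumptions (the full version is in the repository).

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;

// Sketch of the class hosting compute(); constructor details are assumptions.
public class ForkJoinDefinition extends RecursiveTask<Map<Ip, Integer>> {

    private static final int THRESHOLD = 20; // default split threshold mentioned above

    private final List<String> data;
    private final LocalDateTime now;
    private final LocalDateTime upperTimeBound;
    private final LocalDateTime lowerTimeBound;

    public ForkJoinDefinition(List<String> data, LocalDateTime now) {
        this.data = data;
        this.now = now;
        this.upperTimeBound = now.plusHours(1);  // assumption: bounds derived from 'now'
        this.lowerTimeBound = now.minusHours(1); // assumption: bounds derived from 'now'
    }

    // compute(), updateOutput(), createSubTasks() and process() as shown above
}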

JMH Setup

All the benchmarks are written using the Java Microbenchmark Harness (JMH for short).

I used JMH version 1.37 to run the benchmarks. The benchmarks share the same setup: five warm-up iterations and twenty measurement iterations.

There are two different modes here: average time and throughput. In the case of average time, JMH measures the average execution time of the code under benchmark, and the output is expressed in milliseconds.

For throughput, JMH measures the number of operations (complete executions of the code under benchmark) in a particular unit of time, milliseconds in this case. The result is expressed in operations per millisecond.

Or, in JMH syntax:

@Warmup(iterations = 5, time = 10, timeUnit = SECONDS)
@Measurement(iterations = 20, time = 10, timeUnit = SECONDS)
@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
@OutputTimeUnit(MILLISECONDS)
@Fork(1)
@Threads(1)

Additionally, each benchmark has its own unique State with a Benchmark scope, containing all the data and variables needed by that particular benchmark.

Benchmark State

Classic Stream

The base benchmark state for the classic stream can be viewed below.

@State(Scope.Benchmark)
public class BenchmarkState {

    @Param({"0"})
    public int size;
    public List<String> input;
    public ClassicDefinition definitions;
    public ForkJoinPool forkJoinPool_4;
    public ForkJoinPool forkJoinPool_8;
    public ForkJoinPool forkJoinPool_16;
    public ForkJoinPool forkJoinPool_32;
    private final LocalDateTime now = LocalDateTime.now();

    @Setup(Level.Trial)
    public void trialUp() {
        input = new TestDataGen(now).generate(size);
        definitions = new ClassicDefinition(now);
        System.out.println(input.size());
    }

    @Setup(Level.Iteration)
    public void up() {
        forkJoinPool_4 = new ForkJoinPool(4);
        forkJoinPool_8 = new ForkJoinPool(8);
        forkJoinPool_16 = new ForkJoinPool(16);
        forkJoinPool_32 = new ForkJoinPool(32);
    }

    @TearDown(Level.Iteration)
    public void down() {
        forkJoinPool_4.shutdown();
        forkJoinPool_8.shutdown();
        forkJoinPool_16.shutdown();
        forkJoinPool_32.shutdown();
    }
}

First, I set up all the variables needed to perform the benchmarks. Apart from the size parameter, which plays a special role in this part, the thread pools will be used only inside the benchmarks themselves.

The size parameter, on the other hand, is quite an interesting mechanism of JMH. It allows the parametrization of a certain variable used during the benchmark. You will see how I took advantage of this later when we move on to running the benchmarks.

For now, I am using this parameter to generate the input dataset that will remain unchanged throughout the whole benchmark, to achieve better repeatability of results.
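For illustration, the values for this parameter can be supplied from the command line (-p size=100,1000,...) or programmatically. A minimal runner sketch, assuming the benchmark class names used in this article:

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Minimal JMH runner sketch; it overrides the @Param("0") default with the sizes used in the text.
public class BenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include(ClassicStreamBenchmark.class.getSimpleName())
                .param("size", "100", "1000", "10000", "100000", "1000000")
                .build();
        new Runner(options).run();
    }
}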

The second part is the up method, which works similarly to @BeforeEach from the JUnit library.

It will be triggered before each of the 20 iterations of my benchmark and reset all the variables used in the benchmark. Thanks to such a setting, I start with a clean state for every iteration.

The last part is the down method, which works similarly to @AfterEach from the JUnit library.

It will be triggered after each of the 20 iterations of my benchmark and shut down all the thread pools used in the iteration, mostly to handle potential memory leaks.

Fork/Join

The state for the Fork/Join version looks as below.

@State(Scope.Benchmark)
public class ForkJoinState {

    @Param({"0"})
    public int size;
    public List<String> input;
    public ForkJoinPool forkJoinPool_4;
    public ForkJoinPool forkJoinPool_8;
    public ForkJoinPool forkJoinPool_16;
    public ForkJoinPool forkJoinPool_32;
    public final LocalDateTime now = LocalDateTime.now();

    @Setup(Level.Trial)
    public void trialUp() {
        input = new TestDataGen(now).generate(size);
        System.out.println(input.size());
    }

    @Setup(Level.Iteration)
    public void up() {
        forkJoinPool_4 = new ForkJoinPool(4);
        forkJoinPool_8 = new ForkJoinPool(8);
        forkJoinPool_16 = new ForkJoinPool(16);
        forkJoinPool_32 = new ForkJoinPool(32);
    }

    @TearDown(Level.Iteration)
    public void down() {
        forkJoinPool_4.shutdown();
        forkJoinPool_8.shutdown();
        forkJoinPool_16.shutdown();
        forkJoinPool_32.shutdown();
    }
}

There is no big difference between the setup for the classic stream and the Fork/Join one. The only difference comes from placing the definitions inside the benchmarks themselves, not in the state as in the case of the classic approach.

This change comes from how RecursiveTask works: task results are memoized and stored after completion, so reusing the same instance could affect the overall benchmark results.

Benchmark Input

The basic input for the benchmarks is a list of strings in the following format:

1,192.168.1.1,2023-10-29T17:33:33.647641574

Or, in a more generalized description:

{ordering-number},{ip-like-string},{timestamp}

There are five different input sizes:

  1. 100
  2. 1000
  3. 10000
  4. 100000
  5. 1000000

There is some deeper meaning behind the sizes, as I believe that such a size range can illustrate how well the solutions scale and potentially reveal some performance bottlenecks.

Additionally, the overall setup of the benchmark is very flexible, so adding a new size should not be difficult if someone is interested in doing so.
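The generator itself is not listed in this article. A minimal sketch of how such input could be produced, assuming the constructor and generate method used in the state classes (the real TestDataGen is in the repository):

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch of a generator producing "{ordering-number},{ip-like-string},{timestamp}" lines.
public class TestDataGen {

    private final LocalDateTime now;

    public TestDataGen(LocalDateTime now) {
        this.now = now;
    }

    public List<String> generate(int size) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        return IntStream.rangeClosed(1, size)
                .mapToObj(i -> i + ",192.168." + random.nextInt(256) + "." + random.nextInt(256)
                        + "," + now.minusSeconds(random.nextInt(3600)))
                .collect(Collectors.toList());
    }
}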

Benchmark Setup

Classic Stream

There is only a single class related to the classic stream benchmark. Different sizes are handled on the State level.

public class ClassicStreamBenchmark extends BaseBenchmarkConfig {
   @Benchmark
   public void bench_sequential(SingleStreamState state, Blackhole bh) {
       Map<Ip, Integer> map = state.definitions.sequentialStream(state.input);
       bh.consume(map);
   }

   @Benchmark
   public void bench_defaultParallelStream(SingleStreamState state, Blackhole bh) {
       Map<Ip, Integer> map = state.definitions.defaultParallelStream(state.input);
       bh.consume(map);
   }

   @Benchmark
   public void bench_parallelStreamWithCustomForkJoinPool_4(SingleStreamState state, Blackhole bh) {
       Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_4, state.input);
       bh.consume(map);
   }

   @Benchmark
   public void bench_parallelStreamWithCustomForkJoinPool_8(SingleStreamState state, Blackhole bh) {
       Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_8, state.input);
       bh.consume(map);
   }

   @Benchmark
   public void bench_parallelStreamWithCustomForkJoinPool_16(SingleStreamState state, Blackhole bh) {
       Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_16, state.input);
       bh.consume(map);
   }

   @Benchmark
   public void bench_parallelStreamWithCustomForkJoinPool_32(SingleStreamState state, Blackhole bh) {
       Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_32, state.input);
       bh.consume(map);
   }
}

There are six different benchmark setups of the same logic:

  1. bench_sequential: A simple benchmark with just a single sequential stream
  2. bench_defaultParallelStream: A benchmark with the default Java parallel stream via the .parallelStream() method of the Collection interface; in practice, this uses the commonPool from ForkJoinPool with a parallelism of 19 (at least on my machine)
  3. bench_parallelStreamWithCustomForkJoinPool_4: A custom ForkJoinPool with a parallelism level equal to 4 (see the sketch below this list)
  4. bench_parallelStreamWithCustomForkJoinPool_8: A custom ForkJoinPool with a parallelism level equal to 8
  5. bench_parallelStreamWithCustomForkJoinPool_16: A custom ForkJoinPool with a parallelism level equal to 16
  6. bench_parallelStreamWithCustomForkJoinPool_32: A custom ForkJoinPool with a parallelism level equal to 32

For the classic stream logic, I have 6 different setups and 5 different input sizes, resulting in a total of 30 unique benchmark combinations.
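The custom-pool variants rely on a well-known trick: a parallel stream started from inside a ForkJoinPool task runs on that pool instead of the common pool. A minimal sketch of what parallelStreamWithCustomForkJoinPool in ClassicDefinition might look like; the method shape and field names are assumptions, and the exact implementation is in the repository.

// Sketch: running the parallel stream on a custom ForkJoinPool instead of the common pool.
public Map<Ip, Integer> parallelStreamWithCustomForkJoinPool(ForkJoinPool pool, List<String> input) {
    return pool.submit(() -> groupByIncomingIp(input.parallelStream(), upperTimeBound, lowerTimeBound))
               .join();
}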

Fork/Join

public class ForkJoinBenchmark extends BaseBenchmarkConfig {
   @Benchmark
   public void bench(ForkJoinState state, Blackhole bh) {
       Map<Ip, Integer> map = new ForkJoinDefinition(state.input, state.now).compute();
       bh.consume(map);
   }

   @Benchmark
   public void bench_customForkJoinPool_4(ForkJoinState state, Blackhole bh) {
       ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
       Map<Ip, Integer> map = state.forkJoinPool_4.invoke(forkJoinDefinition);
       bh.consume(map);
   }

   @Benchmark
   public void bench_customForkJoinPool_8(ForkJoinState state, Blackhole bh) {
       ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
       Map<Ip, Integer> map = state.forkJoinPool_8.invoke(forkJoinDefinition);
       bh.consume(map);
   }

   @Benchmark
   public void bench_customForkJoinPool_16(ForkJoinState state, Blackhole bh) {
       ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
       Map<Ip, Integer> map = state.forkJoinPool_16.invoke(forkJoinDefinition);
       bh.consume(map);
   }

   @Benchmark
   public void bench_customForkJoinPool_32(ForkJoinState state, Blackhole bh) {
       ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
       Map<Ip, Integer> map = state.forkJoinPool_32.invoke(forkJoinDefinition);
       bh.consume(map);
   }
}

There are five different benchmark setups of the same logic:

  1. bench: A simple benchmark that calls compute() directly, without an explicitly configured pool
  2. bench_customForkJoinPool_4: A custom ForkJoinPool with a parallelism level equal to 4
  3. bench_customForkJoinPool_8: A custom ForkJoinPool with a parallelism level equal to 8
  4. bench_customForkJoinPool_16: A custom ForkJoinPool with a parallelism level equal to 16
  5. bench_customForkJoinPool_32: A custom ForkJoinPool with a parallelism level equal to 32

For the Fork/Join logic, I have 5 different setups and 5 different input sizes, resulting in a total of 25 unique benchmark combinations.

What is more, in both cases I am also using the Blackhole concept from JMH to "cheat" the compiler's dead-code elimination. There is more about Blackholes and their use cases here.
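As a side note, returning the computed map from a @Benchmark method has the same effect as consuming it explicitly, since JMH implicitly feeds returned values into a Blackhole. For illustration, an equivalent form of bench_sequential written that way:

// Equivalent form: the returned value is implicitly consumed by JMH, preventing dead-code elimination.
@Benchmark
public Map<Ip, Integer> bench_sequential_returning(SingleStreamState state) {
    return state.definitions.sequentialStream(state.input);
}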

Benchmark Environment

Machine 1

The tests were performed on my Dell XPS with the following parameters:

  • OS: Ubuntu 20.04.6 LTS
  • CPU: i9-12900HK × 20
  • Memory: 64 GB

JVM

  • OpenJDK version "21" 2023-09-19
  • OpenJDK Runtime Environment (build 21+35-2513)
  • OpenJDK 64-Bit Server VM (build 21+35-2513, mixed mode, sharing)

Machine 2

The tests were performed on my Lenovo Y700 with the following parameters:

  • OS: Ubuntu 20.04.6 LTS
  • CPU: i7-6700HQ × 8
  • Memory: 32 GB

JVM

  • OpenJDK version "21" 2023-09-19
  • OpenJDK Runtime Environment (build 21+35-2513)
  • OpenJDK 64-Bit Server VM (build 21+35-2513, mixed mode, sharing)

For both machines, all side/insignificant applications were closed. I tried to make the runtime system as pure as possible so as not to generate any unwanted performance overhead. However, on a pure Ubuntu server or when run inside a container, the overall performance may differ.

Benchmark Report

The results of running the benchmarks are stored in .csv files in the GitHub repository, in the reports directory. Additionally, to make downloading the reports easier, there is a separate .zip file named reports.zip that contains all the .csv files with data.

The reports directories are structured on a per-size basis, with three special reports covering all input sizes:

  • report_classic: All input sizes for the classic stream
  • report_forkjoin: All input sizes for the Fork/Join approach
  • report_whole: All input sizes for both the classic and Fork/Join approaches

Each report directory from the above contains 3 separate files:

  • averagetime.csv: Results for average time mode benchmarks
  • throughput.csv: Results for throughput mode benchmarks
  • whole.csv: Combined results for both modes

As for the actual reports, I have two formats: averagetime.csv and throughput.csv share one format, and whole.csv has a separate one. Let's call them the modes and whole formats.

The modes report contains eight columns:

  1. Label: Name of the benchmark
  2. Input Size: Benchmark input size
  3. Threads: Number of threads used in the benchmark, from the set 1, 4, 7, 8, 16, 19, 32
  4. Mode: Benchmark mode, either average time or throughput
  5. Cnt: The number of benchmark iterations; should always be equal to 20
  6. Score: Actual result of the benchmark
  7. Score Mean Error: Benchmark measurement error
  8. Units: Units of the benchmark, either ms/op (for average time) or ops/ms (for throughput)

The whole report contains 10 columns:

  1. Label: Name of the benchmark
  2. Input Size: Benchmark input size
  3. Threads: Number of threads used in the benchmark, from the set 1, 4, 7, 8, 16, 19, 32
  4. Cnt: The number of benchmark iterations; should always be equal to 20
  5. AvgTimeScore: Actual result of the benchmark for average time mode
  6. AvgTimeMeanError: Benchmark measurement error for average time mode
  7. AvgUnits: Units of the benchmark for average time mode, in ms/op
  8. ThroughputScore: Actual result of the benchmark for throughput mode
  9. ThroughputMeanError: Benchmark measurement error for throughput mode
  10. ThroughputUnits: Units of the benchmark for throughput mode, in ops/ms

Results Analysis

Assumptions

Baseline

I will present general results and insights based on the size of 10000, so I will be using the .csv files from the report_10000 directory.

There are two main reasons behind choosing this particular data size:

  1. The execution time is high enough to show differences between the various setups.
  2. Data sizes 100 and 1000 are, in my opinion, too small to notice any performance bottlenecks.

Thus, I think that an in-depth analysis of this particular data size will be the most impactful.

Of course, the other sizes will also get a results overview, but it will not be as thorough as this one unless I encounter some anomalies compared to the behavior for size 10000.

A Word on the Fork/Join Native Approach

With the current code under benchmark, there will be a performance overhead associated with the Fork/Join benchmarks.

Because the Fork/Join benchmark logic heavily relies on splitting the input dataset, there must be a moment when all the partial results are combined into a single cohesive output. This is the part that is not present in the stream-based benchmarks, so correctly understanding its impact on overall performance is crucial.

Please keep this in mind.

Analysis

Machine 1 (20 cores)

As you can see above, the best overall result for an input size of 10 thousand on Machine 1 belongs to the variants based on defaultParallelStream.

For the classic stream-based benchmarks, bench_defaultParallelStream returns by far the best result. Even when we take a possible measurement error into account, it still comes out on top.

The setups for ForkJoinPool with parallelism 32 and 16 return worse results. On one hand, it is surprising: for parallelism 32, I would expect a better score than for the default pool (parallelism 19). However, parallelism 16 has worse results than both parallelism 19 and 32.

With 20 CPU threads on Machine 1, a parallelism of 32 may not be enough to picture the performance degradation caused by an overabundance of threads. However, you will be able to notice such behavior on Machine 2. I would assume that to show such behavior on Machine 1, the parallelism would have to be set to 64 or more.

What is curious here is that the pattern of bench_defaultParallelStream coming out on top does not seem to hold for the larger input sizes of 100 thousand and one million. There, the best performance belongs to bench_parallelStreamWithCustomForkJoinPool_16, which may indicate that in the long run, a somewhat smaller parallelism may be a good idea.

The Fork/Join-based implementation is noticeably slower than the default parallel stream implementation, with around 10% worse performance. This pattern also occurs for the other sizes. It confirms my assumption from above that joining the smaller parts of a split dataset has a noticeable impact.

Of course, the worst score belongs to the single-threaded approach and is around 5 times slower than the best result. Such a situation is expected, as the single-threaded benchmark is a kind of baseline for me. I want to check how far we can push its execution time, and a 5 times better average execution time in the best-case scenario seems like a good score.

As for the value of the score mean error, it is very small. In the worst case (the highest error), it is within 1.5% of the respective score (the result for ClassicStreamBenchmark.bench_parallelStreamWithCustomForkJoinPool_4).

In other cases, it varies from 0.1% to 0.7% of the overall score.

There seems to be no difference in result positions for sizes above 10 thousand.

Machine 2 (8 cores)

As in the case of Machine 1, first place also belongs to bench_defaultParallelStream. Again, even when we consider a possible measurement error, it still comes out on top: nothing particularly interesting.

What is interesting, however, is that the pattern of the first 3 positions for Machine 2 changes quite a bit for the larger input sizes. For inputs 100 to 10000, we have somewhat similar behavior, with bench_defaultParallelStream occupying 1st place and bench_parallelStreamWithCustomForkJoinPool_8 following shortly after.

On the other hand, for inputs 100000 and 1000000, first place belongs to bench_parallelStreamWithCustomForkJoinPool_8, followed by bench_parallelStreamWithCustomForkJoinPool_32, while bench_defaultParallelStream drops to 4th and 3rd positions, respectively.

Another curious thing about Machine 2 is that for smaller input sizes, parallelism 32 is quite far from the top. Such performance degradation may be caused by the overabundance of threads compared to the 8 CPU threads available on the machine in total.

Still, for inputs 100000 and 1000000, the ForkJoinPool with parallelism 32 is in second place, which may indicate that over longer time spans, such an overabundance of threads is not a problem.

Other aspects that are similar to the behavior of Machine 1 are skipped here and mentioned below.

Common Points

There are a few observations valid for both machines:

  1. My Fork/Join native ("naive")-based benchmarks yield results that are noticeably worse, around 10% on both machines, than those delivered by the default variant of the parallel stream or even the ones with a custom ForkJoinPool. Of course, one of the reasons is that they are not optimized in any way. There is probably some low-hanging performance fruit here. Thus, I strongly recommend getting familiar with the Fork/Join framework before moving its implementations to production.
  2. The time difference between positions one to three is very, very small: less than a millisecond. Thus, it may be hard to achieve any kind of repeatability for these benchmarks. With such a small difference, it is easy for the results distribution to change between benchmark runs.
  3. The mean error of the scores is also very, very small, up to 2% of the overall score in the worse cases and mostly below 1%. Such a low error may indicate two things. First, the benchmarks are reliable, as the results are centered around some point; if there were anomalies along the way, the error would be higher. Second, JMH is good at making measurements.
  4. There is no significant difference in results between throughput and average time modes. If one of the benchmarks performed well in average time mode, it also performed well in throughput mode.

Above, you can see all the differences and similarities I found inside the report files. If you notice anything else that seems interesting, do not hesitate to mention it in the comment section below.

Summary

Before we finally part ways for today, I would like to point out one more important thing:

JAVA IS NOT SLOW

Processing a list with one million elements, with all possible JMH overhead and on a single thread, takes 560 milliseconds (Machine 1) and 1142 milliseconds (Machine 2). There are no special optimizations or magic included, just a plain default JVM.

The overall best time for processing one million elements on Machine 1 was 88 milliseconds, for ClassicStreamBenchmark.bench_parallelStreamWithCustomForkJoinPool_16. In the case of Machine 2, it was 321 milliseconds, for ClassicStreamBenchmark.bench_parallelStreamWithCustomForkJoinPool_8.

Although both results may not be as good as C/C++-based solutions, the relative simplicity and descriptiveness of the approach make it very interesting, in my opinion.

Overall, it is quite a nice addition to Java's One Billion Row Challenge.

I would just like to mention that all the reports and benchmark code are in the GitHub repository (linked in the introduction of this article). You can easily verify my results and compare them to the benchmark behavior on your machine.

Additionally, to make downloading the reports easier, there is a separate .zip file named reports.zip that contains all the .csv files with data.

Besides, remember: Java is not slow.

Thank you for your time.

Review by: Krzysztof Ciesielski, Łukasz Rola
