The 1 Billion Row Challenge
7 minuter i lästid Helg Java 1BRC

The 1 Billion Row Challenge

Så har det blivit ett nytt år och vi skriver 2024. Hoppas du hunnit vila ut och är redo för, som man klämkäckt säger, nya utmaningar. Hos oss på Ribomation, jobbar vi på fullt med att utveckla nya kurser och hålla vår kurskalender uppdaterad.

Några som heller inte legat på latsidan sedan årsskiftet är de många programmerare som tagit del av nyårets stora snackis: Gunnar Morling's utmaning 1BRC - The 1 Billion Row Challenge.

Den först januari publicerade han en bloggpost, samt länkade denna från X/Twitter, om en utmaning till alla Java programmerare med följande (förkortade) beskrivning.

Givet en textfil med väderstation och tillhörande temperaturvärde per rad; beräkna medelvärdet per väderstation, samt minsta och största värdet för denna. Skriv ut resultatet, sorterat på väderstationernas namn. Antalet rader i textfilen är en miljard.

1BRC - The 1 Billion Row Challenge

Textfilens innehåll utgörs av stationsnamn och temperatur, separerat av semikolon. Visar ett exempelutsnitt nedan. Textfilens innehåll genereras av ett Java program.

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
...

Gunnar publicerade själv en icke-optimerad lösning baserad på Java streams, vilken fungerar som utmaningens baseline. Jag tog mig friheten att skriva om hans lösning, så att det blir lite enklare att se hur den fungerar.

Först, behöver man generera indatafilen, som har det fixerade namnet measurements.txt. Argumentet är antalet rader som ska genereras (här är det bara 100 000). Om du vill testa själv, behöver du först klona GitHub repo - länk finns sist i denna artikel.

java src/main/java/dev/morling/onebrc/CreateMeasurements.java 100000

Huvudfunktionen (main) ser ut som nedan. I try-satsen öppnas indatafilen som en Java stream, vilken läser radvis. Varje rad skapar ett Measurement objekt. Stream-magiken utförs med nästa steg i form av en egen-konfigurerad collector till aggregeringsfunktionen Collectors.groupingBy.

public static void main(String[] args) throws IOException {
    var collector = Collector.of(
            Aggregation::new,
            Aggregation::update,
            Aggregation::combine,
            Result::create,
            Collector.Characteristics.UNORDERED
    );

    var startTime = System.nanoTime();
    try (var lines = Files.lines(Path.of(INPUT_FILE))) {
        lines
                .map(Measurement::create)
                .collect(Collectors.groupingBy(m -> m.station, collector))
                .forEach((station, result) -> System.out.printf("%s: %s%n", station, result));
    }
    var endTime = System.nanoTime();
    System.out.printf(Locale.ENGLISH, "----%nElapsed time: %.3f secs (%,d records)%n",
            (endTime - startTime) * 1E-9, Measurement.count);
}

Measurement utgörs av en Java record och ser ut som nedan. Funktionen create tar en CSV rad (Copenhagen;19.3) och skapar ett objekt, som skickas vidare i strömmen till aggregations-delen. Här används funktionen groupingBy, som grupperar data till en hash-map, vilken slutligen skrivs ut med forEach.

record Measurement(String station, double temperature) {
    static int count = 0;
    Measurement {++count;}
    Measurement(String[] parts) {
        this(parts[0], Double.parseDouble(parts[1]));
    }
    static Measurement create(String csv) {
        return new Measurement(csv.split(";"));
    }
}

Funktionen groupingBy använder sig av en så kallad collector, för att aggregera alla datavärden som tillhöra samma nyckelvärde, vilket i vårt fall är väderstationsnamnet. Första argumentet är en key-mapper, vilken här returnerar stationsnamnet.

Collectors.groupingBy((Measurement m) -> m.station, collector)

Andra argumentet är en collector, som aggregerar ihop alla värden för motsvarande stationsnamn. Här använder vi en fabriksfunktion, som tar ett fyra lambda uttryck, plus ett konfigurationsvärde.

var collector = Collector.of(
        Aggregation::new,
        Aggregation::update,
        Aggregation::combine,
        Result::create,
        Collector.Characteristics.UNORDERED
);

Medan aggregeringen pågår, hanteras detta med hjälp av ett objekt av typen Aggregation, som visas nedan. De fyra lambda-uttrycken representeras som method reference.

class Aggregation {
    int count = 0;
    double sum = 0;
    double min = Double.MAX_VALUE;
    double max = Double.MIN_VALUE;
    Aggregation() {}
    void update(Measurement m) {
        ++count;
        sum += m.temperature;
        min = Math.min(min, m.temperature);
        max = Math.max(max, m.temperature);
    }
    static Aggregation combine(Aggregation a, Aggregation b) {
        var c   = new Aggregation();
        c.count = a.count + b.count;
        c.sum   = a.sum + b.sum;
        c.min   = Math.min(a.min, b.min);
        c.max   = Math.max(a.max, b.max);
        return c;
    }
}

Det första argumentet skapar ett nytt objekt.

Aggregation::new    -->  () -> new Aggregation()

Det andra argumentet uppdaterar ett existerande Aggregation objekt.

 Aggregation::update    --> (aggr, m) -> aggr.update(m) 

Det tredje argumentet kombinerar två objekt och returnerar ett nytt.

Aggregation::combine    --> (aggr1, aggr2) ->  Aggregation.combine(aggr1, aggr2)

Det färdiga resultatet representeras här av en Java record, som räknar ut medelvärdet, samt kan returnera en formaterad textrepresentation.

record Result(int count, double mean, double min, double max) {
    static Result create(Aggregation a) {
        return new Result(a.count, a.sum / a.count, a.min, a.max);
    }
    @Override
    public String toString() {
        return String.format(Locale.ENGLISH, "%.1fC, %.1f/%.1f (%d)", mean, min, max, count);
    }
}

Ett sådant resultatobjekt skapas med det fjärde argumentet.

 Result::create    --> (aggr) -> Result.create(aggr)

Så här kan man köra min lösning och sista delen av utskriften visas.

java src/main/java/ribomation/CalculateAverage.java
...
Ljubljana: 10.4C, -12.6/35.1 (264)
Tijuana: 16.8C, -5.4/41.2 (236)
Dakar: 24.3C, -6.5/53.3 (252)
Valencia: 18.5C, -7.6/43.5 (262)
----
Elapsed time: 0.289 secs (100,000 records)

En länk till min kompletta källkod (GitHub gist) finner du sist i denna artikel. Jag vill (ännu en gång) poängtera att min lösning är en pedagogisk omskrivning av Gunnars baseline lösning.

Vad är då så fascinerande med denna utmaning (1BRC)? Jo, att ta del av alla optimerade lösningar. Inom loppet av en dryg vecka skickade folk in lösningar på alltmer esoteriska prestandalösningar.

Gunnars ursprungliga lösning klockade in på 4 minuter och 13 sekunder, på den testserver han använder. I skrivande stund så är det signaturen royvanrijn som leder med en lösning som klockar in på blott 6 sekunder!!! 😮

Det finns några återkommande tricks, som ser ut att vara utslagsgivande.

  1. Graal JVM
  2. Minnesmappad indatafil
  3. Flertrådad skanning av indatat
  4. Egenutvecklad hash-map
  5. Villkorslös (branch-less) optimerad tolkning (parsing) av temperaturvärdena

Graal (1) är en relativt ny JVM från Oracle baserad på insikter/erfarenheter från den klassiska JVM:en och JRockit. Helt klart är den snabbare. Eftersom det är så många rader i indatafilen, så är programmet primärt I/O-bound, vilket man läser genom att smacka in hela filen i det virtuella adressrummet (2). På så sätt utnyttjar man operativets läsning av minnessidor från disk (VM pages).

I och med att hela filinnehållet finns i adressrummet, som en enda stor array, kan man dela upp arbetet på flera trådar, en för varje tillgänglig CPU (processing unit) vilken arbetar med ett eget segment av indatat (3).

Så här långt (1-3) är det enkelt att hänga med, det är sen det börjar bli esoteriskt. Okej, man har väl kanske implementerat en hash-tabell (4) någon gång, men det inget man gör till vardags.

Men sen blir det ju rena Harry Potter med SWAR-parsing (5). SWAR står för SIMD Within A Register. SIMD står för Single Instruction Multiple Data.

I korthet, innebär det att en textsträng innehållande ett tal på 8 siffror, där varje teckensiffra är ett ASCII byte-värde, läggs i ett 64-bits register, vilket ju är 8 byte. Nu kan varje siffra tolkas parallellt genom att subtrahera ASCII-koden för siffran '0'. Siffervärdena kan sen adderas ihop, där varje siffra multipliceras med motsvarande tiopotens. Det finns en länk till en förklarade artikel sist i denna bloggpost. Utan vidare förklaring lägger jag koden här nedan för vidare förundran.

uint32_t  parse_eight_digits_unrolled(uint64_t val) {
  const uint64_t mask = 0x000000FF000000FF;
  const uint64_t mul1 = 0x000F424000000064; // 100 + (1000000ULL << 32)
  const uint64_t mul2 = 0x0000271000000001; // 1 + (10000ULL << 32)
  val -= 0x3030303030303030;
  val = (val * 10) + (val >> 8); // val = (val * 2561) >> 8;
  val = (((val & mask) * mul1) + (((val >> 16) & mask) * mul2)) >> 32;
  return val;
}

Om du inte gillar Java, så finns det en speciell sektion med lösningar i en stor mängd andra programspråk. Här finner du program i C, C++, C#, Elixir, Python, Go, Ruby, Rust, Kotlin, med flera. En länk till detta finns sist i denna artikel.


Lista med länkar omnämnda i texten