Processing large files in Scala

Published on 2009-10-01
Edited on 2012-03-24
Tagged: scala

Part of my research lately has been analyzing the heap behavior of several network applications. This involves logging every malloc, free, load, and store that occurs while a program is executing, then running a number of analysis tools on the log later. I decided to write my log processing tools in Scala since the combination of pattern matching and good data structures allows me to write new analysis tools very quickly.

While writing these tools, I experienced a lot of performance problems. Some of the logs can get quite large (on the order of gigabytes). Here are some of the techniques I used to make things run at an acceptable speed.

1. Increase the maximum heap size

By default, Scala sets the maximum heap size to 256 MB. This is not nearly enough if you have any reasonably large data set that you want to hold in memory. If you exceed this limit, you will get an OutOfMemoryError, and your program will crash. There will also be substantial garbage collection overhead as you approach the limit. With Java, you can increase the maximum heap size by passing the command line option -Xmx followed by the maximum size of the heap. Although the Scala code runner doesn't accept Java arguments, it does let you pass them directly to Java through the JAVA_OPTS environment variable. So to start a Scala program with a 4GB heap, you would run it like this:

JAVA_OPTS=-Xmx4g scala foo.bar.baz arg1 arg2 ...
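To confirm that the option actually reached the JVM, a program can ask the runtime for its heap limit. This is a minimal sketch; the object name HeapCheck is only for illustration.

```scala
object HeapCheck {
  // Maximum heap the JVM will attempt to use, in megabytes (reflects -Xmx when set).
  def maxHeapMB: Long = Runtime.getRuntime.maxMemory / (1024 * 1024)

  def main(args: Array[String]): Unit =
    println(s"max heap: $maxHeapMB MB")
}
```

Run with JAVA_OPTS=-Xmx4g, it should report a figure close to 4096 MB. The exact number may come out somewhat lower, because maxMemory reports what the JVM will actually try to use.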

2. Use memory mapped I/O

If you are reading individual bytes from a binary file with a FileInputStream, you are going to have terrible performance. Every time you read from the stream, Java will make a system call, which is expensive. This can be mitigated by wrapping the file with a BufferedInputStream, but you still need to make a number of system calls proportional to the size of the file.
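For comparison, here is the buffered approach in Scala. This is a sketch built around a hypothetical countByte function. BufferedInputStream serves most read() calls from its internal array and only goes back to the file, with one large read, when that array runs dry.

```scala
import java.io.{BufferedInputStream, File, FileInputStream}

object BufferedRead {
  // Counts how often one byte value occurs in a file. Each read() returns a single
  // byte, but most calls are served from the in-memory buffer, not a system call.
  def countByte(file: File, target: Byte, bufferSize: Int = 64 * 1024): Long = {
    val in = new BufferedInputStream(new FileInputStream(file), bufferSize)
    try {
      var count = 0L
      var b = in.read() // an unsigned value 0..255, or -1 at end of file
      while (b != -1) {
        if (b.toByte == target) count += 1
        b = in.read()
      }
      count
    } finally in.close()
  }
}
```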

Java lets you map the contents of a file directly into memory using FileChannel and MappedByteBuffer from the java.nio package. This works like the mmap system call in C. The file's contents will appear in your address space, but since it won't be on the heap, it will not cause any additional garbage collection overhead. Once a file is mapped, you can read data from it using the methods in ByteBuffer without making any additional system calls. When you read from a part of the file that hasn't been read before, the kernel will load that section of the file automatically.

You can map a file like this:

import java.io.File
import java.io.FileInputStream
import java.nio.channels.FileChannel.MapMode._

val file = new File("somefile.dat")
val fileSize = file.length
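val stream = new FileInputStream(file)
// The mapping stays valid after the stream (and its channel) is closed.
val buffer = try stream.getChannel.map(READ_ONLY, 0, fileSize) finally stream.close()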

There are some important caveats for memory mapped I/O. First, each mapping can cover at most Integer.MAX_VALUE bytes (just under 2GB) of a file. This is because ByteBuffer uses signed 32-bit integers for offsets and positions. If you need to map a larger file, use multiple mappings, one for each chunk of up to 2GB. Second, there is no way to manually unmap a file. Unmapping occurs automatically when the ByteBuffer object gets garbage collected, but there is no way to control when that occurs. Because of both of these caveats, I would highly recommend running a 64-bit JVM to avoid exhausting your virtual address space.
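Mapping a file larger than a single mapping allows means splitting it into several regions. This is a minimal sketch that assumes read-only access and uses a hypothetical helper named mapChunks. It maps consecutive slices of at most Integer.MAX_VALUE bytes each.

```scala
import java.io.{File, FileInputStream}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel.MapMode

object ChunkedMapper {
  // Largest region a single mapping can cover.
  val MaxChunk: Long = Int.MaxValue.toLong

  // Maps the whole file as consecutive read-only buffers of at most chunkSize bytes.
  def mapChunks(file: File, chunkSize: Long = MaxChunk): IndexedSeq[MappedByteBuffer] = {
    require(chunkSize > 0 && chunkSize <= MaxChunk, "chunk size out of range")
    val stream = new FileInputStream(file)
    try {
      val channel = stream.getChannel
      val size = channel.size()
      // map is strict here, so every region is mapped before the channel is closed
      (0L until size by chunkSize).map { offset =>
        channel.map(MapMode.READ_ONLY, offset, math.min(chunkSize, size - offset))
      }
    } finally stream.close() // existing mappings remain valid after close
  }
}
```

A record that straddles two chunks still has to be handled by the caller. For fixed-size records, choosing a chunk size that is a multiple of the record size avoids the problem.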

3. Make Java do endian conversion for you

Most computers are based on the x86 architecture, which means that binary values in your data are probably in little-endian format (least significant byte first). By default, Java expects file data to be in big-endian format, so you would normally have to run a bit of code to swap bytes every time you read an integer. If you're using memory mapped I/O (or if you're using a ByteBuffer at all), you can set the byte order to little-endian to eliminate this overhead:

import java.nio.ByteOrder._

buffer.order(LITTLE_ENDIAN)

val i = buffer.getInt  // reads a 32-bit little-endian integer

Chances are, the JVM will still do some byte order conversions internally, but these should happen on highly optimized code paths inside the JVM rather than in your own JIT-compiled bytecode.
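To show what the byte order setting saves, the sketch below decodes the same four little-endian bytes twice. The first method sets the buffer's order. The second reads in the default big-endian order and swaps the bytes with Integer.reverseBytes. The object and method names are illustrative.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object EndianDemo {
  // The 32-bit value 0x12345678 stored least significant byte first.
  val littleEndianBytes: Array[Byte] = Array(0x78, 0x56, 0x34, 0x12).map(_.toByte)

  // Let the buffer handle the conversion.
  def viaByteOrder: Int =
    ByteBuffer.wrap(littleEndianBytes).order(ByteOrder.LITTLE_ENDIAN).getInt()

  // The manual alternative: read big-endian (the default), then swap the bytes.
  def viaManualSwap: Int =
    Integer.reverseBytes(ByteBuffer.wrap(littleEndianBytes).getInt())
}
```

Both methods return 0x12345678. Setting the order once up front removes the per-read swap from your own code.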

4. Use streams instead of lists or arrays

When I first wrote my data processing tools, I followed a very simple pattern:

  1. read events from file into ArrayBuffer
  2. make one or more passes over the ArrayBuffer
  3. write out results

Following this pattern, my programs would read the log quickly at first but would get slower and slower as more data was put on the heap, eventually grinding to a halt. The data processing passes wouldn't even get to run because my programs would run out of memory. Since many of my tools only need to make one pass over the log, it made more sense to present the sequence of events as a stream.

Streams in Scala are like lists, but they don't evaluate their contents until requested. This means a program can read and process data at the same time. Memory usage stays roughly constant, since new events aren't read until they are needed and old events can be garbage collected. (With Scala's standard Stream, which memoizes every element it evaluates, this only holds if nothing keeps a reference to the head of the stream.) If you need to make multiple passes over a large data set, streams are still useful. Even though old events may have been garbage collected, reading them a second time will be fast, since the file will probably still be in the kernel's buffer cache.
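The laziness is easy to observe with a stream that counts how many elements it has produced. In this sketch, the hypothetical from function stands in for "read the next event from the log". Taking a few elements from the infinite stream only produces those few. Stream is deprecated in favor of LazyList from Scala 2.13 on, but it still compiles there, with a warning.

```scala
object LazyStreamDemo {
  // How many elements the producer has actually generated so far.
  var produced = 0

  // Each call produces one element eagerly; the recursive call that
  // produces the rest of the stream is deferred until someone asks for it.
  def from(n: Int): Stream[Int] = {
    produced += 1
    n #:: from(n + 1)
  }
}
```

Calling LazyStreamDemo.from(0).take(3).toList returns List(0, 1, 2) and leaves produced at a small number, even though the stream itself never ends.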

Here's how I implemented my event stream:

import java.nio.ByteBuffer

// Event is the application's own log event type.
final class EventStream(buffer: ByteBuffer, position: Int)
  extends Stream[Event]
{
  private def readEvent: Event = {
    // read an event from the buffer starting at its current position
  }

  private lazy val (event, nextPosition) = {
    assert(!isEmpty)
    buffer.position(position)  // reset the buffer's position
    val e = readEvent
    (e, buffer.position)
  }

  override def head = event

  override def tail = {
    if (!tailDefined)
      throw new UnsupportedOperationException
    else
      new EventStream(buffer, nextPosition)
  }

  override def isEmpty = position == buffer.capacity

  protected def tailDefined = nextPosition <= buffer.capacity
}

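The event and the position of the next event in the buffer are computed lazily. The event is returned by head. The rest of the stream is generated in tail by creating a new EventStream at the position of the next event. Because tail builds a fresh EventStream on every call instead of caching it, holding on to the head of the stream does not keep later events alive. The cost is that each traversal rereads the events from the buffer.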

The stream can be initialized and used like this:

val events = new EventStream(buffer, 0)
for (e <- events) {
    // process event
}
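The EventStream class above relies on subclassing Stream, which is not possible in Scala 2.13 and later, where Stream is sealed and deprecated in favor of LazyList. Below is a sketch of the same reader written as an Iterator. This is a swapped-in technique, not the implementation above, and the Event record layout (a 1-byte kind followed by an 8-byte address) is purely hypothetical.

```scala
import java.nio.ByteBuffer

// Hypothetical fixed-size record: a 1-byte event kind followed by an 8-byte address.
final case class Event(kind: Byte, address: Long)

object EventReader {
  // Decodes one event at the buffer's current position, advancing the position.
  private def readEvent(buffer: ByteBuffer): Event = {
    val kind = buffer.get()
    val address = buffer.getLong()
    Event(kind, address)
  }

  // Returns an iterator that decodes events on demand, starting at offset 0.
  // Nothing is read from the buffer until next() is called.
  def events(buffer: ByteBuffer): Iterator[Event] = new Iterator[Event] {
    private var offset = 0

    def hasNext: Boolean = offset < buffer.limit()

    def next(): Event = {
      if (!hasNext) throw new NoSuchElementException("end of log")
      buffer.position(offset) // reset in case other code moved the shared buffer
      val e = readEvent(buffer)
      offset = buffer.position()
      e
    }
  }
}
```

An iterator never holds on to events it has already returned, so memory stays flat. The trade-off is that it can be traversed only once, and a second pass means calling events again on the same buffer.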

5. Double-check your code for slow areas

When I write code, I usually strive for simplicity and readability over performance. This is not always a best practice, especially when you are running a very simple, readable O(n^2) algorithm on a 30-million-element data set, which works out to roughly 10^15 basic steps. If you can replace a container with O(n) lookups by one with O(lg n) or O(1) lookups, it will probably boost your performance significantly. The extra complication may be worth it.
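Here is an illustration in the spirit of the heap logs described at the top: computing how many bytes are still allocated at the end of a log. Scanning a list of live allocations on every free costs O(n) per event and O(n^2) overall. Keying a hash map by address brings each lookup down to O(1) expected time. The event types and the liveBytes function are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical log records for this sketch.
sealed trait HeapEvent
final case class Malloc(address: Long, size: Long) extends HeapEvent
final case class Free(address: Long) extends HeapEvent

object LiveBytes {
  // Returns the number of bytes still allocated after all events are processed.
  def liveBytes(events: Iterator[HeapEvent]): Long = {
    val live = mutable.HashMap.empty[Long, Long] // address -> size, O(1) expected lookup
    var total = 0L
    events.foreach {
      case Malloc(addr, size) =>
        // put returns the previous size if the address was (unexpectedly) still live
        live.put(addr, size).foreach(old => total -= old)
        total += size
      case Free(addr) =>
        live.remove(addr).foreach(size => total -= size)
    }
    total
  }
}
```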