Spark Streaming Application Failure Recovery Failed On Windows

Posted by zixuan-zhang on August 10, 2016

1. What Happened?

I built an application with Spark Streaming. While testing its fault tolerance with Spark's WriteAheadLog mechanism on Windows (yes, Windows), I found that the application could not recover from a driver failure. The strange part is that the application ran fine until I killed the driver manually; the driver then restarted and failed. I read the source code, and the cause is explained below.

2. Environment

  • Windows Server 2012
  • Spark: 1.6.1
  • Hadoop: 2.6.3
  • Language: C#

3. Full Debug Log

16/08/03 12:26:14 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 4.0 (TID 85) on executor cnazdev04.fareast.corp.microsoft.com: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://CNAZDEV02:19000/checkpoint/receivedData/0/log-1470197944406-1470198004406,189,59)) [duplicate 3]
16/08/03 12:26:14 ERROR scheduler.TaskSetManager: Task 0 in stage 4.0 failed 4 times; aborting job
16/08/03 12:26:14 INFO cluster.YarnClusterScheduler: Cancelling stage 4
16/08/03 12:26:14 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
16/08/03 12:26:14 INFO scheduler.DAGScheduler: ResultStage 4 (count at NativeMethodAccessorImpl.java:-2) failed in 5.624 s
16/08/03 12:26:14 INFO scheduler.DAGScheduler: Job 3 failed: count at NativeMethodAccessorImpl.java:-2, took 5.691028 s
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:156)
	at org.apache.spark.api.csharp.CSharpBackendHandler.handleBackendRequest(CSharpBackendHandler.scala:103)
	at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:30)
	at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:27)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 85, cnazdev04.fareast.corp.microsoft.com): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://CNAZDEV02:19000/checkpoint/receivedData/0/log-1470197944406-1470198004406,189,59)
	at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:143)
	at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:167)
	at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:167)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:167)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
	at org.apache.spark.api.csharp.CSharpRDD.compute(CSharpRDD.scala:78)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Pathname /C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed from C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed is not a valid DFS filename.
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
	at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
	at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:227)
	at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:72)
	at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
	at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:140)
	at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForReceiver(WriteAheadLogUtils.scala:110)
	at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:138)

4. Key Point

Caused by: java.lang.IllegalArgumentException: Pathname /C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed from C:/tmp/hadoop-t-zixzha/nm-local-dir/usercache/t-zixzha/appcache/application_1470197319438_0002/container_1470197319438_0002_02_000002/tmp/02ee4ba1-15c6-4950-bcd9-9038d0d9e7ed is not a valid DFS filename.

5. Reason & Analysis

As the debug log and the key point show, the error is thrown in org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(), that is, inside Hadoop's code rather than Spark's. Here is the source code:

private String getPathName(Path file) {
    checkPath(file);
    String result = file.toUri().getPath();
    if (!DFSUtil.isValidName(result)) {
      throw new IllegalArgumentException("Pathname " + result + " from " +
                                         file+" is not a valid DFS filename.");
    }
    return result;
  }

The parameter file is a Path, which in this case is C:/tmp/hadoop (abbreviated from the full path in the log). The function first calls checkPath, which needs no comment. It then builds a temporary variable, result, from file.toUri().getPath(); in this case result is /C:/tmp/hadoop. So far, everything is fine. Finally, it validates result with DFSUtil.isValidName. Here is the source code of DFSUtil.isValidName:

  /**
   * Whether the pathname is valid.  Currently prohibits relative paths,
   * names which contain a ":" or "//", or other non-canonical paths.
   */
  public static boolean isValidName(String src) {
    // Path must be absolute.
    if (!src.startsWith(Path.SEPARATOR)) {
      return false;
    }

    // Check for ".." "." ":" "/"
    String[] components = StringUtils.split(src, '/');
    for (int i = 0; i < components.length; i++) {
      String element = components[i];
      if (element.equals(".")  ||
          (element.indexOf(":") >= 0)  ||
          (element.indexOf("/") >= 0)) {
        return false;
      }
  ...

From the comment and the code, we can tell that src is invalid whenever one of its components contains a ":". However, /C:/tmp/hadoop is a legitimate path on Windows. Apparently, this code does not take Windows paths into account.
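
To make the two steps concrete, here is a minimal, self-contained Java sketch. It is not Hadoop code: the class and method names are hypothetical, the drive-letter handling in toUriPath is only an assumption about how a Windows path ends up with a leading slash, String.split stands in for Hadoop's StringUtils.split, and only the checks quoted above are reproduced (the real method continues after the elided part).

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical sketch: it approximates the two steps discussed above and is
// not Hadoop code. Class and method names are made up for illustration.
class DfsPathSketch {

    // Assumption: a drive-letter path such as "C:/tmp/hadoop" gets a leading
    // slash before it becomes the path component of a scheme-less URI.
    static String toUriPath(String pathString) {
        boolean hasDrive = pathString.length() >= 2
                && Character.isLetter(pathString.charAt(0))
                && pathString.charAt(1) == ':';
        String normalized = hasDrive ? "/" + pathString : pathString;
        try {
            // Scheme, host and fragment are null: only the path is set.
            return new URI(null, null, normalized, null).getPath();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Only the checks quoted above; the real method continues after "...".
    static boolean passesQuotedChecks(String src) {
        if (!src.startsWith("/")) {
            return false; // path must be absolute
        }
        // String.split stands in for Hadoop's StringUtils.split here.
        for (String element : src.split("/")) {
            if (element.equals(".")
                    || element.indexOf(":") >= 0
                    || element.indexOf("/") >= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String result = toUriPath("C:/tmp/hadoop");
        System.out.println(result);                                            // /C:/tmp/hadoop
        System.out.println(passesQuotedChecks(result));                        // false
        System.out.println(passesQuotedChecks("/checkpoint/receivedData/0"));  // true
    }
}
```

The drive letter survives the conversion as the component "C:", and that colon alone is enough for the check to reject the path, while an ordinary HDFS path such as the checkpoint directory passes.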

6. Why Does Spark Trigger This Error?

As I said at the top, the application worked well until the driver restarted and tried to recover from the failure, and then it went down. But why did this happen? It is fairly complicated, so I will start at the WriteAheadLogBackedBlockRDD.compute function.

// File: WriteAheadLogBackedBlockRDD.scala
def compute() {
  ...
  if (partition.isBlockIdValid) {
    getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() } // Here is the entry.
  } else {
    getBlockFromWriteAheadLog()
  }
}
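
The lookup order in this excerpt (block manager first, write ahead log as the fallback) can be modeled in plain Java. This is only a sketch under stated assumptions: the class, its two maps and the walReads counter are hypothetical stand-ins, not Spark APIs, and Optional.orElseGet plays the role of Scala's getOrElse.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the lookup order; none of these names are Spark APIs.
class BlockLookupSketch {
    // Stand-in for the in-memory BlockManager, which is empty after a restart.
    final Map<String, String> blockManager = new HashMap<>();
    // Stand-in for the write ahead log that survives the restart.
    final Map<String, String> writeAheadLog = new HashMap<>();
    // Counts how often the fallback path was taken.
    int walReads = 0;

    Optional<String> getBlockFromBlockManager(String blockId) {
        return Optional.ofNullable(blockManager.get(blockId));
    }

    String getBlockFromWriteAheadLog(String blockId) {
        walReads++;
        String data = writeAheadLog.get(blockId);
        if (data == null) {
            throw new IllegalStateException("Could not read data for " + blockId);
        }
        return data;
    }

    // Try the block manager first; read the log only when the block is missing.
    String compute(String blockId) {
        return getBlockFromBlockManager(blockId)
                .orElseGet(() -> getBlockFromWriteAheadLog(blockId));
    }

    public static void main(String[] args) {
        BlockLookupSketch s = new BlockLookupSketch();
        s.writeAheadLog.put("block-0", "data");
        s.blockManager.put("block-0", "data");
        s.compute("block-0");
        System.out.println("WAL reads before restart: " + s.walReads);
        s.blockManager.clear(); // simulate the restart
        s.compute("block-0");
        System.out.println("WAL reads after restart: " + s.walReads);
    }
}
```

In this model the write ahead log is never touched while the block is still in memory, which matches the observation that the application ran fine until the driver was killed.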

As you can see, Spark first tries to get the data block through getBlockFromBlockManager(). However, the ApplicationMaster has just recovered from a failure: it detects that there are failed jobs left and tries to fetch their block data. Because the BlockManager is empty at that point, Spark falls back to reading the block from the WriteAheadLog by calling getBlockFromWriteAheadLog. Here is what getBlockFromWriteAheadLog does:

def getBlockFromWriteAheadLog(): Iterator[T] = {
  var dataRead: ByteBuffer = null
  var writeAheadLog: WriteAheadLog = null
  try {
    // The WriteAheadLogUtils.createLog*** method needs a directory to create a
    // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
    // writing log data. However, the directory is not needed if data needs to be read, hence
    // a dummy path is provided to satisfy the method parameter requirements.
    // FileBasedWriteAheadLog will not create any file or directory at that path.
    // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
    // this dummy directory should not already exist otherwise the WAL will try to recover
    // past events from the directory and throw errors.
    val nonExistentDirectory = new File(
      System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
    writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
      SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
    dataRead = writeAheadLog.read(partition.walRecordHandle)

The comment makes it clear: to read a block from the WriteAheadLog, Spark needs a directory that does not exist, so it calls new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()). The resulting path is the one shown above: C:\tmp\....

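According to the stack trace, FileBasedWriteAheadLog.initializeOrRecover then passes this local path to FileSystem.exists, which reaches DistributedFileSystem.getPathName and is rejected there. So on Windows the task fails every time it tries to read data from the WriteAheadLog, and after four attempts the job is aborted.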
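
The dummy directory can be reproduced outside Spark with the same expression, written here in Java. This is a minimal sketch: the class name and the hasDriveLetter helper are hypothetical and exist only to show where the colon comes from.

```java
import java.io.File;
import java.util.UUID;

// Hypothetical sketch; only the File/UUID expression mirrors the Spark excerpt.
class DummyWalDirSketch {

    // Same expression as in the Spark excerpt above, written in Java.
    static String nonExistentDirectory() {
        return new File(System.getProperty("java.io.tmpdir"),
                UUID.randomUUID().toString()).getAbsolutePath();
    }

    // True when the path starts with a drive letter such as "C:".
    static boolean hasDriveLetter(String path) {
        return path.length() >= 2
                && Character.isLetter(path.charAt(0))
                && path.charAt(1) == ':';
    }

    public static void main(String[] args) {
        String dir = nonExistentDirectory();
        System.out.println(dir);
        System.out.println("starts with a drive letter: " + hasDriveLetter(dir));
    }
}
```

The path is a plain local path without any scheme. On Windows it starts with a drive letter, so the colon is present before Hadoop ever sees it; on a Unix-like system the same expression yields a path without a colon, which would explain why the problem only shows up on Windows.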

7. Brief Summary

  • A Spark Streaming application fails as soon as it tries to read data from the WriteAheadLog on Windows.
  • When the driver recovers from a failure, it reads data from the WriteAheadLog.
  • When a block does not exist in the BlockManager, it is read from the WriteAheadLog.

Creative Commons License
This work is licensed under a CC A-S 4.0 International License.