The Parquet metadata caching in Spark 1.4.1 has a design flaw. Reading a table works fine as long as its data has not changed, but once the table's data is updated, the next read fails with an error like: java.io.FileNotFoundException: File does not exist: /user/hive/warehouse/test.db/b/part-r-00004-3abcbb07-e20a-4b5e-a6e5-59356c3d3149.gz.parquet. Restarting the Spark application makes the query work again, but restarting after every table update is hardly a solution.

Steps to reproduce:

// hc is an instance of HiveContext
hc.sql("select * from b").show()         // works; b is a Parquet table
hc.sql("alter table b rename to b_bak")
hc.sql("create table b stored as parquet as select * from b_bak")
hc.sql("select * from b").show()         // throws FileNotFoundException

Even setting spark.sql.parquet.cacheMetadata=false still produces the error, apparently because that flag only governs the caching of Parquet footer metadata, not the table-level relation cache in HiveMetastoreCatalog that goes stale here. This looks like a Spark bug; I have reported it to the community (https://issues.apache.org/jira/browse/SPARK-9465), but it has not been fixed yet, so I had to work around it myself.

Edit the source file spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala at around line 323, changing this code:

val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) 

val cached = getCached(tableIdentifier, paths, metastoreSchema, None) 
val parquetRelation = cached.getOrElse { 
  val created = LogicalRelation( 
    new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive)) 
  cachedDataSourceTables.put(tableIdentifier, created) 
  created 
} 

parquetRelation 

to:

val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) 

val parquetRelation = LogicalRelation( 
    new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive)) 

parquetRelation 

Then rebuild Spark. The change simply skips the getCached lookup, so every query constructs a fresh ParquetRelation2 from the table's current data location; metadata caching is lost, but reads no longer see stale file lists.

However, this only fixes the error for Parquet tables recreated through Hive. Tables written through Spark itself, e.g. with df.write.mode(SaveMode.Overwrite).saveAsTable("b"), still hit the problem; I have not yet located the code responsible for that path.
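
For reference, a minimal sketch of that still-failing path, assuming the same long-running hc as in the reproduction above (reading from b_bak is just illustrative):

import org.apache.spark.sql.SaveMode

// Rewrite table b through Spark instead of Hive DDL:
val df = hc.table("b_bak")
df.write.mode(SaveMode.Overwrite).saveAsTable("b")

// Even with the HiveMetastoreCatalog patch above, this read can still pick up
// a stale cached relation and throw FileNotFoundException:
hc.sql("select * from b").show()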