Spark实战:用Join操作和二次排序搞定电影评分分析(附完整Scala代码)
Spark实战用Join操作和二次排序搞定电影评分分析附完整Scala代码电影评分分析是推荐系统的基础环节也是大数据工程师面试的经典考题。今天我们就用Spark的Join和二次排序这两个核心算子手把手带你完成一个真实场景下的评分分析任务。不同于网上那些只贴代码的教程我会重点讲解生产环境中容易踩的坑比如KeyBy的正确用法、自定义排序类的序列化陷阱以及如何优化Shuffle性能。1. 环境准备与数据理解首先确保你的开发环境满足以下条件JDK 1.8Scala 2.12.xSpark 3.2.0我们使用的数据集包含两个关键文件ratings.dat- 用户评分记录1::1193::5::978300760 1::661::3::978302109字段含义用户ID::电影ID::评分(1-5)::时间戳movies.dat- 电影元数据1::Toy Story (1995)::Animation|Childrens|Comedy 2::Jumanji (1995)::Adventure|Childrens|Fantasy字段含义电影ID::标题::类型常见踩坑点原始数据使用::分隔符这种非标准格式需要特别注意处理。我曾见过有团队直接用CSV解析器读取导致字段错乱的案例。2. 核心处理流程设计整个分析流程分为四个阶段graph TD A[原始数据加载] -- B[评分聚合计算] B -- C[电影信息关联] C -- D[高分电影筛选] D -- E[自定义排序输出]不过在生产环境中我们更推荐用Spark SQL实现代码更简洁且易于优化。下面分别用RDD和DataFrame两种方式实现。3. RDD实现方案3.1 评分数据ETL处理首先处理评分数据计算每部电影的平均分val ratingRDD sc.textFile(hdfs://path/to/ratings.dat) .map { line val fields line.split(::) // 防止空行导致数组越界 require(fields.length 4, sInvalid rating record: $line) (fields(1).toInt, fields(2).toDouble) // (movieId, rating) } .persist(StorageLevel.MEMORY_AND_DISK) // 多次使用建议缓存 // 计算平均分使用reduceByKey更高效 val avgRatings ratingRDD .mapValues(r (r, 1)) // (rating, 1) .reduceByKey { case ((sum1, cnt1), (sum2, cnt2)) (sum1 sum2, cnt1 cnt2) } .mapValues { case (sum, cnt) BigDecimal(sum / cnt).setScale(2, RoundingMode.HALF_UP).toDouble }关键优化点使用reduceByKey替代groupByKey减少Shuffle数据量采用mapValues保持分区器避免不必要的Shuffle使用BigDecimal保证小数精度3.2 电影信息关联接下来关联电影信息case class MovieInfo(movieId: Int, title: String, genres: String) val moviesRDD sc.textFile(hdfs://path/to/movies.dat) .flatMap { line try { val fields line.split(::) Some((fields(0).toInt, MovieInfo(fields(0).toInt, fields(1), fields(2)))) } catch { case _: Exception None // 脏数据处理 } } // Join操作 - 生产环境建议设置并行度 val joined avgRatings.join(moviesRDD) .repartition(200) // 根据集群规模调整 .persist()Join性能调优技巧小表广播如果电影数据量小可用broadcast优化预分区确保两个RDD使用相同的分区器监控指标关注Spark UI中的Shuffle Read/Write指标3.3 高分电影筛选与排序筛选评分4.0的电影并进行二次排序// 自定义排序类必须序列化 class MovieScore(val movieId: Int, val avgRating: Double, val title: String) extends Ordered[MovieScore] with Serializable { override def compare(that: MovieScore): Int { val ratingCompare that.avgRating.compareTo(this.avgRating) if (ratingCompare ! 0) ratingCompare else this.title.compareTo(that.title) } } val topMovies joined.filter { case (_, (rating, _)) rating 4.0 } .map { case (id, (rating, info)) new MovieScore(id, rating, info.title) } .sortBy(identity) // 使用自定义排序序列化陷阱自定义排序类必须实现Serializable否则会报NotSerializableException。这是新手最容易犯的错误之一。4. DataFrame优化方案对于现代Spark应用更推荐使用DataFrame APIval ratingsDF spark.read .option(delimiter, ::) .csv(hdfs://path/to/ratings.dat) .selectExpr( _c1 as movieId, _c2 as rating ) .groupBy(movieId) .agg(avg(rating).alias(avgRating)) val moviesDF spark.read .option(delimiter, ::) .csv(hdfs://path/to/movies.dat) .selectExpr( _c0 as movieId, _c1 as title, _c2 as genres ) // 使用Spark SQL优化Join val resultDF ratingsDF.join(moviesDF, movieId) .filter(col(avgRating) 4.0) .orderBy(desc(avgRating), asc(title)) // 查看执行计划 resultDF.explain(true)优势对比特性RDD方案DataFrame方案代码简洁度较低高性能优化空间手动调优Catalyst自动优化类型安全弱强自定义排序灵活性高中等5. 生产环境注意事项数据倾斜处理// 采样检测倾斜 val skewKeys ratingRDD .map(_._1) .countByValue() .filter(_._2 100000) // 阈值根据数据量调整 // 倾斜Key单独处理 if (skewKeys.nonEmpty) { val skewed ratingRDD.filter(x skewKeys.contains(x._1)) val normal ratingRDD.filter(x !skewKeys.contains(x._1)) // 分别处理后再合并 }资源参数配置spark-submit \ --executor-memory 8G \ --executor-cores 4 \ --conf spark.sql.shuffle.partitions400 \ --conf spark.default.parallelism400监控指标关注Stage的GC时间Task数据倾斜度Shuffle溢出到磁盘的情况6. 扩展应用场景这个技术方案可以复用到多种业务场景电商领域商品评分分析订单关联用户信息社交网络用户关系图谱分析内容热度排序物联网设备状态关联地理位置传感器数据时间序列分析// 通用二次排序模板 class GenericSecondarySort[K : Ordered[K]] extends Serializable { def sort(rdd: RDD[(K, V)]): RDD[(K, V)] { rdd.sortByKey() } }在实际项目中我发现合理使用persist()能显著提升性能但要注意存储级别的选择。MEMORY_ONLY适合小数据集MEMORY_AND_DISK_SER更适合大数据量场景。曾经有个项目因为没设置序列化存储导致OOM失败这个教训值得大家警惕。