电商推荐系统七:基于物品的协同过滤相似推荐

曾经终败给现在 2022-10-29 01:42 304阅读 0赞

7.2 基于物品的协同过滤相似推荐

基于物品的协同过滤(Item-CF),只需收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度,在实际项目中应用很广。

我们的整体思想是,如果两个商品有同样的受众(感兴趣的人群),那么它们就是有内在相关性的。所以可以利用已有的行为数据,分析商品受众的相似程度,进而得出商品间的相似度。我们把这种方法定义为物品的“同现相似度”,可以概括为喜欢同一商品的用户能喜欢同一组用户喜欢的别的商品,即人以群分的理念。

公式如下:
在这里插入图片描述

其中,Ni 是购买商品 i (或对商品 i 评分)的用户列表,Nj 是购买商品 j 的用户列表。

核心代码实现如下:

  1. package com.recom.itemcf
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.SparkSession
  4. case class ProductRating(userId: Int,productId:Int,score:Double,timestamp:Int)
  5. case class MongoConfig(uri:String,db:String)
  6. //定义标准推荐对象
  7. case class Recommendation(productId: Int,score:Double)
  8. //定义用户推荐列表
  9. case class UserRecs(userId:Int,recs:Seq[Recommendation])
  10. //定义商品相似度列表
  11. case class ProductRecs(productId:Int,recs:Seq[Recommendation])
  12. object ItemCFRecommender {
  13. //定义表名和常量
  14. val MONGODB_RATING_COLLECTION = "Rating"
  15. val USER_MAX_RECOMMENDATION=10
  16. val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
  17. def main(args: Array[String]): Unit = {
  18. //定义基础配置的集合(可以放入配置文件,通过方法获取属性的值)
  19. val config = Map(
  20. "spark.cores"->"local[*]",
  21. "mongo.uri"->"mongodb://hadoop102:27017/recommender",
  22. "mongo.db"->"recommender"
  23. )
  24. //创建一个spark config
  25. val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
  26. //创建一个spark session
  27. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  28. //导入隐式转换类,在DF和DS转换的过程中会使用到
  29. import spark.implicits._
  30. //通过隐式类的方法创建mongodb连接对象
  31. implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
  32. //加载数据
  33. val ratingDF = spark.read
  34. .option("uri", mongoConfig.uri)
  35. .option("collection", MONGODB_RATING_COLLECTION)
  36. .format("com.mongodb.spark.sql")
  37. .load()
  38. .as[ProductRating]
  39. .map(rating=>(rating.userId,rating.productId,rating.score))
  40. .toDF("userId","productId","score")
  41. .cache()
  42. //TODO:核心算法,计算同现相似度,得到商品的相似列表
  43. //统计每个商品的评分个数,按照productId来做group by
  44. val productRatingCountDF = ratingDF.groupBy("productId").count()
  45. //在原有评分表上添加count
  46. val ratingWithCountDF = ratingDF.join(productRatingCountDF,"productId")
  47. //将评分表按照用户id两两配对,统计两个商品被同一个用户评分过的次数
  48. val joinDF = ratingWithCountDF.join(ratingWithCountDF,"userId")
  49. .toDF("userId","product1","score1","count1","product2","score2","count2")
  50. .select("userId","product1","count1","product2","count2")
  51. //joinDF.show()
  52. //创建一张临时表,用于写sql查询
  53. joinDF.createOrReplaceTempView("joined")
  54. //按照product1,product2做group by,统计userId的数量,即同时对两个商品评分的人数
  55. val cooccurrenceDF = spark.sql(
  56. """ |select product1,product2, count(userId) as cocount, |first(count1) as count1, first(count2) as count2 |from joined |group by product1,product2 |""".stripMargin
  57. ).cache()
  58. //提取需要的数据,包装成(product1,(product2,score))
  59. val simDF = cooccurrenceDF.map{
  60. raw =>
  61. val coocSim = cooccurrenceSim(raw.getAs[Long]("cocount"),raw.getAs[Long]("count1")
  62. ,raw.getAs[Long]("count2"))
  63. (raw.getInt(0),(raw.getInt(1),coocSim))
  64. }
  65. .rdd
  66. .groupByKey()
  67. .map{
  68. case (productId,recs)=>
  69. ProductRecs(productId,recs.toList.filter(x=>x._1!=productId)
  70. .sortWith(_._2>_._2)
  71. .take(USER_MAX_RECOMMENDATION)
  72. .map(x=>Recommendation(x._1,x._2))
  73. )
  74. }
  75. .toDF()
  76. //保存到mongodb
  77. simDF.write
  78. .option("uri",mongoConfig.uri)
  79. .option("collection",ITEM_CF_PRODUCT_RECS)
  80. .mode("overwrite")
  81. .format("com.mongodb.spark.sql")
  82. .save( )
  83. spark.stop()
  84. }
  85. //按照公式计算现同相似度
  86. def cooccurrenceSim(coCount: Long, count1: Long, count2: Long)={
  87. coCount/math.sqrt(count1*count2)
  88. }
  89. }

发表评论

表情:
评论列表 (有 0 条评论,304人围观)

还没有评论,来说两句吧...

相关阅读