电商推荐系统七:基于物品的协同过滤相似推荐
7.2 基于物品的协同过滤相似推荐
基于物品的协同过滤(Item-CF),只需收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度
,在实际项目中应用很广。
我们的整体思想是,如果两个商品有同样的受众(感兴趣的人群),那么它们就是有内在相关性的。所以可以利用已有的行为数据,分析商品受众的相似程度,进而得出商品间的相似度。我们把这种方法定义为物品的“同现相似度”,可以概括为喜欢同一商品的用户能喜欢同一组用户喜欢的别的商品,即人以群分
的理念。
公式如下:
其中,Ni 是购买商品 i (或对商品 i 评分)的用户列表,Nj 是购买商品 j 的用户列表。
核心代码实现如下:
package com.recom.itemcf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class ProductRating(userId: Int,productId:Int,score:Double,timestamp:Int)
case class MongoConfig(uri:String,db:String)
//定义标准推荐对象
case class Recommendation(productId: Int,score:Double)
//定义用户推荐列表
case class UserRecs(userId:Int,recs:Seq[Recommendation])
//定义商品相似度列表
case class ProductRecs(productId:Int,recs:Seq[Recommendation])
object ItemCFRecommender {
//定义表名和常量
val MONGODB_RATING_COLLECTION = "Rating"
val USER_MAX_RECOMMENDATION=10
val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
def main(args: Array[String]): Unit = {
//定义基础配置的集合(可以放入配置文件,通过方法获取属性的值)
val config = Map(
"spark.cores"->"local[*]",
"mongo.uri"->"mongodb://hadoop102:27017/recommender",
"mongo.db"->"recommender"
)
//创建一个spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
//创建一个spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
//导入隐式转换类,在DF和DS转换的过程中会使用到
import spark.implicits._
//通过隐式类的方法创建mongodb连接对象
implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
//加载数据
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(rating=>(rating.userId,rating.productId,rating.score))
.toDF("userId","productId","score")
.cache()
//TODO:核心算法,计算同现相似度,得到商品的相似列表
//统计每个商品的评分个数,按照productId来做group by
val productRatingCountDF = ratingDF.groupBy("productId").count()
//在原有评分表上添加count
val ratingWithCountDF = ratingDF.join(productRatingCountDF,"productId")
//将评分表按照用户id两两配对,统计两个商品被同一个用户评分过的次数
val joinDF = ratingWithCountDF.join(ratingWithCountDF,"userId")
.toDF("userId","product1","score1","count1","product2","score2","count2")
.select("userId","product1","count1","product2","count2")
//joinDF.show()
//创建一张临时表,用于写sql查询
joinDF.createOrReplaceTempView("joined")
//按照product1,product2做group by,统计userId的数量,即同时对两个商品评分的人数
val cooccurrenceDF = spark.sql(
""" |select product1,product2, count(userId) as cocount, |first(count1) as count1, first(count2) as count2 |from joined |group by product1,product2 |""".stripMargin
).cache()
//提取需要的数据,包装成(product1,(product2,score))
val simDF = cooccurrenceDF.map{
raw =>
val coocSim = cooccurrenceSim(raw.getAs[Long]("cocount"),raw.getAs[Long]("count1")
,raw.getAs[Long]("count2"))
(raw.getInt(0),(raw.getInt(1),coocSim))
}
.rdd
.groupByKey()
.map{
case (productId,recs)=>
ProductRecs(productId,recs.toList.filter(x=>x._1!=productId)
.sortWith(_._2>_._2)
.take(USER_MAX_RECOMMENDATION)
.map(x=>Recommendation(x._1,x._2))
)
}
.toDF()
//保存到mongodb
simDF.write
.option("uri",mongoConfig.uri)
.option("collection",ITEM_CF_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save( )
spark.stop()
}
//按照公式计算现同相似度
def cooccurrenceSim(coCount: Long, count1: Long, count2: Long)={
coCount/math.sqrt(count1*count2)
}
}
还没有评论,来说两句吧...