【推荐算法】协同过滤算法代码(pyspark | ALS)

【推荐算法】协同过滤算法介绍_MachineCYL的博客-CSDN博客

上文介绍了协同过滤算法的原理,接下来我介绍一下协同过滤算法的代码实现。

下面我就开始介绍用pyspark中的ALS(交替最小二乘矩阵分解)来实现协同过滤代码。

一、ALS的简单介绍

ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User(用户)和Item(商品)两个方面。

用户和商品的关系,可以抽象为如下的三元组:。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。

假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵 Rm×n。在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这么大的数据量已经是很难处理了。另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。而使用ALS可以达到数据降维的目的,大大减少计算量。

二、前期准备

  • 使用pyspark前要先安装spark的环境。我的spark版本是2.4.3,pyspark版本也是2.4.3。
  • 如果需要安装spark环境,可以参考:

Spark的安装及配置 - 简书

  •  pyspark安装指令如下(加清华源,下载快多了):
pip install pyspark==2.4.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
  • 数据格式(如果需要获取数据,可以见下方链接):

模型训练用到的数据格式如下:主要用到userId、movieId和rating字段。

【推荐算法】协同过滤算法代码(pyspark | ALS)_第1张图片

三、详细代码

定义CollaborativeFiltering类,主要封装模型训练、模型保存、用户推荐、商品推荐等代码。

class CollaborativeFiltering(object):
    def __init__(self, spark_session):
        self.spark_session = spark_session
        self.model = None

    def train(self, train_set, user_col, item_col, rating_col, epoch=10):
        """
        Build the recommendation model using ALS on the training data
        Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
        """
        als = ALS(regParam=0.01, maxIter=epoch, userCol=user_col, itemCol=item_col, ratingCol=rating_col,
                  coldStartStrategy='drop')
        self.model = als.fit(train_set)

    def eval(self, test_set, label_col='ratingFloat', metric='rmse'):
        """ Evaluate the model on the test data """
        predictions = self.model.transform(test_set)

        # self.model.itemFactors.show(10, truncate=False)
        # self.model.userFactors.show(10, truncate=False)
        evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label_col, metricName=metric)
        loss = evaluator.evaluate(predictions)
        return loss

    def save(self, model_dir="./model_param"):
        self.model.write().overwrite().save(model_dir)

    def load(self, model_dir="./model_param"):
        self.model = ALSModel.load(model_dir)

    def recommend_for_all_users(self, num_items=10):
        user_recs = self.model.recommendForAllUsers(numItems=num_items)
        return user_recs

    def recommend_for_all_items(self, num_users=10):
        item_recs = self.model.recommendForAllItems(numUsers=num_users)
        return item_recs

    def recommend_for_user_subset(self, dataset, num_items=10):
        user_recs = self.model.recommendForUserSubset(dataset=dataset, numItems=num_items)
        return user_recs

    def recommend_for_item_subset(self, dataset, num_users=10):
        item_recs = self.model.recommendForItemSubset(dataset=dataset, numUsers=num_users)
        return item_recs

新建代码文件train_cf.py,加入示例代码如下:

def parse_argvs():
    parser = argparse.ArgumentParser(description='[collaborativeFiltering]')
    parser.add_argument("--data_path", type=str, default='./data/ratings.csv')
    parser.add_argument("--model_path", type=str, default='./model_param')
    parser.add_argument("--epoch", type=int, default=10)
    parser.add_argument("--train_flag", type=bool, default=False)
    args = parser.parse_args()
    print('[input params] {}'.format(args))

    return parser, args


if __name__ == '__main__':
    parser, args = parse_argvs()
    data_path = args.data_path
    model_path = args.model_path
    train_flag = args.train_flag
    epoch = args.epoch

    conf = SparkConf().setAppName('collaborativeFiltering').setMaster("local[*]")
    spark_session = SparkSession.builder.config(conf=conf).getOrCreate()

    # read data
    data_path = os.path.abspath(data_path)
    data_path = "file://" + data_path
    print("[spark] read file path: {}".format(data_path))

    ratingSamples = spark_session.read.format('csv').option('header', 'true').load(data_path) \
        .withColumn("userIdInt", F.col("userId").cast(IntegerType())) \
        .withColumn("movieIdInt", F.col("movieId").cast(IntegerType())) \
        .withColumn("ratingFloat", F.col("rating").cast(FloatType()))
    training, test = ratingSamples.randomSplit((0.8, 0.2), seed=2022)

    # collaborative filtering start
    cf = CollaborativeFiltering(spark_session=spark_session)

    if train_flag is True:
        cf.train(train_set=training,
                 user_col='userIdInt',
                 item_col='movieIdInt',
                 rating_col='ratingFloat',
                 epoch=epoch)

        cf.save(model_dir=model_path)
    else:
        cf.load(model_dir=model_path)

    loss = cf.eval(test_set=test, label_col='ratingFloat', metric='rmse')
    print("[Root-mean-square error] {}".format(loss))

    # Generate top 10 movie recommendations for each user
    user_recs = cf.recommend_for_all_users(num_items=10)
    user_recs.show(10, False)

    # Generate top 10 user recommendations for each movie
    movie_recs = cf.recommend_for_all_items(num_users=10)
    movie_recs.show(10, False)

    # Generate top 10 movie recommendations for a specified set of users
    user_data = ratingSamples.select("userIdInt").distinct().limit(10)
    user_sub_recs = cf.recommend_for_user_subset(dataset=user_data, num_items=10)
    user_sub_recs.show(10, False)

    # Generate top 10 user recommendations for a specified set of movies
    movie_data = ratingSamples.select("movieIdInt").distinct().limit(10)
    movie_sub_recs = cf.recommend_for_item_subset(dataset=movie_data, num_users=10)
    movie_sub_recs.show(10, False)

    spark_session.stop()

运行指令示例:

# 进行ALS模型训练和预测
python train_cf.py  --data_path "./data/ratings.csv" --train_flag True

# 不训练,直接进行ALS模型预测
python train_cf.py  --data_path "./data/ratings.csv"

代码执行结果如下(部分): 

【推荐算法】协同过滤算法代码(pyspark | ALS)_第2张图片

需要获取训练数据和代码可以访问我的github,如果觉得有帮助,请star收藏,谢谢~

CollaborativeFiltering

参考链接 

  • spark中ALS介绍_AuroraPetard的博客-CSDN博客_sparkals
  • als算法参数_Pyspark推荐算法实战(一)_三杉的博客-CSDN博客

你可能感兴趣的