Spark 大数据入门清单:RFM方法分析用户评级

什么是 RFM 分析方法

理论

RFM是3个指标的缩写,最近一次消费时间间隔(Recency),消费频率(Frequency),消费金额(Monetary)。通过这3个指标对用户分类。

用RFM分析方法把用户分为8类,这样就可以对不同价值用户使用不同的营销决策,把公司有限的资源发挥最大的效果,这就是我们常常听到的精细化运营。比如第1类是重要价值用户,这类用户最近一次消费较近,消费频率也高,消费金额也高,要提供 vip 服务。

  • 最近1次消费时间间隔(R)是指用户最近一次消费距离现在多长时间了
  • 消费频率(F)是指用户一段时间内消费了多少次
  • 消费金额(M)是指用户一段时间内的消费金额

更加广义上的RFM也可以是:

  • 最近1次使用产品时间间隔(R)是指用户最近一次使用距离现在多长时间了
  • 使用频率(F)是指用户一段时间内使用了多少次
  • 使用成本(M)是指用户一段时间内的使用总时间

一段时间的定义可以根据具体的业务情况而定;使用可以指打开app等。

比如您也可以把人脉的联系使用RFM方法进行分析,他大概是这个样子:

但是总体来说:

  • 最近一次消费时间间隔(R),上一次消费离得越近,也就是R的值越小,用户价值越高。

  • 消费频率(F),购买频率越高,也就是F的值越大,用户价值越高。

  • 消费金额(M),消费金额越高,也就是M的值越大,用户价值越高。

我们把这3个指标按价值从低到高排序,并把这3个指标作为XYZ坐标轴,就可以把空间分为8部分,这样就可以把用户分为下图的8类。

实现

现有如下数据:

  • uid:用户唯一标识符
  • use_duration:用户某次使用产品时长
  • record_time:该条信息被记录的时间

我们通过以下方式来处理:

  1. 读取和过滤数据

    spark.read.parquet("path/to/data")
    .select("uid", "use_duration", "record_time")
    .filter($"uid".isNotNull and $"uid" =!= "" // 登录的用户才算有效
    and $"use_duration" > 20 // 达到一定的使用时长(second)
    and datediff(current_date(), $"record_time") < 90 // 过去一段时间的数据(day)
    and datediff(current_date(), $"record_time") >= 0) // 过滤掉未来的数据(day)
  2. 计算RFM

    df.groupBy("uid").agg(
    sum("use_duration") as "M",
    count("uid") as "F",
    datediff(current_date(), max("record_time")) as "R"
    )

如何选取RFM阈值

聚类

由于RFM数据的量纲各不相同,数据的取值也存在很大的差异。为了消除分布差异较大和量纲不同的影响,在对各个指标进行加权之前,需要考虑对数据进行标准化处理。

一般情况中,F、M指标对顾客价值存在正相关的影响,因此其标准化调整通过 x=(xxs)xlxsx'=\frac{(x-x^s)}{x^l-x^s},其中xx'为标准化后的值,xx为原值,xsx^s为该指标最小值,xlx^l为该指标最大值。R对顾客价值存在负相关关系,因此其标准化调整公式为:x=(xlx)xlxsx'=\frac{(x^l-x)}{x^l-x^s}

在聚类的过程中,需要预先判断其聚类的类别数。在模型中客户分类通过每个顾客类别RFM平均值与总RFM平均值相比较来决定的,而单个指标的比较只能有两种情况:大于(等于)或小于平均值,因此可能有8种类别。

实现

  1. 将 RFM 三值标准化(Normalize)

    val minmax = rfmDF.agg(min("R"), min("F"), min("M"),
    max("R"), max("F"), max("M"))
    // 如果下面的代码报错不能将Long转成Int,那就需要手动改一下。
    val minR = minmax.select("min(R)").head().getInt(0)
    val minF = minmax.select("min(F)").head().getInt(0)
    val minM = minmax.select("min(M)").head().getInt(0)
    val maxR = minmax.select("max(R)").head().getInt(0)
    val maxF = minmax.select("max(F)").head().getInt(0)
    val maxM = minmax.select("max(M)").head().getInt(0)
    val udfNormalizeR = udf((r: Int) => {
    (maxR.toDouble - r.toDouble) / (maxR.toDouble - minR.toDouble)
    })
    val udfNormalizeF = udf((f: Int) => {
    (f.toDouble - minF.toDouble) / (maxF.toDouble - minF.toDouble)
    })
    val udfNormalizeM = udf((m: Int) => {
    (m.toDouble - minM.toDouble) / (maxM.toDouble - minM.toDouble)
    })
    val rfmDF = rfmWithLabelDF.withColumn("normalizedR", udfNormalizeR($"R"))
    .withColumn("normalizedF", udfNormalizeF($"F"))
    .withColumn("normalizedM", udfNormalizeM($"M"))
  2. 将features聚合成vector以参与聚类训练

    VectorAssembler assembler = new VectorAssembler()
    .setInputCols(new String[]{"normalizedR", "normalizedF", "normalizedM"})
    .setOutputCol("featuresAssembled");
    rfmDF = assembler.transform(rfmDF);
  3. 进行数据标准化(Standardize),以提升算法准确度(可选)

    StandardScaler scaler = new StandardScaler()
    // 一下两者选其一,一般使用默认值(Std(Scale to unit standard deviation))就好
    //.setWithMean(true)
    //.setWithStd(true)
    .setInputCol("featuresAssembled")
    .setOutputCol("features");
    StandardScalerModel scalerModel = scaler.fit(rfmDF);
    rfmDF = scalerModel.transform(rfmDF);
  4. 训练

    KMeans kMeans = new KMeans()
    .setK(8)
    .setSeed(666)
    // 还有很多设置可以在这里调整,一般使用默认值就好,比如
    //.setMaxIter()
    //.setTol()
    .setDistanceMeasure(DistanceMeasure.EUCLIDEAN());

    // 训练
    KMeansModel model = kMeans.fit(rfmDF);

    // 保存训练之后的模型
    model.write().overwrite().save("kmeans.model");
  5. 查看一些训练之后的信息,比如cost

    KMeansSummary summary = model.summary();
    logger.info("trainingCost: " + summary.trainingCost());
  6. 使用训练好的模型来对原始数据做一次预测并保存

    // summary.predictions(); 也有同样的作用
    Dataset<Row> predictions = model.transform(rfmDF);
    // 把预测的数据结果保存
    predictions.select("userid", "normalizedM", "normalizedF", "normalizedR", "prediction")
    .write().mode(SaveMode.Overwrite).option("header", true)
    .csv("path/to/save");

统计

通过分位数,观察用户成分的分布情况,提出RFM三个指标分别的阈值。

实现

  1. 计算相关性系数

    private def calcCorr(rfm: Dataset[Row]): Unit = {
    val rfCorr = rfm.stat.corr("R", "F")
    logger.info(s"R F Corr is $rfCorr")
    val rmCorr = rfm.stat.corr("R", "M")
    logger.info(s"R M Corr is $rmCorr")
    val mfCorr = rfm.stat.corr("M", "F")
    logger.info(s"M F Corr is $mfCorr")
    }
    20/03/07 01:38:00 INFO Application$: R F Corr is -0.36390210862365274
    20/03/07 01:38:07 INFO Application$: R M Corr is 0.015190953608016712
    20/03/07 01:38:09 INFO Application$: M F Corr is 0.01806946112236617
  2. 计算分位数

    private def calcQuantiles(rfm: Dataset[Row]): Unit = {
    val cols = Array("R", "F", "M")
    val quantiles = Array(0.25, 0.5, 0.6, 0.75, 0.9, 0.95, 0.99)
    val colMedians = rfm.stat.approxQuantile(cols, quantiles, 0.0d)
    for ((medians, index) <- colMedians.zipWithIndex) {
    logger.info(s"列: ${cols(index)}")
    for ((median, index) <- medians.zipWithIndex) {
    logger.info(s"median of ${quantiles(index)}: $median")
    }
    }
    }

通过以上两步骤,可以在日志里看出数据的分布趋势,好确定RFM各自的阈值,比如设定如下阈值:

val rThreshold = 36
val fThreshold = 25
val mThreshold = 4626
  1. 计算用户评级

    rfmDF.withColumn("label",
    when($"R" < rThreshold and $"F" >= fThreshold and $"M" >= mThreshold, lit("重要保持"))
    .when($"R" >= rThreshold and $"F" >= fThreshold and $"M" >= mThreshold, lit("重要价值"))
    .when($"R" < rThreshold and $"F" < fThreshold and $"M" >= mThreshold, lit("重要挽留"))
    .when($"R" >= rThreshold and $"F" < fThreshold and $"M" >= mThreshold, lit("重要发展"))
    .when($"R" < rThreshold and $"F" >= fThreshold and $"M" < mThreshold, lit("一般保持"))
    .when($"R" >= rThreshold and $"F" >= fThreshold and $"M" < mThreshold, lit("一般价值"))
    .when($"R" < rThreshold and $"F" < fThreshold and $"M" < mThreshold, lit("一般挽留"))
    .when($"R" >= rThreshold and $"F" < fThreshold and $"M" < mThreshold, lit("一般发展"))
    .otherwise(lit("其他"))
  2. 查看每个级别的用户数量

    rfmWithLabelDF.groupBy("label").count().show()
  3. 保存计算出来的结果

    rfmWithLabelDF.write
    .mode(SaveMode.Overwrite)
    .option("header", value = true)
    .csv("path/to/save")

差异

通过聚类(未标准化(Normalize))

通过统计手段

具体要选取哪种要根据业务本身的需求和数据的质量来决定。

更多

在使用聚类或者统计手段给用户分了类别之后,我们面临的新问题就是同一类别的数据有没有优先级?

答案肯定是有的,我们肯定是要对客户簇进行细分。

利用 AHP 法分析(The analytic hierarchy process)得到的 RFM 各指标权重,结合各类顾客的 RFM 指标,根据每一类客户的顾客终身价值得分来进行排序。

在进行客户分类后再对客户的类别进行顾客终身价值排序,使得企业能够量化各类客户的价值的差别,弥补了的客户分类方法的不足。这有助于企业制定更为可行的客户政策。由于受到成本的制约,电信企业不可能采取无差别的个性化服务,企业只能将资源集中在少数几类对企业重要的客户上。按照总得分的排列情况,企业应该优先将资源投放到总得分较高的客户身上。

那如何进行层次分析呢?请参考Spark 大数据入门清单:AHP法分析顾客终身价值得分

总结

参考文献


   转载规则


《Spark 大数据入门清单:RFM方法分析用户评级》 Harbor Zeng 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
Spark 大数据入门清单:AHP法分析顾客终身价值得分 Spark 大数据入门清单:AHP法分析顾客终身价值得分
什么是 AHP 法 层次分析法(The analytic hierarchy process,简称AHP),也称层级分析法。 层次分析法的基本思路与人对一个复杂的决策问题的思维、判断过程大体上是一样的。比如: 买钢笔,一般要依据质量、颜色、实用性、价格、外形等方面的因素选择某一支钢笔。 假期旅游,是去风光秀丽的苏州,还是去迷人的北戴河,或者是去山水甲天下的桂林,那一般会依据景色、费用、食宿条件
下一篇 
Spark 大数据入门清单:Local 模式启动 Spark 应用 Spark 大数据入门清单:Local 模式启动 Spark 应用
local 模式启动 spark 应用的过程,其实非常简单,只需简单的几步就能完成。 前言 在上一篇文章中,我们说到: Local 模式即单机模式,也就是完全体会不到分布式的好处的一种模式。 如果在命令语句中不加任何配置,则默认是 Local 模式,在本地运行。 这也是最简单的一种模式,所有的 Spark 进程都运行在一台机器或一个虚拟机上面。 那么本章,我们一起来看看,如何搭建 Local
  目录