RFM Analysis for Customer Segmentation in Python on Azure Databricks

Irakli DD
Oct 28, 2022

The idea of RFM analysis is to segment customers based on when their last purchase was, how often they’ve purchased in the past, and how much they have spent overall.
After the RFM analysis we can draw conclusions: for example, if a customer has spent a lot of money and used to buy often, but recently hasn't bought anything, we can assume that something put them off and make them a special offer.
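Before the Spark version below, here is a minimal pure-Python sketch of the same idea, using made-up transactions (all customer IDs and amounts are hypothetical):

from datetime import date

# hypothetical transactions: (customer_id, purchase_date, amount)
transactions = [
    (1, date(2022, 10, 13), 50.0),
    (1, date(2022, 10, 12), 20.0),
    (2, date(2022, 6, 1), 500.0),
]
today = date(2022, 10, 13)

# recency (days since last purchase), frequency (purchase count), monetary (total spend)
rfm = {}
for cust, day, amount in transactions:
    recency, freq, monetary = rfm.get(cust, ((today - day).days, 0, 0.0))
    rfm[cust] = (min(recency, (today - day).days), freq + 1, monetary + amount)

print(rfm)  # {1: (0, 2, 70.0), 2: (134, 1, 500.0)}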

Azure Databricks uses Apache Spark Structured Streaming to work with streaming data and incremental data changes.
So anything we can implement in PySpark, we can also use in Databricks.

RFM stands for recency, frequency, and monetary value.
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as f

appName = "RFM - Analysis"
master = "local"

# create the configuration and the session
conf = SparkConf()
spark = SparkSession.builder \
    .config(conf=conf) \
    .master(master) \
    .appName(appName) \
    .getOrCreate()
sc = spark.sparkContext
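One note: in an Azure Databricks notebook a SparkSession already exists, so the setup above matters mainly when running PySpark locally. In a notebook you can use the predefined handles directly:

# `spark`, `sc` and `dbutils` are predefined in Databricks notebooks
print(spark.version)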
# Load historical data from 0.csv into df_0
df_0 = spark.read.option("header", True)\
    .schema("DATE_ string, COSTUMER_ID bigint, EXP_AMOUNT double")\
    .csv("/FileStore/tables/0.csv")

# Load the new, current day's data from 13_10_2022.csv into df_1
df_1 = spark.read.option("header", True)\
    .schema("DATE_ string, COSTUMER_ID bigint, EXP_AMOUNT double")\
    .csv("/FileStore/tables/13_10_2022.csv")

# and combine them
df = df_0.unionAll(df_1)
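If you don't have these CSV files at hand, you can follow along with a small in-memory DataFrame of the same shape (the rows here are hypothetical; the dates use the M/d/y format expected below):

# hypothetical sample rows matching the CSV schema
df = spark.createDataFrame(
    [("10/13/2022", 1, 50.0), ("10/12/2022", 1, 20.0), ("6/1/2022", 2, 500.0)],
    schema="DATE_ string, COSTUMER_ID bigint, EXP_AMOUNT double"
)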
# Create a new DataFrame with an extra column, SpinDate_,
# converting the DATE_ string to a proper date
df_new = df.withColumn("SpinDate_", f.to_date(f.col("DATE_"), "M/d/y"))\
    .select('SpinDate_', 'COSTUMER_ID', 'EXP_AMOUNT')
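A quick way to sanity-check the "M/d/y" pattern is to parse a literal value (the date here is just my example):

spark.range(1).select(f.to_date(f.lit("10/13/2022"), "M/d/y").alias("d")).show()
# -> 2022-10-13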
# create new DataFrame df_col with new columns:
# "all_amount", the "last_week" amount and the "last_day" amount,
# using SQL case expressions
df_col = df_new.select(
    'COSTUMER_ID',
    f.col('EXP_AMOUNT').alias("all_amount"),
    f.expr('case when SpinDate_ > "2022-10-06" and SpinDate_ <= "2022-10-13" then EXP_AMOUNT else 0 end as last_week'),
    f.expr('case when SpinDate_ = "2022-10-13" then EXP_AMOUNT else 0 end as last_day'),
)
# Aggregate the DataFrame by COSTUMER_ID and summarize
df_agg = df_col.groupBy('COSTUMER_ID').agg(
    f.sum('all_amount').alias('all_amount'),
    f.sum('last_week').alias('last_week'),
    f.sum('last_day').alias('last_day'),
    f.count('COSTUMER_ID').alias('frequency')
)
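At this point df_agg holds one row per customer. Its schema follows from the aggregates (double for the sums, long for the count), so printSchema() should show roughly:

df_agg.printSchema()
# root
#  |-- COSTUMER_ID: long (nullable = true)
#  |-- all_amount: double (nullable = true)
#  |-- last_week: double (nullable = true)
#  |-- last_day: double (nullable = true)
#  |-- frequency: long (nullable = false)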
# Describe with percentiles; these quartiles drive the scoring below
df_score = df_agg.select('all_amount', 'last_week', 'last_day', 'frequency')\
    .summary("25%", "50%", "75%").collect()
df_agg.summary().show()
# percentiles (how to use)
# 25%
df_score[0][1] # all_amount
df_score[0][2] # last_week
df_score[0][3] # last_day
df_score[0][4] # frequency
# 50%
df_score[1][1] # all_amount
df_score[1][2] # last_week
df_score[1][3] # last_day
df_score[1][4] # frequency
# 75%
df_score[2][1] # all_amount
df_score[2][2] # last_week
df_score[2][3] # last_day
df_score[2][4] # frequency
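Since summary() returns its values as strings, it is worth knowing DataFrame.approxQuantile as an alternative: it returns the quartiles as plain floats in a single call. A sketch, not what the article uses:

# one [25%, 50%, 75%] list of floats per metric; relativeError=0.0 means exact
quartiles = {
    m: df_agg.approxQuantile(m, [0.25, 0.5, 0.75], 0.0)
    for m in ['all_amount', 'last_week', 'last_day', 'frequency']
}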
df_agg.show(3)
# set the coefficient (0-25%: 1, 25-50%: 2, 50-75%: 3, 75-100%: 4)
df_agg_K = \
df_agg.select('*')\
.withColumn(
"all_amount_K", f.when( df_agg.all_amount > df_score[2][1], 4)
.when( df_agg.all_amount > df_score[1][1], 3)
.when( df_agg.all_amount > df_score[0][1], 2)
.otherwise(1)
)\
.withColumn(
"last_week_K", f.when( df_agg.last_week > df_score[2][2], 4)
.when( df_agg.last_week > df_score[1][2], 3)
.when( df_agg.last_week > df_score[0][2], 2)
.otherwise(1)
)\
.withColumn(
"last_day_K", f.when( df_agg.last_day > df_score[2][3], 4)
.when( df_agg.last_day > df_score[1][3], 3)
.when( df_agg.last_day > df_score[0][3], 2)
.otherwise(1)
)\
.withColumn(
"frequency_K", f.when( df_agg.frequency > df_score[2][4], 4)
.when( df_agg.frequency > df_score[1][4], 3)
.when( df_agg.frequency > df_score[0][4], 2)
.otherwise(1)
)
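The four withColumn blocks differ only in the column name and the indices into df_score, so the same scoring can be generated in a loop. This sketch is equivalent to the code above; the explicit float() casts are there because summary() returns its values as strings:

metrics = ['all_amount', 'last_week', 'last_day', 'frequency']
df_agg_K = df_agg
for i, m in enumerate(metrics, start=1):  # column 0 of df_score is the "summary" label
    q25, q50, q75 = (float(df_score[r][i]) for r in range(3))
    df_agg_K = df_agg_K.withColumn(
        m + "_K",
        f.when(f.col(m) > q75, 4)
         .when(f.col(m) > q50, 3)
         .when(f.col(m) > q25, 2)
         .otherwise(1)
    )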
df_agg_K.show(3)
# Calculate the RFMScore (a conditional designation)
df_agg_K = df_agg_K.withColumn('RFMScore',
f.concat(
f.col('all_amount_K'), f.col('last_week_K'), f.col('last_day_K'), f.col('frequency_K')
))
# Average each metric per RFMScore segment
df_summ = df_agg_K.groupBy('RFMScore')\
.agg({'all_amount':'mean', 'last_week': 'mean', 'last_day': 'mean', 'frequency': 'mean'})\
.sort(f.col('RFMScore'))
df_summ.show()

So, in this output of df_agg_K.show(3), the first customer, with RFM score 4114, has:

4 - a high total amount spent (above the 75th percentile),
1 - a low amount spent in the last week (below the 25th percentile),
1 - a low amount spent on the last day,
4 - very frequent purchases.
Therefore, we can assume that this customer is one of the most active and highest-spending customers in our market, but has not been active for the last week.
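To act on the example from the introduction (a big spender who has gone quiet), we can filter for exactly that pattern. The cutoffs below are my choice of segment for illustration, not something fixed by the method:

# big total spend and high frequency, but nothing bought in the last week
target = df_agg_K.filter(
    (f.col('all_amount_K') == 4) &
    (f.col('frequency_K') == 4) &
    (f.col('last_week_K') == 1)
)
target.show()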
