create SparkSession¶

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("market_basket_assoc").getOrCreate() 
spark
Out[1]:

SparkSession - in-memory

SparkContext (Spark UI)

Version: v3.4.0
Master: local[*]
AppName: market_basket_assoc

util functions and display settings¶

In [2]:
####

from datetime import datetime
from pytz import timezone
####
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
####

def print_now():
    # print the current time, converted to Singapore local time
    fmt = "%Y-%m-%d %H:%M:%S %Z%z"
    now_utc = datetime.now(timezone('UTC'))
    SGT = timezone('Asia/Singapore')
    now_local = now_utc.astimezone(SGT)
    print(now_local.strftime(fmt))

    
def load_csv(name):
    path = "./"
    # read the csv from the working directory, inferring the schema from the data
    pnl = spark.read.csv(path+name,inferSchema=True,header=True)
    return pnl

def show_stats(df,input_columns,dependent_var):
    print('just some stats on distributions')
    df.groupBy(dependent_var).count().show()
    df.groupBy(input_columns).count().show()
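For reference, a hypothetical call to show_stats (the column choice is illustrative, taken from the pnl dataframes loaded below):

show_stats(pnl_v1, ["fx_cut"], "product")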
In [3]:
import pandas as pd
pd.options.display.max_rows = None
pd.options.display.max_columns = None 
pd.options.display.max_colwidth = None
pd.options.display.float_format = lambda x: '{:,.2f}'.format(x)
In [4]:
print_now()
2023-05-10 22:15:57 +08+0800

loading the two csv files¶

In [5]:
#rerunnable
print_now()
EOD = "2023-05-06"
fname_v1 = "fake-pnl-report." + EOD + ".csv"
fname_v2 = "fake-pnl-report." + EOD + ".fabricated.csv"
pnl_v1 = load_csv(fname_v1)
pnl_v2 = load_csv(fname_v2)
2023-05-10 22:15:57 +08+0800
In [6]:
pnl_v1.limit(3).toPandas()  #pnl_v1.show(3)
Out[6]:
id trade_date exp_date trader trade_ccy product disc_curve fx_cut market PnL_ccy delim uPnL rPnL PnL
0 0 2023-05-04 2023-06-09 Shannon Byrd USDALL fx swap USDALL-GE TKY1500 GE USD : -7,408,923.84 -1,466,436.87 -8,875,360.71
1 1 2023-05-14 2023-06-25 Ashley Wilson USDBWP fx spot USDBWP-GM TKY1500 GM USD : -5,226,084.90 -8,085,982.90 -13,312,067.80
2 2 2023-04-29 2023-06-02 Jasmine Baker USDFJD fx spot USDFJD-AG NYC1000 AG USD : 6,394,191.51 819,916.38 7,214,107.89
In [7]:
pnl_v2.limit(3).toPandas()  #pnl_v2.show(3)
Out[7]:
id trade_date exp_date trader trade_ccy product disc_curve fx_cut market PnL_ccy delim uPnL rPnL PnL trade_date_repdate_diff exp_date_repdate_diff
0 0 2023-05-04 2023-06-09 Shannon Byrd USDALL fx swap USDALL-GE TKY1500 GE USD : -7,408,923.84 -1,466,436.87 -8,875,360.71 2 -34
1 1 2023-05-14 2023-06-25 Ashley Wilson USDBWP fx spot USDBWP-GM TKY1500 GM USD : -5,226,084.90 -8,085,982.90 -13,312,067.80 -8 -50
2 2 2023-04-29 2023-06-02 Jasmine Baker USDFJD fx spot USDFJD-AG NYC1000 AG USD : 6,394,191.51 819,916.38 7,214,107.89 7 -27
In [8]:
# rerunnable.
# renaming the RIGHTMOST fields of the two dataframes so that they are ready for the join.
dx = pnl_v1.columns.index('delim')+1
join_cols = pnl_v1.columns[:dx]
assert join_cols[-1] == 'delim'
# renaming some cols:
pnl_v1r = pnl_v1
pnl_v2r = pnl_v2
for c in pnl_v1.columns[dx:]:
    pnl_v1r = pnl_v1r.withColumnRenamed(c,c+"_v1")
    pnl_v2r = pnl_v2r.withColumnRenamed(c,c+"_v2")
# pnl_v1r.show(3): the rightmost columns will have a name _v1
# pnl_v2r.show(3): the rightmost columns will have a name _v2
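A quick sanity check on the renaming (the expected suffixes can be read off the joined table shown further below):

print(pnl_v1r.columns[-3:])  # ['uPnL_v1', 'rPnL_v1', 'PnL_v1']
print(pnl_v2r.columns[-5:])  # ['uPnL_v2', 'rPnL_v2', 'PnL_v2', 'trade_date_repdate_diff', 'exp_date_repdate_diff']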
In [9]:
use_hash = False
# when True, the join key is a single md5 hash column, so the original join columns appear twice (once per side) in the joined df
In [10]:
# rerunnable.
# I compute the md5 of all the joinable columns, cast to string and concatenated with '#'
if use_hash:
    key_cols = [col(c).cast('string') for c in join_cols]
    pnl_v1rh = pnl_v1r.withColumn('hashcolname', md5(concat_ws('#', *key_cols)))
    pnl_v2rh = pnl_v2r.withColumn('hashcolname', md5(concat_ws('#', *key_cols)))
else:
    pnl_v1rh = pnl_v1r
    pnl_v2rh = pnl_v2r
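When the hashed variant is enabled, it is worth verifying that the md5 key is unique per row before joining on it. A minimal sketch (only meaningful when use_hash is True):

if use_hash:
    from pyspark.sql.functions import countDistinct
    n_rows = pnl_v1rh.count()
    n_keys = pnl_v1rh.select(countDistinct('hashcolname')).first()[0]
    assert n_rows == n_keys, "duplicate join keys or md5 collisions"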

building the join table and flagging the differences in the dependent variables¶

i.e. the columns on the right¶

In [11]:
# rerunnable.
if use_hash:
    pnl_v12 = pnl_v1rh.join(other=pnl_v2rh,on='hashcolname',how='fullouter') # we assume inner and fullouter return the same result here
else:
    pnl_v12 = pnl_v1rh.join(other=pnl_v2rh,on=join_cols,how='fullouter') # we assume inner and fullouter return the same result here

pop_rc = pnl_v12.count()
print('pop_rc:',pop_rc)
pnl_v12.limit(3).toPandas()
pop_rc: 500000
Out[11]:
id trade_date exp_date trader trade_ccy product disc_curve fx_cut market PnL_ccy delim uPnL_v1 rPnL_v1 PnL_v1 uPnL_v2 rPnL_v2 PnL_v2 trade_date_repdate_diff exp_date_repdate_diff
0 5 2023-05-13 2023-05-14 Jennifer Jones USDZMW fx fwd USDZMW-GN NYC1000 GN USD : -9,869,409.75 -3,996,865.59 -13,866,275.34 -9,869,409.75 -3,996,865.59 -13,866,275.34 -7 -8
1 6 2023-05-11 2023-06-29 Rebecca Flores USDMNT fx spot USDMNT-GD NYC1000 GD USD : -996,809.37 -8,099,321.43 -9,096,130.80 -996,809.37 -8,099,321.43 -9,096,130.80 -5 -54
2 8 2023-05-02 2023-05-08 John Franklin USDLSL fx futures USDLSL-TH TKY1500 TH USD : 6,089,738.13 2,395,105.79 8,484,843.92 6,089,738.13 2,395,105.79 8,484,843.92 4 -2
In [12]:
# rerunnable.
pnl_v12b = pnl_v12
for cc in ["uPnL","rPnL","PnL"]:
    print('adding a diff boolean flag for column',cc,'as',cc+"_isdiff")
    pnl_v12b = pnl_v12b.withColumn(cc+"_isdiff", when(pnl_v12b[cc+"_v1"] == pnl_v12b[cc+"_v2"],0).otherwise(1))
    rc = pnl_v12b.filter(pnl_v12b[cc+"_isdiff"]==lit(1)).count()
    print('diff rowcount:',rc,'- min support should be:',rc/pop_rc) 
adding a diff boolean flag for column uPnL as uPnL_isdiff
diff rowcount: 2090 - min support should be: 0.00418
adding a diff boolean flag for column rPnL as rPnL_isdiff
diff rowcount: 756 - min support should be: 0.001512
adding a diff boolean flag for column PnL as PnL_isdiff
diff rowcount: 199265 - min support should be: 0.39853
In [13]:
# rerunnable.
print("adding a signal: exp_date == report_date - 1")
T_plus_n_watch = -1
pnl_v12bS = pnl_v12b.withColumn("is_exp_date_m1", when(pnl_v12b.exp_date_repdate_diff == lit(T_plus_n_watch),1).otherwise(0))
print('count of diff:',pnl_v12bS.filter(pnl_v12bS["is_exp_date_m1"]==lit(1)).count())
adding a signal: exp_date == report_date - 1
count of diff: 6073
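Before dropping the helper columns, a contingency table shows how the new signal overlaps with the diff flags. A sketch using DataFrame.crosstab (with this fabricated data, all 2,090 uPnL diffs should land in the is_exp_date_m1=1 column, since the fabrication rule required expiry at T-1):

pnl_v12bS.crosstab('uPnL_isdiff', 'is_exp_date_m1').show()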
In [14]:
# rerunnable.
# disposable columns...
pnl_v12bS = pnl_v12bS.drop("trade_date_repdate_diff")
pnl_v12bS = pnl_v12bS.drop("exp_date_repdate_diff")

the table ready for the rule mining:¶

In [15]:
# pnl_v12bS.printSchema()
# pnl_v12bS.columns
In [16]:
pnl_v12bS.limit(3).toPandas()
Out[16]:
id trade_date exp_date trader trade_ccy product disc_curve fx_cut market PnL_ccy delim uPnL_v1 rPnL_v1 PnL_v1 uPnL_v2 rPnL_v2 PnL_v2 uPnL_isdiff rPnL_isdiff PnL_isdiff is_exp_date_m1
0 5 2023-05-13 2023-05-14 Jennifer Jones USDZMW fx fwd USDZMW-GN NYC1000 GN USD : -9,869,409.75 -3,996,865.59 -13,866,275.34 -9,869,409.75 -3,996,865.59 -13,866,275.34 0 0 0 0
1 6 2023-05-11 2023-06-29 Rebecca Flores USDMNT fx spot USDMNT-GD NYC1000 GD USD : -996,809.37 -8,099,321.43 -9,096,130.80 -996,809.37 -8,099,321.43 -9,096,130.80 0 0 1 0
2 8 2023-05-02 2023-05-08 John Franklin USDLSL fx futures USDLSL-TH TKY1500 TH USD : 6,089,738.13 2,395,105.79 8,484,843.92 6,089,738.13 2,395,105.79 8,484,843.92 0 0 0 0

start of rule-mining¶

In [17]:
from pyspark.sql.functions import concat
In [18]:
def get_basket(pnl,rule_antecedent_cols,rule_consequent_cols):
    # https://stackoverflow.com/questions/51325092/pyspark-fp-growth-algorithm-raise-valueerrorparams-must-be-either-a-param
    # FPGrowth requires the items of a transaction to be unique, so a raw value such as 0
    # cannot appear more than once in the array. Hence:
    rule_ac = rule_antecedent_cols + rule_consequent_cols
    for rulecol in rule_ac:
        # every value in every column in rule_ac is prefixed with its own column name and "=".
        pnl = pnl.withColumn(rulecol, concat(lit(rulecol+"="),col(rulecol)))

    return pnl.select('id',array(rule_ac).alias("items"))
In [19]:
def get_itempopularity(model,rule_consequent_cols):
    assert len(rule_consequent_cols)==1
    watch_conseq = rule_consequent_cols[0] + "=1" # e.g. 'rPnL_isdiff=1'
    print('watch_conseq:',watch_conseq)
    watch_alias = 'is_conseq_in'

    itempopularity = model.freqItemsets
    # ... FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead
    # ... not under my control

    itempopularity.createOrReplaceTempView("itempopularity")
    # then query the temp view, most frequent itemsets first
    dfo = spark.sql("SELECT * FROM itempopularity ORDER BY freq DESC")
    # flag the itemsets that contain the watched consequent
    dofd = dfo.select('items','freq',size(dfo.items).alias('size'),array_contains(dfo.items, lit(watch_conseq)).alias(watch_alias))
    return dofd
In [20]:
def get_relevantbasketpopularity(ipdf):
    watch_alias = 'is_conseq_in'
    return ipdf.where('size >= 2 and '+ watch_alias) # do not include the baskets with a single item
In [21]:
def show_assoc(model,display,rule_consequent_cols):
    watch_conseq = rule_consequent_cols[0] + "=1" # e.g. 'uPnL_isdiff=1'
    print('watching conseq:',watch_conseq)
    colz_alias = 'diff_included_in_C'

    assoc = model.associationRules
    assoc.createOrReplaceTempView("assoc")
    # then query the temp view, highest confidence first
    df2a = spark.sql("SELECT * FROM assoc ORDER BY confidence DESC")
    df2b = df2a.select('antecedent','consequent','confidence','lift','support', \
             size(df2a.antecedent).alias('lenA'),size(df2a.consequent).alias('lenC'), \
             array_contains(df2a.consequent,watch_conseq).alias(colz_alias))
    # keep only the rules whose consequent contains the watched item
    dofdx2 = df2b.where(colz_alias)

    if display == "all,confid desc":
        return df2a
    if display == "all+criteria":
        return df2b
    if display == "top assoc":
        # best lift first, shortest (most general) antecedent first
        return dofdx2.orderBy(col("lift").desc(),col("lenA").asc())
    if display == "top assoc for csv output":
        # flatten the array columns to comma-separated strings so the df can be written as csv
        dofdx2noa = dofdx2.withColumn("antecedent", concat_ws(",",col("antecedent")))
        dofdx2noa = dofdx2noa.withColumn("consequent", concat_ws(",",col("consequent")))
        return dofdx2noa
        # alternative: dofdx2noa.coalesce(1).write.option("header",True).mode('overwrite').csv(fname_v1+".assoc.csv")
        # thanks https://stackoverflow.com/questions/43661660/spark-how-to-write-a-single-csv-file-without-folder

setting the consequent¶

In [22]:
minSupport_ = 0.0012
minConfidence_ = 0.9
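Both thresholds are hand-picked. minSupport_ must sit below the support of the rarest consequent of interest (0.001512 for rPnL_isdiff=1, as printed above), or FPGrowth will never surface it. A sketch that derives the threshold instead of hard-coding it (the 0.8 headroom factor is an arbitrary assumption):

rarest_rc = pnl_v12bS.filter(col("rPnL_isdiff") == 1).count()  # 756, as printed above
minSupport_ = round(0.8 * rarest_rc / pop_rc, 4)               # 0.8 * 756/500000 -> 0.0012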

consequent 1: rPnL¶

In [23]:
# picking the columns for the rule-mining.
rule_antecedent_cols = [ "trade_ccy","product","disc_curve","fx_cut","market","PnL_ccy","is_exp_date_m1" ]
rule_consequent_cols = ["rPnL_isdiff"]
#rule_consequent_cols = ["uPnL_isdiff"]  # or ["PnL_isdiff"]

# ac = antecedent and consequent

# goal: build a dataframe (pnlb, below) with an "items" column made of the fields above
In [24]:
pnlb = get_basket(pnl_v12bS,rule_antecedent_cols,rule_consequent_cols)
pnlb.limit(3).toPandas()
Out[24]:
id items
0 5 [trade_ccy=USDZMW, product=fx fwd, disc_curve=USDZMW-GN, fx_cut=NYC1000, market=GN, PnL_ccy=USD, is_exp_date_m1=0, rPnL_isdiff=0]
1 6 [trade_ccy=USDMNT, product=fx spot, disc_curve=USDMNT-GD, fx_cut=NYC1000, market=GD, PnL_ccy=USD, is_exp_date_m1=0, rPnL_isdiff=0]
2 8 [trade_ccy=USDLSL, product=fx futures, disc_curve=USDLSL-TH, fx_cut=TKY1500, market=TH, PnL_ccy=USD, is_exp_date_m1=0, rPnL_isdiff=0]
In [25]:
# above are the "market baskets", each with a number of items. Let's mine antecedent/consequent rules.
In [26]:
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html (latest = 3.4.0)
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html
import time
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="items", minSupport=minSupport_, minConfidence=minConfidence_) # minSupport=0.2, minConfidence=0.1
model = fpGrowth.fit(pnlb)

# now you have the model.
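The fitted model can also be persisted and reloaded, avoiding a refit when only the reporting changes. A sketch (the path is illustrative):

model.write().overwrite().save("./fpgrowth_model_rPnL")
# later, or in another session:
from pyspark.ml.fpm import FPGrowthModel
model = FPGrowthModel.load("./fpgrowth_model_rPnL")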

by the way - see items' popularity¶

In [27]:
epoch_a = time.time()
ipdf = get_itempopularity(model,rule_consequent_cols)
print('df rowcount:',ipdf.count())
epoch_z = time.time()
#print('elapsed:',round(epoch_z - epoch_a,1),'s')
print("elapsed: %.1f s" % (epoch_z - epoch_a))
ipdf.limit(5).toPandas()    
watch_conseq: rPnL_isdiff=1
df rowcount: 22305
elapsed: 22.0 s
Out[27]:
items freq size is_conseq_in
0 [rPnL_isdiff=0] 499244 1 False
1 [is_exp_date_m1=0] 493927 1 False
2 [is_exp_date_m1=0, rPnL_isdiff=0] 493177 2 False
3 [PnL_ccy=USD] 484878 1 False
4 [PnL_ccy=USD, rPnL_isdiff=0] 484878 2 False

by the way - see relevant baskets popularity¶

In [28]:
dofdx = get_relevantbasketpopularity(ipdf)
dofdx.limit(10).toPandas()
Out[28]:
items freq size is_conseq_in
0 [rPnL_isdiff=1, PnL_ccy=AUD] 756 2 True
1 [rPnL_isdiff=1, PnL_ccy=AUD, product=fx fwd] 756 3 True
2 [rPnL_isdiff=1, PnL_ccy=AUD, trade_ccy=AUDUSD] 756 3 True
3 [rPnL_isdiff=1, PnL_ccy=AUD, trade_ccy=AUDUSD, product=fx fwd] 756 4 True
4 [rPnL_isdiff=1, product=fx fwd] 756 2 True
5 [rPnL_isdiff=1, trade_ccy=AUDUSD] 756 2 True
6 [rPnL_isdiff=1, trade_ccy=AUDUSD, product=fx fwd] 756 3 True
7 [rPnL_isdiff=1, is_exp_date_m1=0] 750 2 True
8 [rPnL_isdiff=1, PnL_ccy=AUD, is_exp_date_m1=0] 750 3 True
9 [rPnL_isdiff=1, PnL_ccy=AUD, product=fx fwd, is_exp_date_m1=0] 750 4 True
In [29]:
if False:
    df2a = show_assoc(model,"all,confid desc",rule_consequent_cols)
    print("Top 20:")
    df2a.limit(20).toPandas()
In [30]:
if False:
    df2b = show_assoc(model,"all+criteria",rule_consequent_cols)
    df2b.limit(20).toPandas()

show antec -> conseq order by lift DESC, len(antecs) ASC¶

In [31]:
dofdx2 = show_assoc(model,"top assoc",rule_consequent_cols)
dofdx2.limit(20).toPandas()
watching conseq: rPnL_isdiff=1
Out[31]:
antecedent consequent confidence lift support lenA lenC diff_included_in_C
0 [PnL_ccy=AUD, product=fx fwd] [rPnL_isdiff=1] 1.00 661.38 0.00 2 1 True
1 [trade_ccy=AUDUSD, product=fx fwd] [rPnL_isdiff=1] 1.00 661.38 0.00 2 1 True
2 [trade_ccy=AUDUSD, product=fx fwd, is_exp_date_m1=0] [rPnL_isdiff=1] 1.00 661.38 0.00 3 1 True
3 [PnL_ccy=AUD, product=fx fwd, is_exp_date_m1=0] [rPnL_isdiff=1] 1.00 661.38 0.00 3 1 True
4 [PnL_ccy=AUD, trade_ccy=AUDUSD, product=fx fwd] [rPnL_isdiff=1] 1.00 661.38 0.00 3 1 True
5 [PnL_ccy=AUD, trade_ccy=AUDUSD, product=fx fwd, is_exp_date_m1=0] [rPnL_isdiff=1] 1.00 661.38 0.00 4 1 True

comments:¶

  • The 2nd implied rule (row 1) is exactly the rule that was applied to introduce the fabricated differences.
  • The 1st implied rule (row 0) is a byproduct of the 2nd: in our input, trade_ccy=AUDUSD ⇒ PnL_ccy=AUD, so both antecedents select the same rows.

saving to csv¶

In [32]:
dofdx2o = show_assoc(model,"top assoc for csv output",rule_consequent_cols)
dofdx2o.toPandas().to_csv(fname_v1+".output.conseq-"+rule_consequent_cols[0]+".assoc.csv")
watching conseq: rPnL_isdiff=1
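The toPandas() round-trip collects the whole result on the driver; for bigger outputs, the alternative already sketched inside show_assoc writes the csv straight from Spark (note that Spark produces a directory containing a single part-file):

dofdx2o.coalesce(1).write.option("header", True).mode('overwrite') \
    .csv(fname_v1 + ".output.conseq-" + rule_consequent_cols[0] + ".assoc.csv")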
In [33]:
## .. the most recurring combinations of antecedent values for the consequent
In [34]:
print_now()
2023-05-10 22:20:04 +08+0800

consequent 2: uPnL¶

In [35]:
# picking the columns for the rule-mining.
rule_antecedent_cols = [ "trade_ccy","product","disc_curve","fx_cut","market","PnL_ccy","is_exp_date_m1" ]
rule_consequent_cols = ["uPnL_isdiff"]
#rule_consequent_cols = ["rPnL_isdiff"]  # or ["PnL_isdiff"]

# ac = antecedent and consequent

# goal: build a dataframe (pnlb, below) with an "items" column made of the fields above
In [36]:
pnlb = get_basket(pnl_v12bS,rule_antecedent_cols,rule_consequent_cols)
pnlb.limit(3).toPandas()
Out[36]:
id items
0 5 [trade_ccy=USDZMW, product=fx fwd, disc_curve=USDZMW-GN, fx_cut=NYC1000, market=GN, PnL_ccy=USD, is_exp_date_m1=0, uPnL_isdiff=0]
1 6 [trade_ccy=USDMNT, product=fx spot, disc_curve=USDMNT-GD, fx_cut=NYC1000, market=GD, PnL_ccy=USD, is_exp_date_m1=0, uPnL_isdiff=0]
2 8 [trade_ccy=USDLSL, product=fx futures, disc_curve=USDLSL-TH, fx_cut=TKY1500, market=TH, PnL_ccy=USD, is_exp_date_m1=0, uPnL_isdiff=0]
In [37]:
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html (latest = 3.4.0)
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html
import time
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="items", minSupport=minSupport_, minConfidence=minConfidence_) # minSupport=0.2, minConfidence=0.1
model = fpGrowth.fit(pnlb)

# now you have the model.

show antec -> conseq order by lift DESC, len(antecs) ASC¶

In [38]:
dofdx2 = show_assoc(model,"top assoc",rule_consequent_cols)
dofdx2.limit(20).toPandas()
watching conseq: uPnL_isdiff=1
Out[38]:
antecedent consequent confidence lift support lenA lenC diff_included_in_C
0 [is_exp_date_m1=1, fx_cut=TKY1500] [uPnL_isdiff=1] 1.00 239.23 0.00 2 1 True
1 [is_exp_date_m1=1, fx_cut=TKY1500, PnL_ccy=USD] [uPnL_isdiff=1] 1.00 239.23 0.00 3 1 True

comments:¶

  • The 1st implied rule (row 0) is exactly the rule that was applied to introduce the fabricated differences.
  • The 2nd implied rule (row 1) is less general than the 1st: it adds PnL_ccy=USD to the same antecedent.

saving to csv¶

In [39]:
dofdx2o = show_assoc(model,"top assoc for csv output",rule_consequent_cols)
dofdx2o.toPandas().to_csv(fname_v1+".output.conseq-"+rule_consequent_cols[0]+".assoc.csv")
watching conseq: uPnL_isdiff=1
In [40]:
print_now()
2023-05-10 22:21:31 +08+0800

reminder on the rules applied to csv v1 to obtain csv v2:¶

  • fabricated diff 1 was applied to field rPnL when: trade_ccy=AUDUSD & product=fx fwd
  • fabricated diff 2 was applied to field uPnL when: fx_cut=TKY1500 & exp_date=T-1 (rules 1 and 2 are cross-checked in the sketch below)
  • fabricated diff 3 was applied to field PnL as the sum of rPnL and uPnL
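The first two rules can be cross-checked directly against the diff flags; a sketch, with the expected counts taken from the outputs above:

# rule 1: every AUDUSD fx fwd row should be flagged on rPnL, and nothing else
print(pnl_v12bS.filter((col("trade_ccy") == "AUDUSD") & (col("product") == "fx fwd")).count())  # expect 756
print(pnl_v12bS.filter(col("rPnL_isdiff") == 1).count())                                        # 756
# rule 2: every TKY1500 row expiring at T-1 should be flagged on uPnL, and nothing else
print(pnl_v12bS.filter((col("fx_cut") == "TKY1500") & (col("is_exp_date_m1") == 1)).count())    # expect 2090
print(pnl_v12bS.filter(col("uPnL_isdiff") == 1).count())                                        # 2090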