import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("market_basket_assoc").getOrCreate()
spark
####
from datetime import datetime
from pytz import timezone
####
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
####
def print_now():
    # print the current time in the Asia/Singapore timezone
    fmt = "%Y-%m-%d %H:%M:%S %Z%z"
    now_utc = datetime.now(timezone('UTC'))
    SGT = timezone('Asia/Singapore')
    now_local = now_utc.astimezone(SGT)
    print(now_local.strftime(fmt))
def load_csv(name):
    path = "./"
    # read a csv PnL report, with header row and inferred schema
    pnl = spark.read.csv(path+name, inferSchema=True, header=True)
    return pnl
def show_stats(df, input_columns, dependent_var):
    # quick look at the value distributions
    print('just some stats on distributions')
    df.groupBy(dependent_var).count().show()
    df.groupBy(input_columns).count().show()
import pandas as pd
pd.options.display.max_rows = None
pd.options.display.max_columns = None
pd.options.display.max_colwidth = None
pd.options.display.float_format = lambda x: '{:,.2f}'.format(x)
print_now()
2023-05-10 22:15:57 +08+0800
#rerunnable
print_now()
EOD = "2023-05-06"
fname_v1 = "fake-pnl-report.2023-05-06.csv"
fname_v2 = "fake-pnl-report.2023-05-06.fabricated.csv"
pnl_v1 = load_csv(fname_v1)
pnl_v2 = load_csv(fname_v2)
2023-05-10 22:15:57 +08+0800
pnl_v1.limit(3).toPandas() #pnl_v1.show(3)
|   | id | trade_date | exp_date | trader | trade_ccy | product | disc_curve | fx_cut | market | PnL_ccy | delim | uPnL | rPnL | PnL |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 2023-05-04 | 2023-06-09 | Shannon Byrd | USDALL | fx swap | USDALL-GE | TKY1500 | GE | USD | : | -7,408,923.84 | -1,466,436.87 | -8,875,360.71 |
| 1 | 1 | 2023-05-14 | 2023-06-25 | Ashley Wilson | USDBWP | fx spot | USDBWP-GM | TKY1500 | GM | USD | : | -5,226,084.90 | -8,085,982.90 | -13,312,067.80 |
| 2 | 2 | 2023-04-29 | 2023-06-02 | Jasmine Baker | USDFJD | fx spot | USDFJD-AG | NYC1000 | AG | USD | : | 6,394,191.51 | 819,916.38 | 7,214,107.89 |
pnl_v2.limit(3).toPandas() #pnl_v2.show(3)
|   | id | trade_date | exp_date | trader | trade_ccy | product | disc_curve | fx_cut | market | PnL_ccy | delim | uPnL | rPnL | PnL | trade_date_repdate_diff | exp_date_repdate_diff |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 2023-05-04 | 2023-06-09 | Shannon Byrd | USDALL | fx swap | USDALL-GE | TKY1500 | GE | USD | : | -7,408,923.84 | -1,466,436.87 | -8,875,360.71 | 2 | -34 |
| 1 | 1 | 2023-05-14 | 2023-06-25 | Ashley Wilson | USDBWP | fx spot | USDBWP-GM | TKY1500 | GM | USD | : | -5,226,084.90 | -8,085,982.90 | -13,312,067.80 | -8 | -50 |
| 2 | 2 | 2023-04-29 | 2023-06-02 | Jasmine Baker | USDFJD | fx spot | USDFJD-AG | NYC1000 | AG | USD | : | 6,394,191.51 | 819,916.38 | 7,214,107.89 | 7 | -27 |
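# Note: pnl_v2 carries two extra engineered columns. Judging by the sample rows against EOD
# (e.g. 2023-05-06 minus trade_date 2023-05-04 = 2 days), *_repdate_diff appears to be the
# report date minus the respective date, in days.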
# rerunnable.
# rename the RIGHTMOST fields of the two dataframes so that they are ready for the join.
dx = pnl_v1.columns.index('delim')+1
join_cols = pnl_v1.columns[:dx]
assert join_cols[-1] == 'delim'
# suffix the measure columns:
pnl_v1r = pnl_v1
pnl_v2r = pnl_v2
for c in pnl_v1.columns[dx:]:
    pnl_v1r = pnl_v1r.withColumnRenamed(c, c+"_v1")
    pnl_v2r = pnl_v2r.withColumnRenamed(c, c+"_v2")
# pnl_v1r.show(3): the rightmost columns now carry a _v1 suffix
# pnl_v2r.show(3): the rightmost columns now carry a _v2 suffix
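# A lightweight sanity check of the rename (column order per the tables shown in this notebook).
# Note the two *_repdate_diff columns of pnl_v2 are not in pnl_v1.columns[dx:], so they keep their names:
assert pnl_v1r.columns[dx:] == ['uPnL_v1', 'rPnL_v1', 'PnL_v1']
assert pnl_v2r.columns[dx:] == ['uPnL_v2', 'rPnL_v2', 'PnL_v2', 'trade_date_repdate_diff', 'exp_date_repdate_diff']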
use_hash = False
# when True, both sides keep their own copies of the join columns, so they all appear twice in the joined df
# rerunnable.
# compute the md5 of the '#'-joined string of all the join column values
if use_hash:
    # cast to string first: the join columns mix types (id is numeric), and concat_ws needs strings
    hash_parts = [col(c).cast('string') for c in join_cols]
    pnl_v1rh = pnl_v1r.withColumn('hashcolname', md5(concat_ws('#', *hash_parts)))
    pnl_v2rh = pnl_v2r.withColumn('hashcolname', md5(concat_ws('#', *hash_parts)))
else:
    pnl_v1rh = pnl_v1r
    pnl_v2rh = pnl_v2r
# rerunnable.
if use_hash:
    pnl_v12 = pnl_v1rh.join(other=pnl_v2rh, on='hashcolname', how='fullouter') # we assume inner and full outer return the same result here
else:
    pnl_v12 = pnl_v1rh.join(other=pnl_v2rh, on=join_cols, how='fullouter') # we assume inner and full outer return the same result here
pop_rc = pnl_v12.count()
print('pop_rc:',pop_rc)
pnl_v12.limit(3).toPandas()
pop_rc: 500000
|   | id | trade_date | exp_date | trader | trade_ccy | product | disc_curve | fx_cut | market | PnL_ccy | delim | uPnL_v1 | rPnL_v1 | PnL_v1 | uPnL_v2 | rPnL_v2 | PnL_v2 | trade_date_repdate_diff | exp_date_repdate_diff |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5 | 2023-05-13 | 2023-05-14 | Jennifer Jones | USDZMW | fx fwd | USDZMW-GN | NYC1000 | GN | USD | : | -9,869,409.75 | -3,996,865.59 | -13,866,275.34 | -9,869,409.75 | -3,996,865.59 | -13,866,275.34 | -7 | -8 |
| 1 | 6 | 2023-05-11 | 2023-06-29 | Rebecca Flores | USDMNT | fx spot | USDMNT-GD | NYC1000 | GD | USD | : | -996,809.37 | -8,099,321.43 | -9,096,130.80 | -996,809.37 | -8,099,321.43 | -9,096,130.80 | -5 | -54 |
| 2 | 8 | 2023-05-02 | 2023-05-08 | John Franklin | USDLSL | fx futures | USDLSL-TH | TKY1500 | TH | USD | : | 6,089,738.13 | 2,395,105.79 | 8,484,843.92 | 6,089,738.13 | 2,395,105.79 | 8,484,843.92 | 4 | -2 |
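# If the join keys line up 1:1, pop_rc should equal each input's rowcount; a larger value
# would signal unmatched keys surviving the full outer join (a hedged check, assuming equal inputs):
# assert pop_rc == pnl_v1.count() == pnl_v2.count()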
# rerunnable.
# add a boolean diff flag per measure column. Note: after a full outer join, a row missing on
# one side leaves nulls in its _v1/_v2 values; null == null evaluates to null in Spark, so
# such rows fall through to otherwise(1) and are flagged as diffs too.
pnl_v12b = pnl_v12
for cc in ["uPnL","rPnL","PnL"]:
    print('adding a diff boolean flag for column',cc,'as',cc+"_isdiff")
    pnl_v12b = pnl_v12b.withColumn(cc+"_isdiff", when(pnl_v12b[cc+"_v1"] == pnl_v12b[cc+"_v2"], 0).otherwise(1))
    rc = pnl_v12b.filter(pnl_v12b[cc+"_isdiff"]==lit(1)).count()
    print('diff rowcount:',rc,'- min support should be:',rc/pop_rc)
adding a diff boolean flag for column uPnL as uPnL_isdiff
diff rowcount: 2090 - min support should be: 0.00418
adding a diff boolean flag for column rPnL as rPnL_isdiff
diff rowcount: 756 - min support should be: 0.001512
adding a diff boolean flag for column PnL as PnL_isdiff
diff rowcount: 199265 - min support should be: 0.39853
# rerunnable.
print("adding a signal: exp_date == report_date - 1")
T_plus_n_watch = -1
pnl_v12bS = pnl_v12b.withColumn("is_exp_date_m1", when(pnl_v12b.exp_date_repdate_diff == lit(T_plus_n_watch),1).otherwise(0))
print('count of diff:',pnl_v12bS.filter(pnl_v12bS["is_exp_date_m1"]==lit(1)).count())
adding a signal: exp_date == report_date - 1
count of diff: 6073
# rerunnable.
# disposable columns...
pnl_v12bS = pnl_v12bS.drop("trade_date_repdate_diff")
pnl_v12bS = pnl_v12bS.drop("exp_date_repdate_diff")
# pnl_v12bS.printSchema()
# pnl_v12bS.columns
pnl_v12bS.limit(3).toPandas()
|   | id | trade_date | exp_date | trader | trade_ccy | product | disc_curve | fx_cut | market | PnL_ccy | delim | uPnL_v1 | rPnL_v1 | PnL_v1 | uPnL_v2 | rPnL_v2 | PnL_v2 | uPnL_isdiff | rPnL_isdiff | PnL_isdiff | is_exp_date_m1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5 | 2023-05-13 | 2023-05-14 | Jennifer Jones | USDZMW | fx fwd | USDZMW-GN | NYC1000 | GN | USD | : | -9,869,409.75 | -3,996,865.59 | -13,866,275.34 | -9,869,409.75 | -3,996,865.59 | -13,866,275.34 | 0 | 0 | 0 | 0 |
| 1 | 6 | 2023-05-11 | 2023-06-29 | Rebecca Flores | USDMNT | fx spot | USDMNT-GD | NYC1000 | GD | USD | : | -996,809.37 | -8,099,321.43 | -9,096,130.80 | -996,809.37 | -8,099,321.43 | -9,096,130.80 | 0 | 0 | 1 | 0 |
| 2 | 8 | 2023-05-02 | 2023-05-08 | John Franklin | USDLSL | fx futures | USDLSL-TH | TKY1500 | TH | USD | : | 6,089,738.13 | 2,395,105.79 | 8,484,843.92 | 6,089,738.13 | 2,395,105.79 | 8,484,843.92 | 0 | 0 | 0 | 0 |
from pyspark.sql.functions import concat
def get_basket(pnl, rule_antecedent_cols, rule_consequent_col):
    # https://stackoverflow.com/questions/51325092/pyspark-fp-growth-algorithm-raise-valueerrorparams-must-be-either-a-param
    # FPGrowth requires the items within a basket to be unique, so a raw value such as 0
    # cannot appear more than once in the array. Hence:
    rule_ac = rule_antecedent_cols + rule_consequent_col
    for rulecol in rule_ac:
        # prefix every value with its own column name and "=", e.g. "is_exp_date_m1=0"
        pnl = pnl.withColumn(rulecol, concat(lit(rulecol+"="), col(rulecol)))
    return pnl.select('id', array(rule_ac).alias("items"))
def get_itempopularity(model, rule_consequent_cols):
    assert len(rule_consequent_cols) == 1
    watch_conseq = rule_consequent_cols[0] + "=1"  # e.g. 'rPnL_isdiff=1'
    print('watch_conseq:', watch_conseq)
    watch_alias = 'is_conseq_in'
    itempopularity = model.freqItemsets
    # createOrReplaceTempView triggers "FutureWarning: Deprecated in 3.0.0. Use
    # SparkSession.builder.getOrCreate() instead" ... not under my control
    itempopularity.createOrReplaceTempView("itempopularity")
    # then query the temp view
    dfo = spark.sql("SELECT * FROM itempopularity ORDER BY freq desc")
    dofd = dfo.select('items', 'freq', size(dfo.items).alias('size'),
                      array_contains(dfo.items, lit(watch_conseq)).alias(watch_alias))
    return dofd
def get_relevantbasketpopularity(ipdf):
    watch_alias = 'is_conseq_in'
    return ipdf.where('size >= 2 and ' + watch_alias)  # exclude the baskets with a single item
def show_assoc(model, display, rule_consequent_cols):
    watch_conseq = rule_consequent_cols[0] + "=1"  # e.g. 'rPnL_isdiff=1'
    print('watching conseq:', watch_conseq)
    colz_alias = 'diff_included_in_C'
    assoc = model.associationRules
    assoc.createOrReplaceTempView("assoc")
    # then query the temp view
    df2a = spark.sql("SELECT * FROM assoc ORDER BY confidence desc")
    df2b = df2a.select('antecedent', 'consequent', 'confidence', 'lift', 'support',
                       size(df2a.antecedent).alias('lenA'), size(df2a.consequent).alias('lenC'),
                       array_contains(df2a.consequent, watch_conseq).alias(colz_alias))
    dofdx2 = df2b.where(colz_alias)
    if display == "all,confid desc":
        return df2a
    if display == "all+criteria":
        return df2b
    if display == "top assoc":
        # rules containing the watched consequent, strongest lift first, shortest antecedent first
        return dofdx2.orderBy(col("lift").desc(), col("lenA").asc())
    if display == "top assoc for csv output":
        # flatten the array columns so the result can be written as csv
        dofdx2noa = dofdx2.withColumn("antecedent", concat_ws(",", col("antecedent")))
        dofdx2noa = dofdx2noa.withColumn("consequent", concat_ws(",", col("consequent")))
        return dofdx2noa
## dofdx2noa.toPandas().to_csv(fname_v1+".output."+rule_consequent_cols[0]+".assoc.csv")
# dofdx2noa.coalesce(1).write.option("header",True).mode('overwrite').csv(fname_v1+".assoc.csv")
# thanks https://stackoverflow.com/questions/43661660/spark-how-to-write-a-single-csv-file-without-folder
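# Note on the two commented output options above: toPandas().to_csv collects the rules to the
# driver and writes one bare file, whereas df.write.csv produces a directory of part files;
# coalesce(1) merely reduces that directory to a single part file. A minimal sketch
# (the output path here is illustrative):
# dofdx2noa.coalesce(1).write.option("header", True).mode("overwrite").csv("assoc_rules_out")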
minSupport_ = 0.0012
minConfidence_ = 0.9
# minSupport_ sits just below the rPnL diff rate printed above (756/500000 = 0.001512),
# so itemsets containing rPnL_isdiff=1 survive the support pruning.
# the dataframe is now ready for rule mining.
rule_antecedent_cols = [ "trade_ccy","product","disc_curve","fx_cut","market","PnL_ccy","is_exp_date_m1" ]
rule_consequent_cols = ["rPnL_isdiff"]
# alternative consequents: "uPnL_isdiff", "PnL_isdiff"
# ac = antecedent and consequent
# goal: build a dataframe with an "items" column holding the fields above
pnlb = get_basket(pnl_v12bS,rule_antecedent_cols,rule_consequent_cols)
pnlb.limit(3).toPandas()
|   | id | items |
|---|---|---|
| 0 | 5 | [trade_ccy=USDZMW, product=fx fwd, disc_curve=USDZMW-GN, fx_cut=NYC1000, market=GN, PnL_ccy=USD, is_exp_date_m1=0, rPnL_isdiff=0] |
| 1 | 6 | [trade_ccy=USDMNT, product=fx spot, disc_curve=USDMNT-GD, fx_cut=NYC1000, market=GD, PnL_ccy=USD, is_exp_date_m1=0, rPnL_isdiff=0] |
| 2 | 8 | [trade_ccy=USDLSL, product=fx futures, disc_curve=USDLSL-TH, fx_cut=TKY1500, market=TH, PnL_ccy=USD, is_exp_date_m1=0, rPnL_isdiff=0] |
# above are the "market baskets", each with a number of items. Let's mine antecedent/consequent rules.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html (3.4.0)
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html
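# Before fitting on the real baskets, a minimal FPGrowth sketch on toy data (toy values,
# not from this report) to make minSupport/minConfidence concrete:
from pyspark.ml.fpm import FPGrowth
toy = spark.createDataFrame([(0, ["a","b"]), (1, ["a","b","c"]), (2, ["a","c"])], ["id","items"])
toy_model = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6).fit(toy)
toy_model.freqItemsets.show()      # itemsets in >= 50% of baskets: [a] 3, [b] 2, [c] 2, [a,b] 2, [a,c] 2
toy_model.associationRules.show()  # e.g. [b] -> [a] with confidence 1.0 and lift 1.0 (a is in every basket)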
import time
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="items", minSupport=minSupport_, minConfidence=minConfidence_) # minSupport=0.2, minConfidence=0.1
model = fpGrowth.fit(pnlb)
# now you have the model.
epoch_a = time.time()
ipdf = get_itempopularity(model,rule_consequent_cols)
print('df rowcount:',ipdf.count())
epoch_z = time.time()
#print('elapsed:',round(epoch_z - epoch_a,1),'s')
print("elapsed: %.1f s" % (epoch_z - epoch_a))
ipdf.limit(5).toPandas()
watch_conseq: rPnL_isdiff=1
df rowcount: 22305
elapsed: 22.0 s
|   | items | freq | size | is_conseq_in |
|---|---|---|---|---|
| 0 | [rPnL_isdiff=0] | 499244 | 1 | False |
| 1 | [is_exp_date_m1=0] | 493927 | 1 | False |
| 2 | [is_exp_date_m1=0, rPnL_isdiff=0] | 493177 | 2 | False |
| 3 | [PnL_ccy=USD] | 484878 | 1 | False |
| 4 | [PnL_ccy=USD, rPnL_isdiff=0] | 484878 | 2 | False |
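# Side observation: [PnL_ccy=USD] and [PnL_ccy=USD, rPnL_isdiff=0] share the same freq
# (484,878), so every USD row has rPnL_isdiff=0 — all 756 rPnL diffs sit outside USD,
# consistent with the PnL_ccy=AUD baskets surfacing next.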
dofdx = get_relevantbasketpopularity(ipdf)
dofdx.limit(10).toPandas()
|   | items | freq | size | is_conseq_in |
|---|---|---|---|---|
| 0 | [rPnL_isdiff=1, PnL_ccy=AUD] | 756 | 2 | True |
| 1 | [rPnL_isdiff=1, PnL_ccy=AUD, product=fx fwd] | 756 | 3 | True |
| 2 | [rPnL_isdiff=1, PnL_ccy=AUD, trade_ccy=AUDUSD] | 756 | 3 | True |
| 3 | [rPnL_isdiff=1, PnL_ccy=AUD, trade_ccy=AUDUSD, product=fx fwd] | 756 | 4 | True |
| 4 | [rPnL_isdiff=1, product=fx fwd] | 756 | 2 | True |
| 5 | [rPnL_isdiff=1, trade_ccy=AUDUSD] | 756 | 2 | True |
| 6 | [rPnL_isdiff=1, trade_ccy=AUDUSD, product=fx fwd] | 756 | 3 | True |
| 7 | [rPnL_isdiff=1, is_exp_date_m1=0] | 750 | 2 | True |
| 8 | [rPnL_isdiff=1, PnL_ccy=AUD, is_exp_date_m1=0] | 750 | 3 | True |
| 9 | [rPnL_isdiff=1, PnL_ccy=AUD, product=fx fwd, is_exp_date_m1=0] | 750 | 4 | True |
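# Every basket above has freq 756 — exactly the rPnL diff rowcount printed earlier — so all
# rPnL diffs share trade_ccy=AUDUSD, product=fx fwd, PnL_ccy=AUD (750 of them also have
# is_exp_date_m1=0). The association rules below confirm this with confidence 1.0.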
if False:
    df2a = show_assoc(model,"all,confid desc",rule_consequent_cols)
    print("Top 20:")
    df2a.limit(20).toPandas()
if False:
    df2b = show_assoc(model,"all+criteria",rule_consequent_cols)
    df2b.limit(20).toPandas()
dofdx2 = show_assoc(model,"top assoc",rule_consequent_cols)
dofdx2.limit(20).toPandas()
watching conseq: rPnL_isdiff=1
|   | antecedent | consequent | confidence | lift | support | lenA | lenC | diff_included_in_C |
|---|---|---|---|---|---|---|---|---|
| 0 | [PnL_ccy=AUD, product=fx fwd] | [rPnL_isdiff=1] | 1.00 | 661.38 | 0.00 | 2 | 1 | True |
| 1 | [trade_ccy=AUDUSD, product=fx fwd] | [rPnL_isdiff=1] | 1.00 | 661.38 | 0.00 | 2 | 1 | True |
| 2 | [trade_ccy=AUDUSD, product=fx fwd, is_exp_date_m1=0] | [rPnL_isdiff=1] | 1.00 | 661.38 | 0.00 | 3 | 1 | True |
| 3 | [PnL_ccy=AUD, product=fx fwd, is_exp_date_m1=0] | [rPnL_isdiff=1] | 1.00 | 661.38 | 0.00 | 3 | 1 | True |
| 4 | [PnL_ccy=AUD, trade_ccy=AUDUSD, product=fx fwd] | [rPnL_isdiff=1] | 1.00 | 661.38 | 0.00 | 3 | 1 | True |
| 5 | [PnL_ccy=AUD, trade_ccy=AUDUSD, product=fx fwd, is_exp_date_m1=0] | [rPnL_isdiff=1] | 1.00 | 661.38 | 0.00 | 4 | 1 | True |
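# A quick sanity check of the reported lift, from counts printed earlier in this notebook:
conseq_support = 756 / pop_rc           # P(rPnL_isdiff=1) = 756/500000 = 0.001512
print(round(1.00 / conseq_support, 2))  # lift = confidence / P(consequent) = 661.38, matching the table
# (support displays as 0.00 above only because of the 2-decimal pandas float format set at the top)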
dofdx2o = show_assoc(model,"top assoc for csv output",rule_consequent_cols)
dofdx2o.toPandas().to_csv(fname_v1+".output.conseq-"+rule_consequent_cols[0]+".assoc.csv")
watching conseq: rPnL_isdiff=1
## .. the most recurring combinations of antecedent values for the consequent
print_now()
2023-05-10 22:20:04 +08+0800
# the dataframe is now ready for rule mining; same antecedents, now watching uPnL.
rule_antecedent_cols = [ "trade_ccy","product","disc_curve","fx_cut","market","PnL_ccy","is_exp_date_m1" ]
rule_consequent_cols = ["uPnL_isdiff"]
# ac = antecedent and consequent
# goal: build a dataframe with an "items" column holding the fields above
pnlb = get_basket(pnl_v12bS,rule_antecedent_cols,rule_consequent_cols)
pnlb.limit(3).toPandas()
|   | id | items |
|---|---|---|
| 0 | 5 | [trade_ccy=USDZMW, product=fx fwd, disc_curve=USDZMW-GN, fx_cut=NYC1000, market=GN, PnL_ccy=USD, is_exp_date_m1=0, uPnL_isdiff=0] |
| 1 | 6 | [trade_ccy=USDMNT, product=fx spot, disc_curve=USDMNT-GD, fx_cut=NYC1000, market=GD, PnL_ccy=USD, is_exp_date_m1=0, uPnL_isdiff=0] |
| 2 | 8 | [trade_ccy=USDLSL, product=fx futures, disc_curve=USDLSL-TH, fx_cut=TKY1500, market=TH, PnL_ccy=USD, is_exp_date_m1=0, uPnL_isdiff=0] |
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html (3.4.0)
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.fpm.FPGrowth.html
import time
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="items", minSupport=minSupport_, minConfidence=minConfidence_) # minSupport=0.2, minConfidence=0.1
model = fpGrowth.fit(pnlb)
# now you have the model.
dofdx2 = show_assoc(model,"top assoc",rule_consequent_cols)
dofdx2.limit(20).toPandas()
watching conseq: uPnL_isdiff=1
|   | antecedent | consequent | confidence | lift | support | lenA | lenC | diff_included_in_C |
|---|---|---|---|---|---|---|---|---|
| 0 | [is_exp_date_m1=1, fx_cut=TKY1500] | [uPnL_isdiff=1] | 1.00 | 239.23 | 0.00 | 2 | 1 | True |
| 1 | [is_exp_date_m1=1, fx_cut=TKY1500, PnL_ccy=USD] | [uPnL_isdiff=1] | 1.00 | 239.23 | 0.00 | 3 | 1 | True |
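# The same lift sanity check holds here: 1.00 / (2090/500000) = 239.23, i.e. the reciprocal
# of the uPnL diff rate printed when the isdiff flags were added.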
dofdx2o = show_assoc(model,"top assoc for csv output",rule_consequent_cols)
dofdx2o.toPandas().to_csv(fname_v1+".output.conseq-"+rule_consequent_cols[0]+".assoc.csv")
watching conseq: uPnL_isdiff=1
print_now()
2023-05-10 22:21:31 +08+0800