{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "7cc91738-9e29-4954-ae62-c5cbafe7fe50", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "# Flow Chart For Equity Transparency Calculations " ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "7c713ba2-5a21-4ba3-a87d-8a00c11a0fce", "showTitle": false, "tableResultSettingsMap": {}, "title": "" }, "vscode": { "languageId": "powershell" } }, "source": [ "#### Pip install the package using the link copied from GitHub " ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "add94f48-a78b-4986-9f0a-02ac346a57b0", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "!pip install git+https://github.com/European-Securities-Markets-Authority/esma_data_py.git\n", "!pip install pandas\n", "!pip install plotly\n", "!pip install matplotlib" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "8409b025-658d-466e-a7b0-eb6c642a6996", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "#### Import libraries" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "5cc95ec1-0410-42b1-a057-7651cdf7272a", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "from esma_data_py import EsmaDataLoader\n", "import os\n", "from pathlib import Path\n", "import plotly.express as px\n", "import plotly.graph_objects as go\n", "from matplotlib.colors import to_rgb\n", "import pandas as pd\n", "from tqdm import tqdm\n", "from PIL import Image" ] }, { "cell_type": 
"markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "f0839163-0a77-432e-8ec9-6569174d088b", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "#### Define color functions " ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "c43ad2e4-dd51-4b18-975a-159459f24757", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "def hex_to_rgba(hex_color, alpha=1.0):\n", " \"\"\"\n", " Convert a hex color code to an RGBA string.\n", "\n", " Args:\n", " hex_color (str): A string representing the hex color (e.g., '#RRGGBB').\n", " alpha (float, optional): The alpha (opacity) value for the color (default is 1.0).\n", "\n", " Returns:\n", " str: The color in 'rgba(r, g, b, alpha)' format.\n", " \"\"\"\n", " hex_color = hex_color.lstrip('#') # Remove the '#' if present\n", " if len(hex_color) != 6:\n", " raise ValueError(\"Hex color must be in the format #RRGGBB.\")\n", " r, g, b = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))\n", " return f'rgba({r}, {g}, {b}, {alpha})'\n", "\n", "\n", "def rgb_to_rgba(rgb_color, alpha=1.0):\n", " \"\"\"\n", " Convert an RGB color string to an RGBA string.\n", "\n", " Args:\n", " rgb_color (str): A string representing the RGB color (e.g., 'rgb(r, g, b)').\n", " alpha (float, optional): The alpha (opacity) value for the color (default is 1.0).\n", "\n", " Returns:\n", " str: The color in 'rgba(r, g, b, alpha)' format.\n", " \"\"\"\n", " # Strip off 'rgb(' and ')'\n", " rgb_color = rgb_color[rgb_color.find('(')+1:rgb_color.find(')')]\n", " r, g, b = map(int, rgb_color.split(','))\n", " return f'rgba({r}, {g}, {b}, {alpha})'\n", "\n", "\n", "def named_to_rgba(named_color, alpha=1.0):\n", " \"\"\"\n", " Convert a named color to an RGBA string.\n", "\n", " Args:\n", " named_color (str): The name of the color (e.g., 
'red', 'blue').\n", " alpha (float, optional): The alpha (opacity) value for the color (default is 1.0).\n", "\n", " Returns:\n", " str: The color in 'rgba(r, g, b, alpha)' format.\n", " \"\"\"\n", " try:\n", " r, g, b = to_rgb(named_color)\n", " except ValueError:\n", " raise ValueError(f\"Invalid named color: {named_color}\")\n", " return f'rgba({int(r*255)}, {int(g*255)}, {int(b*255)}, {alpha})'\n", "\n", "\n", "def convert_to_rgba(color, alpha=1.0):\n", " \"\"\"\n", " Convert a color in different formats (hex, rgb, named) to an RGBA string.\n", "\n", " Args:\n", " color (str): A color in either hex ('#RRGGBB'), rgb ('rgb(r, g, b)'), or a named color (e.g., 'red').\n", " alpha (float, optional): The alpha (opacity) value for the color (default is 1.0).\n", "\n", " Returns:\n", " str: The color in 'rgba(r, g, b, alpha)' format.\n", " \"\"\"\n", " if color.startswith('#'):\n", " return hex_to_rgba(color, alpha)\n", " elif color.startswith('rgb'):\n", " return rgb_to_rgba(color, alpha)\n", " else:\n", " return named_to_rgba(color, alpha)\n", "\n", "\n", "# Concatenate different qualitative color sets from Plotly\n", "colors = px.colors.qualitative.Set1 + \\\n", " px.colors.qualitative.Set2 + \\\n", " px.colors.qualitative.Set3 + \\\n", " px.colors.qualitative.Pastel1 + \\\n", " px.colors.qualitative.Pastel2\n", "\n", "# Define the opacity\n", "opacity = 0.3\n", "\n", "# Convert each color to RGBA format with the specified opacity\n", "rgba_colors = [convert_to_rgba(color, alpha=opacity) for color in colors] " ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "160b6265-6423-4a51-8b78-e1bbaf13d277", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "#### Define plot and ds functions" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": 
"9e39956c-f7ac-428c-8754-032a31289ddc", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "def make_sankey_plot(df, id1, id2, value, title='', title_left=\"Year-1\", title_right=\"Year\", opacity=0.3):\n", " \"\"\"\n", " Creates a Sankey plot to visualize the flow of values between two categorical variables in the dataframe.\n", " \n", " Parameters:\n", " - df (pd.DataFrame): Input dataframe containing the data.\n", " - id1 (str): Column name representing the first category (e.g., previous year).\n", " - id2 (str): Column name representing the second category (e.g., current year).\n", " - value (str): Column name representing the values to be visualized in the Sankey plot.\n", " - title (str, optional): Title for the plot. Defaults to an empty string.\n", " - title_left (str, optional): Title for the left side of the plot (default is \"Year-1\").\n", " - title_right (str, optional): Title for the right side of the plot (default is \"Year\").\n", " - opacity (float, optional): Opacity for the Sankey plot colors (default is 0.3).\n", " \n", " Returns:\n", " - fig (plotly.graph_objects.Figure): The Sankey plot figure.\n", " - data (pd.DataFrame): Dataframe containing the prepared data for the Sankey plot.\n", " \"\"\"\n", " \n", " # Format the id columns to include 'Year_' as prefix\n", " id1, id2 = f\"Year_{id1}\", f\"Year_{id2}\"\n", "\n", " # Prepare the dataframe by selecting necessary columns and converting to string type\n", " df = df[[id1, id2, value]].astype(str)\n", "\n", " # Create a sorted list of unique categories from both id1 and id2 columns\n", " list_thrs_id1 = list(df[id1].value_counts().index)\n", " list_thrs_id2 = list(df[id2].value_counts().index)\n", " list_thrs = sorted(set(list_thrs_id1 + list_thrs_id2))\n", "\n", " # Create label dataframes for id1 and id2 categories\n", " df_lis_label = (pd.DataFrame({id1: list_thrs})\n", " .sort_values([id1], ascending=True)\n", " .reset_index(drop=True)\n", " 
.assign(id_ = lambda x: x.apply(lambda y: y.name, axis=1))\n", " .rename(columns = {'id_': id1 + '_id'}))\n", "\n", " df_lis_fitrs_label = (pd.DataFrame({id2: list_thrs})\n", " .sort_values([id2], ascending=True)\n", " .reset_index(drop=True)\n", " .assign(id_ = lambda x: x.apply(lambda y: y.name + len(df_lis_label.index), axis=1))\n", " .rename(columns = {'id_': id2 + '_id'}))\n", " \n", " # Define a list of colors for the nodes\n", " colors = px.colors.qualitative.Set1 + \\\n", " px.colors.qualitative.Set2 + \\\n", " px.colors.qualitative.Set3 + \\\n", " px.colors.qualitative.Pastel1 + \\\n", " px.colors.qualitative.Pastel2\n", " \n", " # Ensure we have enough colors to cover all unique categories\n", " while len(colors) < len(list_thrs):\n", " colors += colors\n", "\n", " # Convert colors to RGBA format with specified opacity\n", " rgba_colors = [convert_to_rgba(color, alpha=opacity) for color in colors]\n", " \n", " # Prepare color mapping for each category\n", " df_colors = pd.DataFrame({\"list_id\": list_thrs,\n", " \"color\": colors[:len(list_thrs)],\n", " \"color_light\": rgba_colors[:len(list_thrs)]})\n", "\n", " # Merge the label dataframes with the color mapping for a final dataframe\n", " df_colors2 = (pd.concat([df_lis_fitrs_label.rename(columns = {id2: 'list_id', id2 + '_id': 'id_'}),\n", " df_lis_label.rename(columns = {id1: 'list_id', id1 + '_id': 'id_'})]) \n", " .merge(df_colors, on=\"list_id\", how='left')\n", " .sort_values(['id_'], ascending=True))\n", " \n", " # Aggregate data and prepare for Sankey plot\n", " data = (df[[id1, id2, value]].groupby([id1, id2], as_index=False)\n", " .count()\n", " .merge(df_lis_fitrs_label, on=id2, how='left')\n", " .merge(df_lis_label, on=id1, how='left')\n", " .merge(df_colors.rename(columns = {'list_id': id1}), on=id1, how='left'))\n", " \n", " # Calculate percentages of items in the same and different buckets (id1 == id2 vs id1 != id2)\n", " n_isin_same_bucket = sum(data.query(f\"{id1} == {id2}\")[value]) 
\n", " n_isin_different_bucket = sum(data[value]) - sum(data.query(f\"{id1} == {id2}\")[value]) \n", " pct_isin_same_bucket = n_isin_same_bucket / sum(data[value]) * 100\n", " pct_isin_different_bucket = n_isin_different_bucket / sum(data[value]) * 100\n", "\n", " # Prepare caption to show in the plot\n", " add_caption = f\"
Instruments whose indicator matches : {pct_isin_same_bucket:.1f}% ({n_isin_same_bucket:,.0f})\"\n", " add_caption += f\"
Instruments whose indicator changes : {pct_isin_different_bucket:.1f}% ({n_isin_different_bucket:,.0f})\"\n", " add_caption += f\"
Total number of instruments: {sum(data[value]):,.0f}\"\n", " data['matching_ratio'] = pct_isin_same_bucket\n", " \n", " # Add the caption to the title\n", " title = \"\" + title + \"\"\n", " title += add_caption\n", " \n", " # Prepare data for Sankey plot\n", " source = list(data[id2 + '_id'])\n", " target = list(data[id1 + '_id'])\n", " values = list(data[value])\n", " color = list(data['color_light'])\n", "\n", " # Create the Sankey plot using Plotly\n", " fig = go.Figure(data=[go.Sankey(node = dict(pad = 15,\n", " thickness = 20,\n", " line = dict(color = \"black\", width = 0.5),\n", " label = list(df_colors2['list_id']),\n", " color = list(df_colors2['color'])),\n", " link = dict(source = source, \n", " target = target, \n", " value = values, \n", " label = values,\n", " color = color ))])\n", "\n", " # Update layout of the plot (fonts, size, etc.)\n", " fig = fig.update_layout(font_size=10, \n", " width=700, \n", " height=700, \n", " plot_bgcolor='white',\n", " xaxis={'showgrid': False,\n", " 'zeroline': False,\n", " 'visible': False},\n", " yaxis={'showgrid': False,\n", " 'zeroline': False,\n", " 'visible': False,},\n", " title={'text' : title,\n", " 'y': 0.98})\n", " \n", " # Add annotations for the titles of the left and right sides of the plot\n", " for x_coordinate, column_name in enumerate([title_left, title_right]):\n", " fig = fig.add_annotation(x=x_coordinate,\n", " y=1.05,\n", " xref=\"x\",\n", " yref=\"paper\",\n", " text=column_name,\n", " showarrow=False,\n", " font=dict(size=16),\n", " align=\"center\")\n", "\n", " return fig, data\n", "\n", "\n", "def create_sankey_datasets(df, year_1, year_2, columns_to_process):\n", " \"\"\"\n", " This function processes a given dataframe to create datasets for Sankey plots and generates the plots themselves \n", " for specific columns representing different attributes across two given years.\n", "\n", " Parameters:\n", " df (pd.DataFrame): The input dataframe containing the data to process.\n", " year_1 
(str): The first year to filter the data (e.g., '2022').\n", " year_2 (str): The second year to filter the data (e.g., '2023').\n", " columns_to_process (list): A list of column names that should be processed for creating the Sankey plots.\n", "\n", " Returns:\n", " sankey_datasets (dict): A dictionary where keys are column names and values are the corresponding processed DataFrames.\n", " sankey_plots (dict): A dictionary where keys are column names and values are the corresponding Sankey plot objects.\n", " \"\"\"\n", "\n", " # Split the data into two datasets based on the years provided\n", " df_year1 = df.loc[lambda x: x.FrDt.str.startswith(year_1)]\n", " df_year2 = df.loc[lambda x: x.ToDt.str.startswith(year_2)]\n", " \n", " # Initialize dictionaries to store the resulting datasets and plots\n", " sankey_datasets = {}\n", " sankey_plots = {}\n", "\n", " # Iterate over the columns to be processed for the Sankey plots\n", " for column in tqdm(columns_to_process):\n", "\n", " # Create subsets of the data for the current column for each year\n", " df_year1_subset = df_year1[['Id', column]].rename(columns={column: f'Year_{year_1}'})\n", " df_year2_subset = df_year2[['Id', column]].rename(columns={column: f'Year_{year_2}'})\n", " \n", " # Merge the two subsets on the 'Id' to align the data from both years\n", " merged_df = pd.merge(df_year1_subset, df_year2_subset, on='Id', how='outer')\n", " \n", " # Drop rows where there is missing data for either year\n", " merged_df = merged_df.dropna(subset=[f'Year_{year_1}', f'Year_{year_2}'])\n", "\n", " # Remove duplicate entries to ensure clean data\n", " merged_df = merged_df.drop_duplicates()\n", " \n", " # Store the cleaned and merged DataFrame for the current column\n", " sankey_datasets[column] = merged_df\n", "\n", " # Generate the Sankey plot for the merged dataset\n", " sankey_plot, sk = make_sankey_plot(df=merged_df, \n", " id1=year_1, \n", " id2=year_2, \n", " value=\"Id\", \n", " title=f\"Sankey Plot for 
{column}\", \n", " title_left=f\"Year {year_1}\", \n", " title_right=f\"Year {year_2}\")\n", "\n", " # Store the generated Sankey plot in the dictionary\n", " sankey_plots[column] = sankey_plot\n", " \n", " # Return the dictionaries containing the datasets and plots\n", " return sankey_datasets, sankey_plots\n", "\n", "def display_saved_img(img_path):\n", " \"\"\"\n", " Changes working directory to '_static' and displays the image at `img_path`.\n", "\n", " Parameters:\n", " img_path (str): Path to the image file.\n", " \"\"\"\n", "\n", " picture_link = Path.cwd().parent / '_static'\n", " \n", " if os.path.exists(picture_link):\n", " os.chdir(picture_link)\n", " \n", " img = Image.open(f'{img_path}').convert(\"RGB\")\n", " display(img)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "241aa31e-883e-4505-8967-83972988c9fc", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "#### Initialize EsmaDataLoader" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "67c3647d-c017-49e8-955b-b2c4fee9be8e", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "# Initialize edl with creation dates to, if not set:\n", "# creation_date_to = Today\n", "# creation_date_from = 2017-01-01\n", "\n", "edl = EsmaDataLoader(creation_date_from='2024-03-01', creation_date_to='2024-03-03')" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b16ed10e-b42c-4b94-a1ec-5f93693672c6", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "# Load latest files, if no arguments are specified: \n", "\n", "# file_type = 'Full'\n", "# vcap = False (True if dvcap files to retrieve)\n", "# isin = [] \n", "# cfi = 
# eqt = True (when True, keep only equity instruments)
# update = False (cache control: set to True to re-download the latest version of the data)

df = edl.load_latest_files()

# %% [markdown]
# #### Custom preprocessing

# %%
# Custom filtering for this specific process: keep the rows whose Mthdlgy is 'YEAR'
# and whose FrDt is on or after 2022-01-01, and expose column 'Id_2' as 'RlvntMkt'.
# The result gets its own name so the loaded frame stays available and this cell
# can be re-run safely.
df_yearly = (
    df.query("Mthdlgy == 'YEAR'")
      .query("FrDt >= '2022-01-01'")
      .rename(columns={'Id_2': 'RlvntMkt'})
)

# %% [markdown]
# #### Plots creation

# %%
# Create one dataset and one Sankey plot per indicator column
sankey_datasets, sankey_plots = create_sankey_datasets(
    df=df_yearly,
    year_1="2022",
    year_2="2023",
    columns_to_process=["RlvntMkt", "LrgInScale", "Lqdty"],
)

# %%
# Display the images stored in the '_static' directory next to the parent of the
# working directory. Files are shown in sorted order (directory listing order is
# otherwise arbitrary), sub-directories are skipped, and a missing directory is
# reported instead of raising.
static_dir = Path.cwd().parent / '_static'
if static_dir.is_dir():
    for image_file in sorted(entry for entry in static_dir.iterdir() if entry.is_file()):
        display_saved_img(image_file.name)
else:
    print(f"No '_static' directory found at {static_dir}; nothing to display.")