{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "7e585fa2-8b77-4fa5-b35b-4c57d248be86", "showTitle": false, "title": "" } }, "source": [ "## Prerequirement\n", "Dataset is derived from Fannie Mae’s [Single-Family Loan Performance Data](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html) with all rights reserved by Fannie Mae. Refer to these [instructions](https://github.com/NVIDIA/spark-rapids-examples/blob/main/docs/get-started/xgboost-examples/dataset/mortgage.md) to download the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "f8237deb-486e-401c-ba21-37b32744dd8d", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "import time\n", "import os\n", "from pyspark import broadcast\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql.functions import *\n", "from pyspark.sql.types import *\n", "from pyspark.sql.window import Window\n" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "6be6a5eb-5236-4b12-b6c5-ecab547a229d", "showTitle": false, "title": "" } }, "source": [ "## Function Define\n", "### 1. Define the constants\n", "\n", "* Define input file schema (Performance and Acquisition)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "b5bd7499-ee84-444c-a2ab-eb2329a0c953", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "# File schema\n", "_csv_raw_schema = StructType([\n", " StructField(\"reference_pool_id\", StringType()),\n", " StructField(\"loan_id\", LongType()),\n", " StructField(\"monthly_reporting_period\", StringType()),\n", " StructField(\"orig_channel\", StringType()),\n", " StructField(\"seller_name\", StringType()),\n", " StructField(\"servicer\", StringType()),\n", " StructField(\"master_servicer\", StringType()),\n", " StructField(\"orig_interest_rate\", DoubleType()),\n", " StructField(\"interest_rate\", DoubleType()),\n", " StructField(\"orig_upb\", DoubleType()),\n", " StructField(\"upb_at_issuance\", StringType()),\n", " StructField(\"current_actual_upb\", DoubleType()),\n", " StructField(\"orig_loan_term\", IntegerType()),\n", " StructField(\"orig_date\", StringType()),\n", " StructField(\"first_pay_date\", StringType()), \n", " StructField(\"loan_age\", DoubleType()),\n", " StructField(\"remaining_months_to_legal_maturity\", DoubleType()),\n", " StructField(\"adj_remaining_months_to_maturity\", DoubleType()),\n", " StructField(\"maturity_date\", StringType()),\n", " StructField(\"orig_ltv\", DoubleType()),\n", " StructField(\"orig_cltv\", DoubleType()),\n", " StructField(\"num_borrowers\", DoubleType()),\n", " StructField(\"dti\", DoubleType()),\n", " StructField(\"borrower_credit_score\", DoubleType()),\n", " StructField(\"coborrow_credit_score\", DoubleType()),\n", " StructField(\"first_home_buyer\", StringType()),\n", " StructField(\"loan_purpose\", StringType()),\n", " StructField(\"property_type\", StringType()),\n", " StructField(\"num_units\", IntegerType()),\n", " StructField(\"occupancy_status\", StringType()),\n", " StructField(\"property_state\", StringType()),\n", " StructField(\"msa\", DoubleType()),\n", " StructField(\"zip\", IntegerType()),\n", " StructField(\"mortgage_insurance_percent\", DoubleType()),\n", " StructField(\"product_type\", StringType()),\n", " StructField(\"prepayment_penalty_indicator\", StringType()),\n", " StructField(\"interest_only_loan_indicator\", StringType()),\n", " StructField(\"interest_only_first_principal_and_interest_payment_date\", StringType()),\n", " StructField(\"months_to_amortization\", StringType()),\n", " StructField(\"current_loan_delinquency_status\", IntegerType()),\n", " StructField(\"loan_payment_history\", StringType()),\n", " StructField(\"mod_flag\", StringType()),\n", " StructField(\"mortgage_insurance_cancellation_indicator\", StringType()),\n", " StructField(\"zero_balance_code\", StringType()),\n", " StructField(\"zero_balance_effective_date\", StringType()),\n", " StructField(\"upb_at_the_time_of_removal\", StringType()),\n", " StructField(\"repurchase_date\", StringType()),\n", " StructField(\"scheduled_principal_current\", StringType()),\n", " StructField(\"total_principal_current\", StringType()),\n", " StructField(\"unscheduled_principal_current\", StringType()),\n", " StructField(\"last_paid_installment_date\", StringType()),\n", " StructField(\"foreclosed_after\", StringType()),\n", " StructField(\"disposition_date\", StringType()),\n", " StructField(\"foreclosure_costs\", DoubleType()),\n", " StructField(\"prop_preservation_and_repair_costs\", DoubleType()),\n", " StructField(\"asset_recovery_costs\", DoubleType()),\n", " StructField(\"misc_holding_expenses\", DoubleType()),\n", " StructField(\"holding_taxes\", DoubleType()),\n", " StructField(\"net_sale_proceeds\", DoubleType()),\n", " StructField(\"credit_enhancement_proceeds\", DoubleType()),\n", " StructField(\"repurchase_make_whole_proceeds\", StringType()),\n", " StructField(\"other_foreclosure_proceeds\", DoubleType()),\n", " StructField(\"non_interest_bearing_upb\", DoubleType()),\n", " StructField(\"principal_forgiveness_upb\", StringType()),\n", " StructField(\"original_list_start_date\", StringType()),\n", " StructField(\"original_list_price\", StringType()),\n", " StructField(\"current_list_start_date\", StringType()),\n", " StructField(\"current_list_price\", StringType()),\n", " StructField(\"borrower_credit_score_at_issuance\", StringType()),\n", " StructField(\"co-borrower_credit_score_at_issuance\", StringType()),\n", " StructField(\"borrower_credit_score_current\", StringType()),\n", " StructField(\"co-Borrower_credit_score_current\", StringType()),\n", " StructField(\"mortgage_insurance_type\", DoubleType()),\n", " StructField(\"servicing_activity_indicator\", StringType()),\n", " StructField(\"current_period_modification_loss_amount\", StringType()),\n", " StructField(\"cumulative_modification_loss_amount\", StringType()),\n", " StructField(\"current_period_credit_event_net_gain_or_loss\", StringType()),\n", " StructField(\"cumulative_credit_event_net_gain_or_loss\", StringType()),\n", " StructField(\"homeready_program_indicator\", StringType()),\n", " StructField(\"foreclosure_principal_write_off_amount\", StringType()),\n", " StructField(\"relocation_mortgage_indicator\", StringType()),\n", " StructField(\"zero_balance_code_change_date\", StringType()),\n", " StructField(\"loan_holdback_indicator\", StringType()),\n", " StructField(\"loan_holdback_effective_date\", StringType()),\n", " StructField(\"delinquent_accrued_interest\", StringType()),\n", " StructField(\"property_valuation_method\", StringType()),\n", " StructField(\"high_balance_loan_indicator\", StringType()),\n", " StructField(\"arm_initial_fixed-rate_period_lt_5_yr_indicator\", StringType()),\n", " StructField(\"arm_product_type\", StringType()),\n", " StructField(\"initial_fixed-rate_period\", StringType()),\n", " StructField(\"interest_rate_adjustment_frequency\", StringType()),\n", " StructField(\"next_interest_rate_adjustment_date\", StringType()),\n", " StructField(\"next_payment_change_date\", StringType()),\n", " StructField(\"index\", StringType()),\n", " StructField(\"arm_cap_structure\", StringType()),\n", " StructField(\"initial_interest_rate_cap_up_percent\", StringType()),\n", " StructField(\"periodic_interest_rate_cap_up_percent\", StringType()),\n", " StructField(\"lifetime_interest_rate_cap_up_percent\", StringType()),\n", " StructField(\"mortgage_margin\", StringType()),\n", " StructField(\"arm_balloon_indicator\", StringType()),\n", " StructField(\"arm_plan_number\", StringType()),\n", " StructField(\"borrower_assistance_plan\", StringType()),\n", " StructField(\"hltv_refinance_option_indicator\", StringType()),\n", " StructField(\"deal_name\", StringType()),\n", " StructField(\"repurchase_make_whole_proceeds_flag\", StringType()),\n", " StructField(\"alternative_delinquency_resolution\", StringType()),\n", " StructField(\"alternative_delinquency_resolution_count\", StringType()),\n", " StructField(\"total_deferral_amount\", StringType())\n", " ])" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "3f7bd630-aad1-48ac-94f4-698480de66e0", "showTitle": false, "title": "" } }, "source": [ "* Define seller name mapping" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "19fbebaf-2338-47c3-a351-d477b620ef90", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "# name mappings\n", "_name_mapping = [\n", " (\"WITMER FUNDING, LLC\", \"Witmer\"),\n", " (\"WELLS FARGO CREDIT RISK TRANSFER SECURITIES TRUST 2015\", \"Wells Fargo\"),\n", " (\"WELLS FARGO BANK, NA\" , \"Wells Fargo\"),\n", " (\"WELLS FARGO BANK, N.A.\" , \"Wells Fargo\"),\n", " (\"WELLS FARGO BANK, NA\" , \"Wells Fargo\"),\n", " (\"USAA FEDERAL SAVINGS BANK\" , \"USAA\"),\n", " (\"UNITED SHORE FINANCIAL SERVICES, LLC D\\\\/B\\\\/A UNITED WHOLESALE MORTGAGE\" , \"United Seq(e\"),\n", " (\"U.S. BANK N.A.\" , \"US Bank\"),\n", " (\"SUNTRUST MORTGAGE INC.\" , \"Suntrust\"),\n", " (\"STONEGATE MORTGAGE CORPORATION\" , \"Stonegate Mortgage\"),\n", " (\"STEARNS LENDING, LLC\" , \"Stearns Lending\"),\n", " (\"STEARNS LENDING, INC.\" , \"Stearns Lending\"),\n", " (\"SIERRA PACIFIC MORTGAGE COMPANY, INC.\" , \"Sierra Pacific Mortgage\"),\n", " (\"REGIONS BANK\" , \"Regions\"),\n", " (\"RBC MORTGAGE COMPANY\" , \"RBC\"),\n", " (\"QUICKEN LOANS INC.\" , \"Quicken Loans\"),\n", " (\"PULTE MORTGAGE, L.L.C.\" , \"Pulte Mortgage\"),\n", " (\"PROVIDENT FUNDING ASSOCIATES, L.P.\" , \"Provident Funding\"),\n", " (\"PROSPECT MORTGAGE, LLC\" , \"Prospect Mortgage\"),\n", " (\"PRINCIPAL RESIDENTIAL MORTGAGE CAPITAL RESOURCES, LLC\" , \"Principal Residential\"),\n", " (\"PNC BANK, N.A.\" , \"PNC\"),\n", " (\"PMT CREDIT RISK TRANSFER TRUST 2015-2\" , \"PennyMac\"),\n", " (\"PHH MORTGAGE CORPORATION\" , \"PHH Mortgage\"),\n", " (\"PENNYMAC CORP.\" , \"PennyMac\"),\n", " (\"PACIFIC UNION FINANCIAL, LLC\" , \"Other\"),\n", " (\"OTHER\" , \"Other\"),\n", " (\"NYCB MORTGAGE COMPANY, LLC\" , \"NYCB\"),\n", " (\"NEW YORK COMMUNITY BANK\" , \"NYCB\"),\n", " (\"NETBANK FUNDING SERVICES\" , \"Netbank\"),\n", " (\"NATIONSTAR MORTGAGE, LLC\" , \"Nationstar Mortgage\"),\n", " (\"METLIFE BANK, NA\" , \"Metlife\"),\n", " (\"LOANDEPOT.COM, LLC\" , \"LoanDepot.com\"),\n", " (\"J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2015-1\" , \"JP Morgan Chase\"),\n", " (\"J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2014-1\" , \"JP Morgan Chase\"),\n", " (\"JPMORGAN CHASE BANK, NATIONAL ASSOCIATION\" , \"JP Morgan Chase\"),\n", " (\"JPMORGAN CHASE BANK, NA\" , \"JP Morgan Chase\"),\n", " (\"JP MORGAN CHASE BANK, NA\" , \"JP Morgan Chase\"),\n", " (\"IRWIN MORTGAGE, CORPORATION\" , \"Irwin Mortgage\"),\n", " (\"IMPAC MORTGAGE CORP.\" , \"Impac Mortgage\"),\n", " (\"HSBC BANK USA, NATIONAL ASSOCIATION\" , \"HSBC\"),\n", " (\"HOMEWARD RESIDENTIAL, INC.\" , \"Homeward Mortgage\"),\n", " (\"HOMESTREET BANK\" , \"Other\"),\n", " (\"HOMEBRIDGE FINANCIAL SERVICES, INC.\" , \"HomeBridge\"),\n", " (\"HARWOOD STREET FUNDING I, LLC\" , \"Harwood Mortgage\"),\n", " (\"GUILD MORTGAGE COMPANY\" , \"Guild Mortgage\"),\n", " (\"GMAC MORTGAGE, LLC (USAA FEDERAL SAVINGS BANK)\" , \"GMAC\"),\n", " (\"GMAC MORTGAGE, LLC\" , \"GMAC\"),\n", " (\"GMAC (USAA)\" , \"GMAC\"),\n", " (\"FREMONT BANK\" , \"Fremont Bank\"),\n", " (\"FREEDOM MORTGAGE CORP.\" , \"Freedom Mortgage\"),\n", " (\"FRANKLIN AMERICAN MORTGAGE COMPANY\" , \"Franklin America\"),\n", " (\"FLEET NATIONAL BANK\" , \"Fleet National\"),\n", " (\"FLAGSTAR CAPITAL MARKETS CORPORATION\" , \"Flagstar Bank\"),\n", " (\"FLAGSTAR BANK, FSB\" , \"Flagstar Bank\"),\n", " (\"FIRST TENNESSEE BANK NATIONAL ASSOCIATION\" , \"Other\"),\n", " (\"FIFTH THIRD BANK\" , \"Fifth Third Bank\"),\n", " (\"FEDERAL HOME LOAN BANK OF CHICAGO\" , \"Fedral Home of Chicago\"),\n", " (\"FDIC, RECEIVER, INDYMAC FEDERAL BANK FSB\" , \"FDIC\"),\n", " (\"DOWNEY SAVINGS AND LOAN ASSOCIATION, F.A.\" , \"Downey Mortgage\"),\n", " (\"DITECH FINANCIAL LLC\" , \"Ditech\"),\n", " (\"CITIMORTGAGE, INC.\" , \"Citi\"),\n", " (\"CHICAGO MORTGAGE SOLUTIONS DBA INTERFIRST MORTGAGE COMPANY\" , \"Chicago Mortgage\"),\n", " (\"CHICAGO MORTGAGE SOLUTIONS DBA INTERBANK MORTGAGE COMPANY\" , \"Chicago Mortgage\"),\n", " (\"CHASE HOME FINANCE, LLC\" , \"JP Morgan Chase\"),\n", " (\"CHASE HOME FINANCE FRANKLIN AMERICAN MORTGAGE COMPANY\" , \"JP Morgan Chase\"),\n", " (\"CHASE HOME FINANCE (CIE 1)\" , \"JP Morgan Chase\"),\n", " (\"CHASE HOME FINANCE\" , \"JP Morgan Chase\"),\n", " (\"CASHCALL, INC.\" , \"CashCall\"),\n", " (\"CAPITAL ONE, NATIONAL ASSOCIATION\" , \"Capital One\"),\n", " (\"CALIBER HOME LOANS, INC.\" , \"Caliber Funding\"),\n", " (\"BISHOPS GATE RESIDENTIAL MORTGAGE TRUST\" , \"Bishops Gate Mortgage\"),\n", " (\"BANK OF AMERICA, N.A.\" , \"Bank of America\"),\n", " (\"AMTRUST BANK\" , \"AmTrust\"),\n", " (\"AMERISAVE MORTGAGE CORPORATION\" , \"Amerisave\"),\n", " (\"AMERIHOME MORTGAGE COMPANY, LLC\" , \"AmeriHome Mortgage\"),\n", " (\"ALLY BANK\" , \"Ally Bank\"),\n", " (\"ACADEMY MORTGAGE CORPORATION\" , \"Academy Mortgage\"),\n", " (\"NO CASH-OUT REFINANCE\" , \"OTHER REFINANCE\"),\n", " (\"REFINANCE - NOT SPECIFIED\" , \"OTHER REFINANCE\"),\n", " (\"Other REFINANCE\" , \"OTHER REFINANCE\")]" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "64e459a1-3368-45b0-8e15-5bf61c40d610", "showTitle": false, "title": "" } }, "source": [ "* Define category (string) column and numeric column" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "67d567bf-acdd-4562-82ae-70ac9a760bfc", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "# String columns\n", "cate_col_names = [\n", " \"orig_channel\",\n", " \"first_home_buyer\",\n", " \"loan_purpose\",\n", " \"property_type\",\n", " \"occupancy_status\",\n", " \"property_state\",\n", " \"product_type\",\n", " \"relocation_mortgage_indicator\",\n", " \"seller_name\",\n", " \"mod_flag\"\n", "]\n", "# Numberic columns\n", "label_col_name = \"delinquency_12\"\n", "numeric_col_names = [\n", " \"orig_interest_rate\",\n", " \"orig_upb\",\n", " \"orig_loan_term\",\n", " \"orig_ltv\",\n", " \"orig_cltv\",\n", " \"num_borrowers\",\n", " \"dti\",\n", " \"borrower_credit_score\",\n", " \"num_units\",\n", " \"zip\",\n", " \"mortgage_insurance_percent\",\n", " \"current_loan_delinquency_status\",\n", " \"current_actual_upb\",\n", " \"interest_rate\",\n", " \"loan_age\",\n", " \"msa\",\n", " \"non_interest_bearing_upb\",\n", " label_col_name\n", "]\n", "all_col_names = cate_col_names + numeric_col_names" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "f7c7fdab-81f9-4d83-bff1-cde870b7c8c6", "showTitle": false, "title": "" } }, "source": [ "* Functions to extract perf and acq columns from raw schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "c9b064d7-12d3-478f-ba86-9e9b50b03826", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def extract_perf_columns(rawDf):\n", " perfDf = rawDf.select(\n", " col(\"loan_id\"),\n", " date_format(to_date(col(\"monthly_reporting_period\"),\"MMyyyy\"), \"MM/dd/yyyy\").alias(\"monthly_reporting_period\"),\n", " upper(col(\"servicer\")).alias(\"servicer\"),\n", " col(\"interest_rate\"),\n", " col(\"current_actual_upb\"),\n", " col(\"loan_age\"),\n", " col(\"remaining_months_to_legal_maturity\"),\n", " col(\"adj_remaining_months_to_maturity\"),\n", " date_format(to_date(col(\"maturity_date\"),\"MMyyyy\"), \"MM/yyyy\").alias(\"maturity_date\"),\n", " col(\"msa\"),\n", " col(\"current_loan_delinquency_status\"),\n", " col(\"mod_flag\"),\n", " col(\"zero_balance_code\"),\n", " date_format(to_date(col(\"zero_balance_effective_date\"),\"MMyyyy\"), \"MM/yyyy\").alias(\"zero_balance_effective_date\"),\n", " date_format(to_date(col(\"last_paid_installment_date\"),\"MMyyyy\"), \"MM/dd/yyyy\").alias(\"last_paid_installment_date\"),\n", " date_format(to_date(col(\"foreclosed_after\"),\"MMyyyy\"), \"MM/dd/yyyy\").alias(\"foreclosed_after\"),\n", " date_format(to_date(col(\"disposition_date\"),\"MMyyyy\"), \"MM/dd/yyyy\").alias(\"disposition_date\"),\n", " col(\"foreclosure_costs\"),\n", " col(\"prop_preservation_and_repair_costs\"),\n", " col(\"asset_recovery_costs\"),\n", " col(\"misc_holding_expenses\"),\n", " col(\"holding_taxes\"),\n", " col(\"net_sale_proceeds\"),\n", " col(\"credit_enhancement_proceeds\"),\n", " col(\"repurchase_make_whole_proceeds\"),\n", " col(\"other_foreclosure_proceeds\"),\n", " col(\"non_interest_bearing_upb\"),\n", " col(\"principal_forgiveness_upb\"),\n", " col(\"repurchase_make_whole_proceeds_flag\"),\n", " col(\"foreclosure_principal_write_off_amount\"),\n", " col(\"servicing_activity_indicator\"),\n", " col('quarter')\n", " )\n", "\n", " return perfDf.select(\"*\").filter(\"current_actual_upb != 0.0\")\n", "\n", "def extract_acq_columns(rawDf):\n", " acqDf = rawDf.select(\n", " col(\"loan_id\"),\n", " col(\"orig_channel\"),\n", " upper(col(\"seller_name\")).alias(\"seller_name\"),\n", " col(\"orig_interest_rate\"),\n", " col(\"orig_upb\"),\n", " col(\"orig_loan_term\"),\n", " date_format(to_date(col(\"orig_date\"),\"MMyyyy\"), \"MM/yyyy\").alias(\"orig_date\"),\n", " date_format(to_date(col(\"first_pay_date\"),\"MMyyyy\"), \"MM/yyyy\").alias(\"first_pay_date\"),\n", " col(\"orig_ltv\"),\n", " col(\"orig_cltv\"),\n", " col(\"num_borrowers\"),\n", " col(\"dti\"),\n", " col(\"borrower_credit_score\"),\n", " col(\"first_home_buyer\"),\n", " col(\"loan_purpose\"),\n", " col(\"property_type\"),\n", " col(\"num_units\"),\n", " col(\"occupancy_status\"),\n", " col(\"property_state\"),\n", " col(\"zip\"),\n", " col(\"mortgage_insurance_percent\"),\n", " col(\"product_type\"),\n", " col(\"coborrow_credit_score\"),\n", " col(\"mortgage_insurance_type\"),\n", " col(\"relocation_mortgage_indicator\"),\n", " dense_rank().over(Window.partitionBy(\"loan_id\").orderBy(to_date(col(\"monthly_reporting_period\"),\"MMyyyy\"))).alias(\"rank\"),\n", " col('quarter')\n", " )\n", "\n", " return acqDf.select(\"*\").filter(col(\"rank\")==1)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "4ff5870a-59ed-40df-9179-ec9418747463", "showTitle": false, "title": "" } }, "source": [ "### 2. Define ETL Process\n", "\n", "Define the function to do the ETL process\n", "\n", "#### 2.1 Define Functions to Read Raw CSV File\n", "\n", "* Define function to get quarter from input CSV file name" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "c3b71536-58a1-4d60-86a8-54d48780d7bd", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def _get_quarter_from_csv_file_name():\n", " return substring_index(substring_index(input_file_name(), '.', 1), '/', -1)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "d8166d00-4ea3-460b-b3e1-3c9c65524bac", "showTitle": false, "title": "" } }, "source": [ "* Define function to read raw CSV data file" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "374c8954-8101-4650-b074-1fde4c35f0c1", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def read_raw_csv(spark, path):\n", " return spark.read.format('csv') \\\n", " .option('nullValue', '') \\\n", " .option('header', False) \\\n", " .option('delimiter', '|') \\\n", " .schema(_csv_raw_schema) \\\n", " .load(path) \\\n", " .withColumn('quarter', _get_quarter_from_csv_file_name())" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "7f1b22a9-34d8-4483-8678-72710b77cc60", "showTitle": false, "title": "" } }, "source": [ "#### 2.2 Define ETL Process\n", "\n", "* Define function to parse dates in Performance data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "2bc46125-1872-4f1a-ad22-53f52a5b0c25", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def _parse_dates(perf):\n", " return perf \\\n", " .withColumn('monthly_reporting_period', to_date(col('monthly_reporting_period'), 'MM/dd/yyyy')) \\\n", " .withColumn('monthly_reporting_period_month', month(col('monthly_reporting_period'))) \\\n", " .withColumn('monthly_reporting_period_year', year(col('monthly_reporting_period'))) \\\n", " .withColumn('monthly_reporting_period_day', dayofmonth(col('monthly_reporting_period'))) \\\n", " .withColumn('last_paid_installment_date', to_date(col('last_paid_installment_date'), 'MM/dd/yyyy')) \\\n", " .withColumn('foreclosed_after', to_date(col('foreclosed_after'), 'MM/dd/yyyy')) \\\n", " .withColumn('disposition_date', to_date(col('disposition_date'), 'MM/dd/yyyy')) \\\n", " .withColumn('maturity_date', to_date(col('maturity_date'), 'MM/yyyy')) \\\n", " .withColumn('zero_balance_effective_date', to_date(col('zero_balance_effective_date'), 'MM/yyyy'))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "bf34b44e-053e-4341-8047-a58116ef28ef", "showTitle": false, "title": "" } }, "source": [ "* Define function to create deliquency data frame from Performance data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "812fded9-4856-4f88-a117-a7ef5648e83a", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def _create_perf_deliquency(spark, perf):\n", " aggDF = perf.select(\n", " col(\"quarter\"),\n", " col(\"loan_id\"),\n", " col(\"current_loan_delinquency_status\"),\n", " when(col(\"current_loan_delinquency_status\") >= 1, col(\"monthly_reporting_period\")).alias(\"delinquency_30\"),\n", " when(col(\"current_loan_delinquency_status\") >= 3, col(\"monthly_reporting_period\")).alias(\"delinquency_90\"),\n", " when(col(\"current_loan_delinquency_status\") >= 6, col(\"monthly_reporting_period\")).alias(\"delinquency_180\")) \\\n", " .groupBy(\"quarter\", \"loan_id\") \\\n", " .agg(\n", " max(\"current_loan_delinquency_status\").alias(\"delinquency_12\"),\n", " min(\"delinquency_30\").alias(\"delinquency_30\"),\n", " min(\"delinquency_90\").alias(\"delinquency_90\"),\n", " min(\"delinquency_180\").alias(\"delinquency_180\")) \\\n", " .select(\n", " col(\"quarter\"),\n", " col(\"loan_id\"),\n", " (col(\"delinquency_12\") >= 1).alias(\"ever_30\"),\n", " (col(\"delinquency_12\") >= 3).alias(\"ever_90\"),\n", " (col(\"delinquency_12\") >= 6).alias(\"ever_180\"),\n", " col(\"delinquency_30\"),\n", " col(\"delinquency_90\"),\n", " col(\"delinquency_180\"))\n", " joinedDf = perf \\\n", " .withColumnRenamed(\"monthly_reporting_period\", \"timestamp\") \\\n", " .withColumnRenamed(\"monthly_reporting_period_month\", \"timestamp_month\") \\\n", " .withColumnRenamed(\"monthly_reporting_period_year\", \"timestamp_year\") \\\n", " .withColumnRenamed(\"current_loan_delinquency_status\", \"delinquency_12\") \\\n", " .withColumnRenamed(\"current_actual_upb\", \"upb_12\") \\\n", " .select(\"quarter\", \"loan_id\", \"timestamp\", \"delinquency_12\", \"upb_12\", \"timestamp_month\", \"timestamp_year\") \\\n", " .join(aggDF, [\"loan_id\", \"quarter\"], \"left_outer\")\n", "\n", " # calculate the 12 month delinquency and upb values\n", " months = 12\n", " monthArray = [lit(x) for x in range(0, 12)]\n", " # explode on a small amount of data is actually slightly more efficient than a cross join\n", " testDf = joinedDf \\\n", " .withColumn(\"month_y\", explode(array(monthArray))) \\\n", " .select(\n", " col(\"quarter\"),\n", " floor(((col(\"timestamp_year\") * 12 + col(\"timestamp_month\")) - 24000) / months).alias(\"josh_mody\"),\n", " floor(((col(\"timestamp_year\") * 12 + col(\"timestamp_month\")) - 24000 - col(\"month_y\")) / months).alias(\"josh_mody_n\"),\n", " col(\"ever_30\"),\n", " col(\"ever_90\"),\n", " col(\"ever_180\"),\n", " col(\"delinquency_30\"),\n", " col(\"delinquency_90\"),\n", " col(\"delinquency_180\"),\n", " col(\"loan_id\"),\n", " col(\"month_y\"),\n", " col(\"delinquency_12\"),\n", " col(\"upb_12\")) \\\n", " .groupBy(\"quarter\", \"loan_id\", \"josh_mody_n\", \"ever_30\", \"ever_90\", \"ever_180\", \"delinquency_30\", \"delinquency_90\", \"delinquency_180\", \"month_y\") \\\n", " .agg(max(\"delinquency_12\").alias(\"delinquency_12\"), min(\"upb_12\").alias(\"upb_12\")) \\\n", " .withColumn(\"timestamp_year\", floor((lit(24000) + (col(\"josh_mody_n\") * lit(months)) + (col(\"month_y\") - 1)) / lit(12))) \\\n", " .selectExpr('*', 'pmod(24000 + (josh_mody_n * {}) + month_y, 12) as timestamp_month_tmp'.format(months)) \\\n", " .withColumn(\"timestamp_month\", when(col(\"timestamp_month_tmp\") == lit(0), lit(12)).otherwise(col(\"timestamp_month_tmp\"))) \\\n", " .withColumn(\"delinquency_12\", ((col(\"delinquency_12\") > 3).cast(\"int\") + (col(\"upb_12\") == 0).cast(\"int\")).alias(\"delinquency_12\")) \\\n", " .drop(\"timestamp_month_tmp\", \"josh_mody_n\", \"month_y\")\n", "\n", " return perf.withColumnRenamed(\"monthly_reporting_period_month\", \"timestamp_month\") \\\n", " .withColumnRenamed(\"monthly_reporting_period_year\", \"timestamp_year\") \\\n", " .join(testDf, [\"quarter\", \"loan_id\", \"timestamp_year\", \"timestamp_month\"], \"left\") \\\n", " .drop(\"timestamp_year\", \"timestamp_month\")" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "e2b3f613-4b89-4cca-ac2f-7ca44a3f07d6", "showTitle": false, "title": "" } }, "source": [ "* Define function to create acquisition data frame from Acquisition data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "1f15c8b1-321a-49e4-bdbe-98ad6fe9428c", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def _create_acquisition(spark, acq):\n", " nameMapping = spark.createDataFrame(_name_mapping, [\"from_seller_name\", \"to_seller_name\"])\n", " return acq.join(nameMapping, col(\"seller_name\") == col(\"from_seller_name\"), \"left\") \\\n", " .drop(\"from_seller_name\") \\\n", " .withColumn(\"old_name\", col(\"seller_name\")) \\\n", " .withColumn(\"seller_name\", coalesce(col(\"to_seller_name\"), col(\"seller_name\"))) \\\n", " .drop(\"to_seller_name\") \\\n", " .withColumn(\"orig_date\", to_date(col(\"orig_date\"), \"MM/yyyy\")) \\\n", " .withColumn(\"first_pay_date\", to_date(col(\"first_pay_date\"), \"MM/yyyy\")) " ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "936c1b79-dd1e-4f0b-9547-ef882d1085ef", "showTitle": false, "title": "" } }, "source": [ "#### 2.3 Define Casting Process\n", "This part is casting String column to Numbric. \n", "Example:\n", "```\n", "col_1\n", " \"a\"\n", " \"b\"\n", " \"c\"\n", " \"a\"\n", "# After String ====> Numberic\n", "col_1\n", " 0\n", " 1\n", " 2\n", " 0\n", "``` \n", "
\n", "\n", "* Define function to get column dictionary\n", "\n", " Example\n", " ```\n", " col1 = [row(data=\"a\",id=0), row(data=\"b\",id=1)]\n", " ```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "9c4a16a6-7e4c-46e5-aae3-e0183aefa675", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def _gen_dictionary(etl_df, col_names):\n", " cnt_table = etl_df.select(posexplode(array([col(i) for i in col_names])))\\\n", " .withColumnRenamed(\"pos\", \"column_id\")\\\n", " .withColumnRenamed(\"col\", \"data\")\\\n", " .filter(\"data is not null\")\\\n", " .groupBy(\"column_id\", \"data\")\\\n", " .count()\n", " windowed = Window.partitionBy(\"column_id\").orderBy(desc(\"count\"))\n", " return cnt_table.withColumn(\"id\", row_number().over(windowed)).drop(\"count\")" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "d0cd03b6-40f0-4c97-ba92-61b04e38434c", "showTitle": false, "title": "" } }, "source": [ "* Define function to convert string columns to numeric" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "d235ccd5-439b-4e3c-b84e-dfa5f04156ec", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def _cast_string_columns_to_numeric(spark, input_df):\n", " cached_dict_df = _gen_dictionary(input_df, cate_col_names).cache()\n", " output_df = input_df\n", " # Generate the final table with all columns being numeric.\n", " for col_pos, col_name in enumerate(cate_col_names):\n", " col_dict_df = cached_dict_df.filter(col(\"column_id\") == col_pos)\\\n", " .drop(\"column_id\")\\\n", " .withColumnRenamed(\"data\", col_name)\n", " \n", " output_df = output_df.join(broadcast(col_dict_df), col_name, \"left\")\\\n", " .drop(col_name)\\\n", " .withColumnRenamed(\"id\", col_name)\n", " return output_df " ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "19557401-4aa5-4c15-913b-e1684853fb97", "showTitle": false, "title": "" } }, "source": [ "#### 2.4 Define Main Function\n", "In this function:\n", "1. Parse date in Performance data by calling _parse_dates (parsed_perf)\n", "2. Create deliqency dataframe(perf_deliqency) form Performance data by calling _create_perf_deliquency\n", "3. Create cleaned acquisition dataframe(cleaned_acq) from Acquisition data by calling _create_acquisition\n", "4. Join deliqency dataframe(perf_deliqency) and cleaned acquisition dataframe(cleaned_acq), get clean_df\n", "5. Cast String column to Numbric in clean_df by calling _cast_string_columns_to_numeric, get casted_clean_df\n", "6. Return casted_clean_df as final result" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "b9c228c8-6fc5-408a-a39b-26375587d7e2", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "def run_mortgage(spark, perf, acq):\n", " parsed_perf = _parse_dates(perf)\n", " perf_deliqency = _create_perf_deliquency(spark, parsed_perf)\n", " cleaned_acq = _create_acquisition(spark, acq)\n", " clean_df = perf_deliqency.join(cleaned_acq, [\"loan_id\", \"quarter\"], \"inner\").drop(\"quarter\")\n", " casted_clean_df = _cast_string_columns_to_numeric(spark, clean_df)\\\n", " .select(all_col_names)\\\n", " .withColumn(label_col_name, when(col(label_col_name) > 0, 1).otherwise(0))\\\n", " .fillna(float(0))\n", " return casted_clean_df" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "34da72d1-8668-4e7f-a633-12992a7bd4b8", "showTitle": false, "title": "" } }, "source": [ "## Script Settings\n", "\n", "### 1. File Path Settings\n", "* Define input file path" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "a8cc0927-1102-4fcc-b3b8-1fb574bb0011", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "# You need to update them to your real paths!\n", "dataRoot = os.getenv(\"DATA_ROOT\", 'dbfs:///FileStore/tables/mortgage-fannieMae')\n", "orig_raw_path = dataRoot + '/'" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "b6e2f44b-06ee-4c12-9f38-8dd6a69d74a2", "showTitle": false, "title": "" } }, "source": [ "* Define output folder path" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "b47cb1c6-5340-43ca-a910-6826e7a5d957", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "output_path = dataRoot + '/output/data/'\n", "output_path_train = dataRoot + '/output/train/'\n", "output_path_eval = dataRoot + '/output/eval/'\n", "output_csv2parquet = dataRoot + '/output/csv2parquet/'\n", "\n", "save_train_eval_dataset = True" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "888e4e94-185b-4a9a-ae0f-fd8da307cc69", "showTitle": false, "title": "" } }, "source": [ "### 2. Common Spark Settings" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "fed9b4ba-4a33-4a7b-a460-f01c4234a795", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "spark.conf.set('spark.rapids.sql.explain', 'ALL')\n", "spark.conf.set('spark.rapids.sql.batchSizeBytes', '512M')\n", "spark.conf.set('spark.rapids.sql.reader.batchSizeBytes', '768M')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "fbf5a463-c679-4be4-954a-059f72e29d6b", "showTitle": false, "title": "" } }, "source": [ "## Run Part" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "e33d79a2-c317-4145-94cc-e42348a310e3", "showTitle": false, "title": "" } }, "source": [ "### Read Raw File" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "67f9bf29-cc04-4fd2-8812-6118f1f81c7f", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "rawDf = read_raw_csv(spark, orig_raw_path)\n", "rawDf.write.parquet(output_csv2parquet, mode='overwrite')\n", "rawDf = spark.read.parquet(output_csv2parquet)\n", "\n", "acq = extract_acq_columns(rawDf)\n", "perf = extract_perf_columns(rawDf)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "d859c794-9a4c-40e6-9b2a-797b662131db", "showTitle": false, "title": "" } }, "source": [ "### Run ETL\n", "#### 1. Add additional Spark settings" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "a47438ad-e655-4bd2-8b8e-439d3abb1f0f", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "html" } }, "output_type": "display_data" } ], "source": [ "# GPU run, set to true\n", "spark.conf.set('spark.rapids.sql.enabled', 'true')\n", "# CPU run, set to false\n", "# spark.conf.set('spark.rapids.sql.enabled', 'false')\n", "spark.conf.set('spark.sql.files.maxPartitionBytes', '1G')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "af7e9a24-6a41-42b2-a385-67384bde6f88", "showTitle": false, "title": "" } }, "source": [ "#### 2.Read Parquet File and Run ETL Process, Save the Result" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "inputWidgets": {}, "nuid": "8df58be9-2a91-4791-bfbc-cb69efc20a5b", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "" ] }, "metadata": { "application/vnd.databricks.v1+output": { "arguments": {}, "data": "", "errorSummary": "The spark context has stopped and the driver is restarting. Your notebook will be automatically reattached.", "errorTraceType": "html", "metadata": {}, "type": "ipynbError" } }, "output_type": "display_data" } ], "source": [ "start = time.time()\n", "\n", "# run main function to process data\n", "out = run_mortgage(spark, perf, acq)\n", "\n", "# save processed data\n", "out.write.parquet(output_path, mode='overwrite')\n", "\n", "# save processed data\n", "if save_train_eval_dataset:\n", " etlDf = spark.read.parquet(output_path)\n", "\n", " # split 80% for training, 20% for test\n", " splits = etlDf.randomSplit([0.8, 0.2])\n", "\n", " splits[0].write.format('parquet').save(output_path_train, mode=\"overwrite\")\n", " splits[1].write.format('parquet').save(output_path_eval, mode=\"overwrite\")\n", "\n", "# print explain and time\n", "print(out.explain())\n", "end = time.time()\n", "print(end - start)\n", "spark.stop()" ] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "dashboards": [], "language": "python", "notebookMetadata": { "pythonIndentUnit": 2 }, "notebookName": "Mortgage-ETL", "notebookOrigID": 3566800477263210, "widgets": {} }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.2" }, "name": "gpu-mortgage", "notebookId": 4440374682851873 }, "nbformat": 4, "nbformat_minor": 1 }