In [1]:
from ftplib import FTP, all_errors

# Hostname of the MusicBrainz FTP server
ftp_url = "ftp.musicbrainz.org"

# Open the connection, then log in without credentials
# (ftplib falls back to an anonymous login when no user is given)
ftp_connection = FTP(host=ftp_url)
ftp_connection.login()

# Smoke-test the session by listing the server's current directory
ftp_connection.retrlines("LIST")

In [2]:
def download_file(ftp_connection: FTP, src: str, dest: str):
  """ Download file `src` from the FTP server to `dest`.

      If the transfer fails, the error is printed and the partially written
      file at `dest` is removed, so a truncated download is never mistaken
      for a complete one by later steps.

      Args:
        ftp_connection: FTP connection used to issue the `RETR` command.
        src: Path on FTP server to download file.
        dest: Path to save file locally.
  """
  # Imported locally so the function does not depend on another cell having imported `os`.
  import os

  download_failed = False
  with open(dest, 'wb') as f:
    try:
      ftp_connection.retrbinary('RETR {}'.format(src), f.write)
    except all_errors as e:
      print("Could not download file: {}".format(str(e)))
      download_failed = True

  if download_failed:
    # `open(dest, 'wb')` has already created (or truncated) the file; drop the leftover.
    if os.path.exists(dest):
      os.remove(dest)
    return

  # Report only after the file has been closed, so everything written is flushed to disk.
  print("Download successful")
  print(dbutils.fs.ls(f"file:{dest}"))

In [3]:
import os

# Remote directory and file name of the incremental ListenBrainz dump
dump_path = '/pub/musicbrainz/listenbrainz/incremental/listenbrainz-dump-160-20200611-000002-incremental'
dump_name = 'listenbrainz-listens-dump-160-20200611-000002-spark-incremental.tar.xz'

# Fetch the archive from the FTP server into the local '/tmp' directory
remote_archive = os.path.join(dump_path, dump_name)
local_archive = os.path.join('/tmp', dump_name)
download_file(ftp_connection, src=remote_archive, dest=local_archive)

In [4]:
import os
import shutil
import tarfile

def extract_tar(archive: str, dest_path: str, delete_archive: bool=False, force: bool=False):
  """ Decompresses tar archive into destination path.

      Extraction errors are printed rather than raised. The archive is only
      deleted when the extraction completed without error, so a failed run
      never destroys the only copy of the data.

      Args:
        archive: Path of the archive to extract.
        dest_path: Location into which to extract the files.
        delete_archive: Delete the archive after a successful extraction if true.
        force: Delete everything in the 'dest_path' if present.
  """

  # Delete everything in 'dest_path' if it exists
  if force and os.path.exists(dest_path):
    shutil.rmtree(dest_path)

  # Create directory
  os.makedirs(dest_path, exist_ok=True)

  # The archive comes from a remote server: where the running Python supports
  # extraction filters, use the 'data' filter so members cannot escape 'dest_path'.
  extract_kwargs = {'filter': 'data'} if hasattr(tarfile, 'data_filter') else {}

  extracted = False
  with tarfile.open(archive) as tar:
    try:
      tar.extractall(dest_path, **extract_kwargs)
      extracted = True
    except tarfile.TarError as e:
      print("Error while extracting tarfile: {}".format(str(e)))

  # Keep the archive around when extraction failed so it can be inspected or retried
  if delete_archive and extracted:
    os.remove(archive)

In [5]:
# Extract listens into '/dbfs/data/'.
# NOTE: force=True makes extract_tar() remove an existing '/dbfs/data/' tree before extracting.
listens_directory = '/dbfs/data/'
listens_archive = '/tmp/listenbrainz-listens-dump-160-20200611-000002-spark-incremental.tar.xz'
extract_tar(listens_archive, listens_directory, force=True)

# List the '/data' directory to confirm what was extracted
display(dbutils.fs.ls('/data'))

path,name,size
dbfs:/data/listenbrainz-listens-dump-160-20200611-000002-spark-incremental/,listenbrainz-listens-dump-160-20200611-000002-spark-incremental/,0


In [6]:
# Move the listens to outer directory
extracted_listens = '/data/listenbrainz-listens-dump-160-20200611-000002-spark-incremental/listens'
dbutils.fs.mv(extracted_listens, '/data/listens', recurse=True)

# NOTE(review): this removes the same path that was just moved, which presumably
# no longer exists at this point - confirm whether the parent dump directory was meant.
dbutils.fs.rm(extracted_listens, recurse=True)

# Show the per-year directories now available under '/data/listens'
display(dbutils.fs.ls('/data/listens'))

path,name,size
dbfs:/data/listens/2005/,2005/,0
dbfs:/data/listens/2006/,2006/,0
dbfs:/data/listens/2007/,2007/,0
dbfs:/data/listens/2008/,2008/,0
dbfs:/data/listens/2009/,2009/,0
dbfs:/data/listens/2010/,2010/,0
dbfs:/data/listens/2011/,2011/,0
dbfs:/data/listens/2012/,2012/,0
dbfs:/data/listens/2013/,2013/,0
dbfs:/data/listens/2014/,2014/,0


In [7]:
import os

def get_listens(year: int, month: int):
  """ Read the listens JSON file of one month into a dataframe.

      The file is looked up at '/data/listens/<year>/<month>.json'. Any error
      raised while reading is printed and `None` is returned instead.

      Args:
        year: Year whose listens should be loaded.
        month: Month of that year whose listens should be loaded.

      Returns:
        The dataframe read from the file, or `None` if reading failed.
  """
  month_file = "{}.json".format(month)
  listens_path = os.path.join('/data/listens', str(year), month_file)

  try:
    return spark.read.json(listens_path)
  except Exception as err:
    # Best effort: report the problem and let the caller skip this month
    print(str(err))
    return None

In [8]:
# Load every month of 2020 and stack the monthly frames into a single DataFrame.
# get_listens() returns None for a month that could not be read; such months are skipped.
listens_df = None
for month in range(1, 13):
  df = get_listens(2020, month)
  if df is None:
    continue
  # Compare against None explicitly instead of relying on the truthiness of a DataFrame,
  # and union by column name so columns are matched by name rather than by position.
  listens_df = df if listens_df is None else listens_df.unionByName(df)

display(listens_df)

artist_mbids,artist_msid,artist_name,inserted_timestamp,listened_at,recording_mbid,recording_msid,release_mbid,release_msid,release_name,tags,track_name,user_name
List(),3e5ddba9-62e1-4ef0-b03b-4c92dbb295a1,Wyatt E.,2020-06-10 15:11:12,2020-01-17T13:17:26Z,0600f038-ce34-47df-8554-0b3ff5c1102b,6b4c23f4-3393-4db9-a4be-574c5a5b001f,,c9f8c6e9-32ef-41a5-b4fb-71a1f2ac01e3,Mount Sinai/Aswan,List(),Aswan,bouwwerf
List(),00a9c3bf-8b3a-4daa-93d9-e2bf798175f2,Warhorse,2020-06-10 15:11:08,2020-01-15T21:11:07Z,3b9b8a10-489c-4a4f-a479-518f9167f87e,80278db5-7a1a-422c-bb42-7312bbb34e30,,5458b5d8-c8e9-4ec8-9695-281abf5ee73e,As Heaven Turns to Ash,List(),Every Flower Dies No Matter The Thorns (Wither),bouwwerf
List(),00a9c3bf-8b3a-4daa-93d9-e2bf798175f2,Warhorse,2020-06-10 15:11:08,2020-01-15T21:03:09Z,3b9b8a10-489c-4a4f-a479-518f9167f87e,80278db5-7a1a-422c-bb42-7312bbb34e30,,5458b5d8-c8e9-4ec8-9695-281abf5ee73e,As Heaven Turns to Ash,List(),Every Flower Dies No Matter The Thorns (Wither),bouwwerf
List(),51d86ee6-ff9c-4017-b8d6-99b60335066e,Bismuth,2020-06-10 15:11:05,2020-01-11T05:28:15Z,cc32116f-d56c-4aaf-b74a-022b828d5b3c,048c0def-6c91-4d25-a258-8d93db9d2149,,e5254c40-871a-48aa-bdf2-0d97cc7ce313,Unavailing,List(),Tethys,bouwwerf
List(),3e5ddba9-62e1-4ef0-b03b-4c92dbb295a1,Wyatt E.,2020-06-10 15:11:04,2020-01-11T03:22:20Z,f3a1d4ef-38e4-4d65-83d4-cb3ab65274f0,2944ff26-dba1-42d5-a533-202ec3632386,,c9f8c6e9-32ef-41a5-b4fb-71a1f2ac01e3,Mount Sinai/Aswan,List(),Mount Sinai,bouwwerf
List(),51d86ee6-ff9c-4017-b8d6-99b60335066e,Bismuth,2020-06-10 15:11:04,2020-01-11T03:05:51Z,4d98eb68-8b95-4d19-ad1c-f7942b2309c8,f4502019-05c7-4f60-8431-4a531565ee09,,e5254c40-871a-48aa-bdf2-0d97cc7ce313,Unavailing,List(),Of the Weak Willed,bouwwerf
List(),00a9c3bf-8b3a-4daa-93d9-e2bf798175f2,Warhorse,2020-06-10 15:11:03,2020-01-09T12:23:30Z,3b985c9b-11c6-4eb5-a3fb-7ec4fde26aa9,91843d02-ce77-44a2-b203-57104ee3e9c1,,5458b5d8-c8e9-4ec8-9695-281abf5ee73e,As Heaven Turns to Ash,List(),I Am Dying,bouwwerf
List(),d6962ee4-68a2-4159-8c89-793fad7b901c,Inter Arma,2020-06-10 15:11:03,2020-01-09T11:18:14Z,5cd16733-7418-4971-a362-9cb1f5c87cf6,1804ee5a-6aae-413c-9c58-a49b3feb817c,,1e926fb1-902a-48eb-8e15-90fc482e26e8,Paradise Gallows,List(),An Archer in the Emptiness,bouwwerf
List(),00a9c3bf-8b3a-4daa-93d9-e2bf798175f2,Warhorse,2020-06-10 15:11:02,2020-01-08T13:17:16Z,1be4cdd7-7563-4eda-88c5-19ca0e45462f,95250ff8-4657-44d6-9126-64c3cea25967,,5458b5d8-c8e9-4ec8-9695-281abf5ee73e,As Heaven Turns to Ash,List(),Dawn,bouwwerf
List(),3e5ddba9-62e1-4ef0-b03b-4c92dbb295a1,Wyatt E.,2020-06-10 15:11:03,2020-01-08T08:51:48Z,0600f038-ce34-47df-8554-0b3ff5c1102b,6b4c23f4-3393-4db9-a4be-574c5a5b001f,,c9f8c6e9-32ef-41a5-b4fb-71a1f2ac01e3,Mount Sinai/Aswan,List(),Aswan,bouwwerf


In [9]:
table_name = 'listens' # Name of temporary table

# Expose the raw listens to Spark SQL under `table_name`
listens_df.createOrReplaceTempView(table_name)

# Reduce every listen to (user, weekday name, hour of day).
# The `{table}` placeholder makes the `.format()` call actually substitute the view name,
# so the query can no longer drift away from `table_name`.
formatted_listens = spark.sql("""
                SELECT user_name
                     , date_format(listened_at, 'EEEE') as day
                     , date_format(listened_at, 'H') as hour
                  FROM {table}
              """.format(table=table_name))

# Replace the view with the formatted listens; the later queries read `day` and `hour` from it
formatted_listens.createOrReplaceTempView(table_name)
display(formatted_listens)

user_name,day,hour
bouwwerf,Friday,13
bouwwerf,Wednesday,21
bouwwerf,Wednesday,21
bouwwerf,Saturday,5
bouwwerf,Saturday,3
bouwwerf,Saturday,3
bouwwerf,Thursday,12
bouwwerf,Thursday,11
bouwwerf,Wednesday,13
bouwwerf,Wednesday,8


In [10]:
import itertools

# Every (weekday, hour) slot of a week: 7 days x 24 hours
weekdays = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
hours = list(range(24))
time_range = itertools.product(weekdays, hours)

# Publish the slots as the 'time_range' view so they can be joined in SQL
time_range_df = spark.createDataFrame(time_range, schema=["day", "hour"])
time_range_df.createOrReplaceTempView('time_range')
display(time_range_df)

day,hour
Monday,0
Monday,1
Monday,2
Monday,3
Monday,4
Monday,5
Monday,6
Monday,7
Monday,8
Monday,9


In [11]:
# Count listens per user for every (day, hour) slot in which the user has at least one listen.
# The join drops slots without listens, hence "without zero hours".
result_without_zero_hours = spark.sql("""
            SELECT listens.user_name
                 , time_range.day
                 , time_range.hour
                 , count(*) as listen_count
              FROM listens
              JOIN time_range
                ON listens.day == time_range.day
               AND listens.hour == time_range.hour
          GROUP BY listens.user_name
                 , time_range.day
                 , time_range.hour
          """)

result_without_zero_hours.createOrReplaceTempView('result_without_zero_hours')
display(result_without_zero_hours.filter("user_name = 'ishaanshah'"))

In [12]:
# Print the query plan Spark uses to compute the per-user listen counts
result_without_zero_hours.explain()

In [13]:
# Pair every distinct user with every (day, hour) slot, then attach the counted
# listens; slots without a matching count get a listen_count of 0.
zero_filled_query = """
                SELECT dist_user_name.user_name
                     , time_range.day
                     , time_range.hour
                     , ifnull(result_without_zero_hours.listen_count, 0) as listen_count
                  FROM (SELECT DISTINCT user_name FROM listens) dist_user_name
            CROSS JOIN time_range
             LEFT JOIN result_without_zero_hours
                    ON result_without_zero_hours.user_name == dist_user_name.user_name
                   AND result_without_zero_hours.day == time_range.day
                   AND result_without_zero_hours.hour == time_range.hour
              """
result = spark.sql(zero_filled_query)

# Spot-check the rows of a single user
display(result.filter("user_name = 'ishaanshah'"))

In [14]:
# Print the query plan Spark uses for the zero-filled result
result.explain()

In [15]:
from pyspark.sql.functions import struct, collect_list

# Pack each row's (day, hour, listen_count) into a struct, then collect the structs
# of every user into a single 'daily_activity' list.
# Parentheses replace the backslash continuations, so no continuation is left dangling.
result_grouped_by_users = (
    result
    .withColumn("daily_activity", struct("day", "hour", "listen_count"))
    .groupBy("user_name")
    .agg(collect_list("daily_activity").alias("daily_activity"))
)

display(result_grouped_by_users)