SDS-2.2, Scalable Data Science

Article Topics in Retweet Network

Student Project

by Li Caldeira Balkeståhl and Mariama Jaiteh

Scalable web scrapper

Based on Mastering Spark for Data Science book (Chap. 6)

This cleaned up notebook contains code to perform web scraping, which should then be called from another notebook

Need to install the goose web scraper library, available with maven coordinate:

com.syncthemall:goose:2.1.25

//import the web scraper
import com.gravity.goose._
import com.gravity.goose._

The functions for expanding the urls

import org.apache.commons.lang3.StringUtils 
import java.net.HttpURLConnection;
import java.net.URL;
//function to expand once the urls
def expandUrl(url: String) : String = {
  var connection: HttpURLConnection = null
  try {
      connection = new URL(url)
                  .openConnection
                  .asInstanceOf[HttpURLConnection]
      connection.setInstanceFollowRedirects(false)
      connection.setUseCaches(false)
      connection.setRequestMethod("GET")
      connection.connect()
      val redirectedUrl = connection.getHeaderField("Location")
      if(StringUtils.isNotEmpty(redirectedUrl)){
        redirectedUrl
      } else {
        url
      }
  } catch {
      case e: Throwable => url
  } finally {
      if(connection != null)
        connection.disconnect()
   }
}

//function to keep expanding until you get the same
def expandUrluntilsame(url:String) : String ={
  var expanded=expandUrl(url)
  var unexpanded=url
  while(! StringUtils.equals(expanded,unexpanded)){
    unexpanded=expanded
    expanded=expandUrl(expanded)
  }
  expanded
}
import org.apache.commons.lang3.StringUtils
import java.net.HttpURLConnection
import java.net.URL
expandUrl: (url: String)String
expandUrluntilsame: (url: String)String

To get the articles from the web, use a dataframe of urls and return a dataframe with body, domain, title

  • First we make a case class Articles containing body, domain, title and meta description (from the HTML)
  • Use a function getArticles takes a string as input and return an Articles (filled with "null" string if any exception occured)
  • Then make UserDefinedFunction that takes a column name and returns Articles from stuff in that column (to be able to use directly on the DF)
//the class for the web article
case class Articles(
  title: String, 
  body: String, 
  domain: String,  //problem: If we add more stuff to the Articles class,  getArticles  needs to be changed
  description: String,
  keywords: String,
  status: String
)

//the scraper
def getArticles(url_article: String) : Articles = {
  try{
    //using configuration to make goose skip retrieving images
    val conf: Configuration = new Configuration
    conf.setEnableImageFetching(false)
    val goosy = new Goose(conf)
    val article = goosy.extractContent(url_article) 
    val d = article.domain
    val t = article.title
    val b = article.cleanedArticleText
    val desc=article.metaDescription
    val keyw=article.metaKeywords
    Articles(t, b, d,desc,keyw, "found")
  }
  catch {
    case _: Throwable  => Articles("null","null","null","","","not found")
//    case uhe: java.net.UnknownHostException => Seq(Articles("null","null","null","not found")).toDF()

  }
}

//user defined function to be used with dataframes
val ArticleUserDefined = udf((s:String) => getArticles(expandUrluntilsame(s))) //this actually creates a new instance of goose for each article,this is slow and not scalable as they say in the book (i think)
defined class Articles
getArticles: (url_article: String)Articles
ArticleUserDefined: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StructType(StructField(title,StringType,true), StructField(body,StringType,true), StructField(domain,StringType,true), StructField(description,StringType,true), StructField(status,StringType,true)),Some(List(StringType)))

Flattening the dataframe that does not need information on the number of new columns we want (copied with modification from notebook 007 of the course):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

implicit class DataFrameFlattener(df: DataFrame) {
  def flattenSchema: DataFrame = {
    df.select(flatten(Nil, df.schema): _*)
  }

  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType)) 
    //case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil //original
    case other => col(path.map(n => s"`$n`").mkString(".")).as(path.last) :: Nil //i just want the lowest nested name (is last too computationally expensive?)
  }
}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
defined class DataFrameFlattener

Finally a function to do all the above:

//just one function to be called on the URL dataframe
def getContent(urlDF:DataFrame): DataFrame = {
  //needs a column called URL with the URLs
    urlDF.withColumn("article",ArticleUserDefined($"URL")).flattenSchema  
}
getContent: (urlDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame