logo

PySpark-SQL

Apache Spark is de meest succesvolle software van Apache Software Foundation en ontworpen voor snel computergebruik. Verschillende industrieën gebruiken Apache Spark om hun oplossingen te vinden. PySpark SQL is een module in Spark die relationele verwerking integreert met de functionele programmeer-API van Spark. We kunnen de gegevens extraheren met behulp van een SQL-querytaal. We kunnen de query's op dezelfde manier gebruiken als de SQL-taal.

Als u basiskennis heeft van RDBMS, is PySpark SQL eenvoudig te gebruiken, waarmee u de beperkingen van traditionele relationele gegevensverwerking kunt uitbreiden. Spark ondersteunt ook de Hive Query Language, maar er zijn beperkingen aan de Hive-database. Spark SQL is ontwikkeld om de nadelen van de Hive-database weg te nemen. Laten we eens kijken naar de volgende nadelen van Hive:

Nadelen van Hive

  • De verwerking kan niet worden hervat, wat betekent dat als de uitvoering midden in een workflow mislukt, u niet kunt hervatten vanaf het punt waarop deze vastliep.
  • We kunnen de gecodeerde databases niet in cascade verwijderen als de prullenbak is ingeschakeld. Het leidt tot de uitvoeringsfout. Om een ​​dergelijk type database te verwijderen, moeten gebruikers de optie Opschonen gebruiken.
  • De ad-hocquery's worden uitgevoerd met behulp van MapReduce, dat wordt gelanceerd door de Hive, maar wanneer we de middelgrote database analyseren, vertraagt ​​dit de prestaties.
  • Hive ondersteunt de update- of verwijderbewerking niet.
  • Het is beperkt tot de ondersteuning van subquery's.

Deze nadelen zijn de redenen om de Apache SQL te ontwikkelen.

PySpark SQL korte introductie

PySpark ondersteunt geïntegreerde relationele verwerking met de functionele programmering van Spark. Het biedt ondersteuning voor de verschillende gegevensbronnen en maakt het mogelijk om SQL-query's te verweven met codetransformaties, waardoor een zeer krachtig hulpmiddel ontstaat.

PySpark SQL brengt de verbinding tot stand tussen de RDD en de relationele tabel. Het biedt een veel nauwere integratie tussen relationele en procedurele verwerking via declaratieve Dataframe API, die is geïntegreerd met Spark-code.

tekenreeks bevat Java

Met behulp van SQL kan het gemakkelijk toegankelijk zijn voor meer gebruikers en de optimalisatie voor de huidige verbeteren. Het ondersteunt ook het brede scala aan databronnen en algoritmen in Big-data.

Functie van PySpark SQL

De kenmerken van PySpark SQL worden hieronder gegeven:

1) Consistentiegegevenstoegang

Het biedt consistente gegevenstoegang, wat betekent dat SQL een gedeelde manier ondersteunt om toegang te krijgen tot verschillende gegevensbronnen, zoals Hive, Avro, Parquet, JSON en JDBC. Het speelt een belangrijke rol bij het onderbrengen van alle bestaande gebruikers in Spark SQL.

2) Integratie met Spark

PySpark SQL-query's zijn geïntegreerd met Spark-programma's. We kunnen de query's binnen de Spark-programma's gebruiken.

Een van de grootste voordelen is dat ontwikkelaars statusfouten niet handmatig hoeven te beheren of de applicatie gesynchroniseerd moeten houden met batchtaken.

3) Standaardconnectiviteit

Het biedt een verbinding via JDBC of ODBC, en deze twee zijn de industriestandaarden voor connectiviteit voor business intelligence-tools.

4) Door de gebruiker gedefinieerde functies

PySpark SQL heeft een taalgecombineerde door de gebruiker gedefinieerde functie (UDF's). UDF wordt gebruikt om een ​​nieuwe kolomgebaseerde functie te definiëren die de woordenschat van Spark SQL's DSL uitbreidt voor het transformeren van DataFrame.

'abc' is in cijfers'

5) Compatibiliteit met Hives

PySpark SQL voert ongewijzigde Hive-query's uit op huidige gegevens. Het maakt volledige compatibiliteit met huidige Hive-gegevens mogelijk.

PySpark SQL-module

Enkele belangrijke klassen van Spark SQL en DataFrames zijn de volgende:

    pyspark.sql.SparkSession:Het vertegenwoordigt het belangrijkste toegangspunt voor Gegevensframe en SQL-functionaliteit.pyspark.sql.DataFrame:Het vertegenwoordigt een gedistribueerde verzameling gegevens, gegroepeerd in benoemde kolommen.pyspark.sql.Kolom:Het vertegenwoordigt een kolomexpressie in a Gegevensframe. pyspark.sql.Row:Het vertegenwoordigt een rij gegevens in a Gegevensframe. pyspark.sql.GroupedData:Aggregatiemethoden, geretourneerd door DataFrame.groupBy(). pyspark.sql.DataFrameNaFuncties:Het vertegenwoordigt methoden voor het verwerken van ontbrekende gegevens (null-waarden).pyspark.sql.DataFrameStatFunctions:Het vertegenwoordigt methoden voor statistische functionaliteit.pysark.sql.functions:Het vertegenwoordigt een lijst met ingebouwde functies die beschikbaar zijn Gegevensframe. pyspark.sql.types:Het vertegenwoordigt een lijst met beschikbare gegevenstypen.pyspark.sql.Window:Het wordt gebruikt om met vensterfuncties te werken.

Bekijk het volgende voorbeeld van PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Uitgang:

 +-----+ |hello| +-----+ |spark| +-----+ 

Code-uitleg:

In de bovenstaande code hebben we de vindvonk module en gebeld findspark.init() bouwer; Vervolgens hebben we de SparkSession-module geïmporteerd om een ​​Spark-sessie te maken.

bestand geopend in Java

vanuit pyspark.sql importeer SparkSession

Een Spark-sessie kan worden gebruikt om de Dataset- en DataFrame-API te maken. Een SparkSession kan ook worden gebruikt om DataFrame te maken, DataFrame als tabel te registreren, SQL via tabellen uit te voeren, tabel in de cache op te slaan en het parketbestand te lezen.

klasse bouwer

Het is een bouwer van Spark Session.

getOrCreate()

Het wordt gebruikt om een ​​bestaande te verkrijgen SparkSessie, of als er geen bestaande is, maak dan een nieuwe aan op basis van de opties die in de builder zijn ingesteld.

Weinig andere methoden

Er zijn enkele methoden van PySpark SQL:

1. appNaam(naam)

Het wordt gebruikt om de naam van de applicatie in te stellen, die wordt weergegeven in de Spark-webgebruikersinterface. De parameter naam accepteert de naam van de parameter.

2. config(key=Geen, waarde = Geen, conf = Geen)

Het wordt gebruikt om een ​​configuratieoptie in te stellen. Opties die met deze methode zijn ingesteld, worden automatisch naar beide doorgegeven SparkConf En SparkSessie 's configuratie.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Parameters:

java tekenreeks
    sleutel-Een sleutelnaamreeks van een configuratie-eigenschap.waarde-Het vertegenwoordigt de waarde van een configuratie-eigenschap.conf -Een exemplaar van SparkConf.

3. meester(meester)

Het stelt de Spark Master-URL in waarmee verbinding moet worden gemaakt, zoals 'local' om lokaal te draaien, 'local[4]' om lokaal te draaien met 4 kernen.

Parameters:

    meester:een URL voor Spark Master.

4. SparkSession.catalog

Het is een interface waarmee de gebruiker de onderliggende database, tabellen, functies, enz. kan maken, verwijderen, wijzigen of opvragen.

5. SparkSession.conf

dubbel in Java

Het is een runtime-configuratie-interface voor Spark. Dit is de interface waarmee de gebruiker alle Spark- en Hadoop-configuraties kan ophalen en instellen die relevant zijn voor Spark SQL.

klasse pyspark.sql.DataFrame

Het is een gedistribueerde verzameling gegevens gegroepeerd in benoemde kolommen. Een DataFrame is vergelijkbaar met de relationele tabel in Spark SQL en kan worden gemaakt met behulp van verschillende functies in SQLContext.

 student = sqlContext.read.csv('...') 

Nadat het dataframe is gemaakt, kunnen we het manipuleren met behulp van de verschillende domeinspecifieke talen (DSL), die vooraf gedefinieerde functies van DataFrame zijn. Beschouw het volgende voorbeeld.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Laten we het volgende voorbeeld bekijken:

Query's uitvoeren met Spark SQL

In de volgende code maken we eerst een DataFrame en voeren we de SQL-query's uit om de gegevens op te halen. Beschouw de volgende code:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Uitgang:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

De groupBy()-functie gebruiken

De functie groupBy() verzamelt vergelijkbare categoriegegevens.

 songdf.groupBy('Genre').count().show() 

Uitgang:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

distributie(numpartities, *cols)

De verdeling() retourneert een nieuw DataFrame dat een partitie-expressie is. Deze functie accepteert twee parameters aantalpartities En *kol. De aantalpartities parameter specificeert het doelaantal kolommen.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Uitgang:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows