准备工作
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder.appName('Postgres').master('spark://localhost:7077').config("spark.driver.extraClassPath", "postgresql-42.6.0.jar").getOrCreate()
jdbc_url = 'jdbc:postgresql://127.0.0.1:5432/database'
properties = {
"user": "username",
"password": "password",
"driver": "org.postgresql.Driver"
}
foo = spark.read.jdbc(url=jdbc_url, table="public.foo", properties=properties)
foo.createOrReplaceTempView("foo")
SparkSQL 的操作
query = """
select bar, sum(sfid) qty from foo group by bar;
"""
result = spark.sql(query)
result.show()
DataFrame 的操作
from pyspark.sql.functions import avg, sum
avg_df = foo.groupBy("bar").agg(sum("sfid").alias("qty"))
avg_df.show()
输出
+----+---+
| bar|qty|
+----+---+
|blah| 1|
| yo| 2|
+----+---+
建议
DataFrame 和 SQL 都提供更为高级的 API 以及更好的优化去操作数据,他们都是声明式语言,告诉 Spark 我要什么,而不是具体的怎么做,适合并行操作。而之前的 RDD,则需要使用map
等类似的聚合方法去操作数据,是告诉 Spark 具体怎么做,不一定适合并行操作。