tgoop.com/zasql_python/393
Last Update:
Раньше в
Сейчас для моих задач Spark - это необходимость, чтобы не падал JupyterHub по оперативной памяти: все вычисления выполняются распределённо на кластере с большим объёмом ресурсов. Но это не волшебная таблетка, т.к. важно следить за тем, как используются ресурсы, грамотно настраивать Spark-приложения и оптимизировать запросы. На самом деле подход к работе с ресурсами здесь другой, и есть ряд ограничений, о которых расскажу в следующих постах
1. Собираю данные из разных источников
В реальных задачах часто нужно объединять сразу несколько источников: выгрузки из разных баз, parquet и тд. Пока всё влезает в pandas - норм, но когда данных слишком много, pandas начинает падать. Spark позволяет легко подтянуть все необходимые источники и собрать их в одну большую таблицу, не заботясь об ограничениях памяти.
2. Выполняю тяжёлые вычисления и агрегации
После того как все данные собраны, начинаются подсчеты метрик по большим объёмам данных. Здесь Spark выигрывает за счёт распределённых вычислений: вся тяжёлая работа идёт на кластере, а не на ноутбуке. Как только нужные агрегаты посчитаны, можно забрать результат и уже дальше анализировать, строить графики и т.д.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count
# запускаем Spark-сессию, тут еще можно закопаться в настройки приложения (если будет много 🐳, выложу)
spark = SparkSession.builder.appName("zasql_python").getOrCreate() # название приложения может быть произвольным
# читаем csv и кучу источников
df_csv = spark.read.csv("file.csv", header=True, inferSchema=True)
df_parquet = spark.read.parquet("file.parquet")
df_json = spark.read.json("file.json")
# джойним таблицы между собой
df_joined = df_csv.join(df_parquet, on="user_id", how="inner")
# фильтруем данные
df_filtered = df_joined.filter(df_joined["is_active"] == 1)
# применяем агрегирующие функции, считаем сумму строчек, среднее значение по заказам
df_grouped = (
df_filtered
.groupBy("country")
.agg(
count("*").alias("users_count"),
avg("order_sum").alias("avg_order_sum")
)
)
df_pandas = df_grouped.toPandas()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("zasql_python_sql").getOrCreate() # произвольное название приложения, должно быть другим, если запускаем параллельно
df_orders = spark.read.parquet("orders.parquet") # читаем в Spark DataFrame первый источник источник
df_users = spark.read.csv("users.csv", header=True, inferSchema=True) # читаем в Spark DataFrame второй источник
df_orders.createOrReplaceTempView("orders") # создаем темповые таблицы заказов
df_users.createOrReplaceTempView("users") # создаем темповые таблицы юзеров
# теперь читаем тут в sql-формате
query = """
SELECT
u.country,
COUNT(DISTINCT o.user_id) AS active_users,
AVG(o.order_sum) AS avg_order_sum
FROM orders o
JOIN users u ON o.user_id = u.user_id
WHERE o.is_active = 1
GROUP BY u.country
ORDER BY avg_order_sum DESC
"""
result = spark.sql(query) # читаем в spark.sql, результат тот же получаем, но в SQL-формате
result.show() # показать значения, но можно перевести и в pandas, но ресурсов много сожрет
Spark спасает, когда надо соединить и обработать десятки миллионов строк из разных источников, и обычный pandas падает по памяти, ядро умирает.
Ставьте