Lakehouse Podcast
Наконец посмотрела подкаст про лейкхаус с моим прошлым техлидом. Он сейчас CPO в Arenadata, крутой чел, и мне всегда интересно, как у него дела
Сначала я подумала, что это просто рандомный подкаст, в детали не вчитывалась, поэтому вспомнила про него только спустя два месяца😅 А там как раз есть некоторые ответы на вопросы из этого поста. Ниже некоторые моменты, которые мне особенно запомнились
🙂 "На какие бизнесовые метрики влияет лейкхаус?"
У меня сразу возникла ассоциация, как Бах выражался про игру на инструменте))
🙂 Пара интересных метрик
Time to Insight - время до получения инсайта из данных
Time to Data - время до того, как можно воспользоваться данными
🤩 Еще я всегда слышала про Cloud-Native - сервисы, которые изначально пилились под облака
🤩 Но не слышала про Cloud-Ready - изначально не под облако, но легко мигрировать
🤩 А еще гуглинг привел к Cloud-Compatible - не под облако, но работает и с ним
🤩 И к Cloud-Enabled - не под облако, но его как-то адаптировали
🤩 И к Cloud-Agnostic - без использования специфичных сервисов провайдера в отличие от native
На самом деле этих cloud- штук очень много, между ними может быть тончайшая разница или одно быть частью другого
🙂 Финальная мысль
Наконец посмотрела подкаст про лейкхаус с моим прошлым техлидом. Он сейчас CPO в Arenadata, крутой чел, и мне всегда интересно, как у него дела
Сначала я подумала, что это просто рандомный подкаст, в детали не вчитывалась, поэтому вспомнила про него только спустя два месяца
Основная бизнес-метрика - точность попадания в качество и во время
У меня сразу возникла ассоциация, как Бах выражался про игру на инструменте))
Играть на музыкальном инструменте просто. Нужно вовремя нажимать нужные клавиши
Time to Insight - время до получения инсайта из данных
Time to Data - время до того, как можно воспользоваться данными
На самом деле этих cloud- штук очень много, между ними может быть тончайшая разница или одно быть частью другого
Возможно ли, что лейкхаус станет устаревшим раньше, чем большинство компаний успеют его внедрить?
Please open Telegram to view this post
VIEW IN TELEGRAM
YouTube
Lakehouse: почему дом данных всё время на реконструкции
Встречайте 10-й юбилейный выпуск подкаста Pro Данные! Вместе с экспертами из Arenadata и DIS Group обсуждаем Lakehouse. Почему компании сейчас активно присматриваются к этой архитектуре и всем ли стоит туда идти?
В гостях:
Алексей Быков, product owner,…
В гостях:
Алексей Быков, product owner,…
1👍11🔥6❤3 1
Full + Incremental Load
Начала читать книгу "Data Engineering Design Patterns" (2025), 375 страниц. Несколько раз видела хорошие отзывы, по содержанию очень прикольная. Это про паттерны загрузки данных, как лучше работать с ошибками в данных, как организовать правильный перезапуск пайплайна и еще много всего
В книге дали ссылочку на гитхаб с готовым кодом❤
Эта серия постов будет неким конспектом с добавлением моих мыслей
Итак, начнем
Data Ingestion/Загрузка данных
🌷 Full Load
Опасности и решения:
1. Следить за ростом датасета. В идеале не слишком много строк, растет медленно
2. drop-insert - опасная штука, пользователи могут читать в момент записи. Использовать вьюшку:
🤩 пользователи ходят в вьюшку
🤩 вьюшка смотрит на table1
🤩 данные пишутся в table2
🤩 table2 подменяется на table1
У нас реально были такие проблемы:
Что мы сделали:
1. Добавили зависимости по событию от источников
2. shadow calc: создается копия витрины, все манипуляции происходят с копией в стейджинге, в конце делается rename
🪴 Incremental Load
1⃣ Pattern: Incremental Loader
1й способ. Иметь дату загрузки, чтобы определить инкремент. Опасно использовать дату события, потому что они могут долетать позже. Например, временно отключился интернет, события долетели с лагом, а мы уже обработали этот период. Последняя дата загрузки должна где-то сохраняться
2й способ. Делать партиции по времени. Например, даг работает каждый час и всегда берет данные за предыдущий час
Опасности и решения:
1. Для удаленных строк применять soft delete (просто маркируем удаленной) вместо hard delete, иначе они просто останутся у нас в системе
2. Использовать Insert-only/append-only - в табличку только добавляем данные
Реализации:
1. Для даты загрузки - обязательно добавлять фильтр
2. Для партиций по времени - добавить сенсор, который смотрит на появление следующей партиции. Если партиция появилась, значит, текущий период закончился и его можно обработать. Плюс обязательно передать дату в Airflow через {{ ds }}
Я была удивлена, прочитав этот механизм. Все делаем по книжке, получается😎
2⃣ Pattern: Change Data Capture
Используется, когда события нужно получать быстро (~30s). CDC - это стриминг логов журналов (WAL) баз данных
Был приведен пример с Delta Lake, но для Iceberg я тоже нашла примеры
На этом пока все, это была даже не половина всей главы🥺
#depatterns
Начала читать книгу "Data Engineering Design Patterns" (2025), 375 страниц. Несколько раз видела хорошие отзывы, по содержанию очень прикольная. Это про паттерны загрузки данных, как лучше работать с ошибками в данных, как организовать правильный перезапуск пайплайна и еще много всего
В книге дали ссылочку на гитхаб с готовым кодом
Эта серия постов будет неким конспектом с добавлением моих мыслей
Итак, начнем
Data Ingestion/Загрузка данных
Опасности и решения:
1. Следить за ростом датасета. В идеале не слишком много строк, растет медленно
2. drop-insert - опасная штука, пользователи могут читать в момент записи. Использовать вьюшку:
У нас реально были такие проблемы:
File does not exist:
hdfs://warehouse/hive/my_db.db/my_table/2
6-01_29_data.0.parq
It is possible the underlying files have been
updated. You can explicitly invalidate the
cache in Spark by running 'REFRESH TABLE
tableName' command in SQL or by recreating
the Dataset/DataFrame involved.
Что мы сделали:
1. Добавили зависимости по событию от источников
2. shadow calc: создается копия витрины, все манипуляции происходят с копией в стейджинге, в конце делается rename
1й способ. Иметь дату загрузки, чтобы определить инкремент. Опасно использовать дату события, потому что они могут долетать позже. Например, временно отключился интернет, события долетели с лагом, а мы уже обработали этот период. Последняя дата загрузки должна где-то сохраняться
2й способ. Делать партиции по времени. Например, даг работает каждый час и всегда берет данные за предыдущий час
Опасности и решения:
1. Для удаленных строк применять soft delete (просто маркируем удаленной) вместо hard delete, иначе они просто останутся у нас в системе
2. Использовать Insert-only/append-only - в табличку только добавляем данные
Реализации:
1. Для даты загрузки - обязательно добавлять фильтр
f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"'2. Для партиций по времени - добавить сенсор, который смотрит на появление следующей партиции. Если партиция появилась, значит, текущий период закончился и его можно обработать. Плюс обязательно передать дату в Airflow через {{ ds }}
Я была удивлена, прочитав этот механизм. Все делаем по книжке, получается
Используется, когда события нужно получать быстро (~30s). CDC - это стриминг логов журналов (WAL) баз данных
Был приведен пример с Delta Lake, но для Iceberg я тоже нашла примеры
На этом пока все, это была даже не половина всей главы
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
👍29❤8🔥3
Сопутки загрузок
🍔 Data Compaction
Pattern: Compactor
Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3. Потому что нужно перебирать много файлов, открывать/закрывать, скапливается куча меты для табличных форматов (delta lake, iceberg, hudi)
🧊 Решение - объединить в файлы побольше. В iceberg для этого есть процедура rewrite_data_files. Если таблица тяжелая, компакшн можно встроить в сам процесс загрузки. Или поставить на внерабочие часы, чтобы никому не мешать. Кстати, со стримингами у нас даже дважды компактится: каждый час и каждый день
После компакшена в iceberg можно почистить все ненужное через expire_snapshots - задаем время, раньше которого удаляются все снепшоты вместе с данными. Или просто подождать, пока оно само по ретеншену отвалится
☕️ Data Readiness
Pattern: Readiness Marker
Иногда потребителям данных нужно знать, что данные готовы
1й способ. Создавать файл по типу _SUCCESS в спарке отдельной таской в Airflow. Тогда потребители через FileSensor смогут мониторить появление файла
2й способ. Дождаться создания следующей партиции - это значит, что текущая закрыта
Тут используется pull - потребители сами чекают, готовы ли данные
⛅️ Event Driven
Pattern: External Trigger
А тут про push - загрузчик сам уведомляет, что данные появились
🌱 🌱 🌱 🌱 🌱
Очень прикольно, что даются описание и примеры для батча и для стриминга. И все с использованием самого популярного: Spark, Flink, Kafka, Airflow, Python, Scala, SQL, иногда с добавлением чисто западных штук
💐 Кстати, мне тут очень понравились несколько выражений из книги, первые два очень поэтичные:
#depatterns
Pattern: Compactor
Стриминг пишет очень много мелких файлов, и эта проблема актуальна не только для hdfs, но и для s3. Потому что нужно перебирать много файлов, открывать/закрывать, скапливается куча меты для табличных форматов (delta lake, iceberg, hudi)
После компакшена в iceberg можно почистить все ненужное через expire_snapshots - задаем время, раньше которого удаляются все снепшоты вместе с данными. Или просто подождать, пока оно само по ретеншену отвалится
Pattern: Readiness Marker
Иногда потребителям данных нужно знать, что данные готовы
1й способ. Создавать файл по типу _SUCCESS в спарке отдельной таской в Airflow. Тогда потребители через FileSensor смогут мониторить появление файла
2й способ. Дождаться создания следующей партиции - это значит, что текущая закрыта
Тут используется pull - потребители сами чекают, готовы ли данные
Pattern: External Trigger
А тут про push - загрузчик сам уведомляет, что данные появились
Очень прикольно, что даются описание и примеры для батча и для стриминга. И все с использованием самого популярного: Spark, Flink, Kafka, Airflow, Python, Scala, SQL, иногда с добавлением чисто западных штук
Sad but true, but your data engineering life will rarely be a bed of roses.
However, life is not that rosy.
Always expecting the worst is probably not the best way to go through life, but it’s definitely one of the best approaches you can take to your data engineering projects.
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥13❤4👍4
Козырной вопрос
А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно. Но есть ли что-то особенное?
Мне когда-то один из коллег вкинул вопрос про мемы. Когда я не чувствую метча с лидом и понимаю, что я не хочу там быть, то я не спрашиваю. Но если мне ребята и проект кажутся прикольными, то я спрашиваю:
Ответы бывали разные:
🤩 нет такого
🤩 нет такого, но есть рабочий чат, в принципе туда можно кидать (но никто так не делает)
🤩 у нас спринты называются мемами
🤩 есть, я даже могу найти последние, но они больше про инфраструктуру у клиента
🤩 есть в корп мессенджере
🤩 есть и в корп, и в тележке
🤩 конечно! есть командный, еще есть для всего департамента
Становится понятно, какие люди там работают, обстановка сразу разбавляется, и мы заканчиваем на положительной ноте)
А у вас есть прикольный вопрос, который вы спрашиваете у руководителя на собесе? Команда, стек, задачи, скрам/не скрам - это все понятно. Но есть ли что-то особенное?
Мне когда-то один из коллег вкинул вопрос про мемы. Когда я не чувствую метча с лидом и понимаю, что я не хочу там быть, то я не спрашиваю. Но если мне ребята и проект кажутся прикольными, то я спрашиваю:
А у вас есть мемный чатик?
Ответы бывали разные:
Становится понятно, какие люди там работают, обстановка сразу разбавляется, и мы заканчиваем на положительной ноте)
Please open Telegram to view this post
VIEW IN TELEGRAM
👍17🔥9💅4❤1
Управляем ошибками. Часть 1
Unprocessable Records
К нам пришли строки, которые падают при обработке. Есть 2 пути:
1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать
2. Не падать, а игнорить их - но особым образом
📒 Pattern: Dead-Letter
Что делаем:
1. Определяем места в коде, где что-то может упасть
2. Оборачиваем в try/catch, if/else
3. Добавляем мету для анализа ошибки
4. Пишем зафейленные строки/файлы в другую папку
5. Добавляем алерты
6. Пишем пайплайн для перезапуска из зафейленной папки (опционально)
У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)🤷♀️
Опасности и решения:
1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое🙂
2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг
3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится)
4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq)
🌿 🌿 Duplicated Records
Pattern: Windowed Deduplicator
Для батча все просто: distinct/dropDuplicates или окно с row_number = 1
Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи:
В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты
#depatterns
Unprocessable Records
К нам пришли строки, которые падают при обработке. Есть 2 пути:
1. Сразу падать. Но если в стриминге много плохих событий, то мы замучаемся постоянно переподнимать
2. Не падать, а игнорить их - но особым образом
Что делаем:
1. Определяем места в коде, где что-то может упасть
2. Оборачиваем в try/catch, if/else
3. Добавляем мету для анализа ошибки
4. Пишем зафейленные строки/файлы в другую папку
5. Добавляем алерты
6. Пишем пайплайн для перезапуска из зафейленной папки (опционально)
У нас есть подобная штука - если поля критичные и точно не должны быть пустыми, то такие данные сразу складываются отдельно. Но разбираются ли причины - это загадка)
Опасности и решения:
1. Если просто отфильтровать некорректные записи, то другие пайплайны будут использовать неполные данные. Но если мы поправим ошибки и перепроцессим, то пользователям тоже придется все перезапускать. А там может быть 20 пайплайнов, которые ссылаются друг на друга. И вообще у них может быть не реализован пересчет за прошлое
2. Чтобы отличать скорректированные записи, может понадобиться какой-нибудь флаг
3. Можно заполнять NULL при ошибке, но тогда придется сравнивать: это действительно NULL-значение в источнике, или что-то пошло не так? (мне не нравится)
4. Обязательно алертить, если количество проигноренных данных очень большое, и даже останавливать джобу. Прям с остановкой дальнейших тасок я не сталкивалась, интересный подход на полном доверии к dq)
Pattern: Windowed Deduplicator
Для батча все просто: distinct/dropDuplicates или окно с row_number = 1
Для стриминга нужно выделить временное окно и сохранять уже обработанные уникальные ключи:
.withWatermark("visit_time", "10 minutes")
.dropDuplicates(["visit_id", "visit_time"])
В этом примере ключи будут храниться в течение 10 минут. Если для новой записи ключ уже существует, он скипнется. Если тот же самый ключ придет через 11 минут, то будут дубликаты
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
❤12👍6🔥4
Управляем ошибками. Часть 2
📼 Late Data
Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось
1️⃣ Pattern: Late Data Detector
Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать
Интересная мысль - "shifting the late data problem"
Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них😁
2️⃣ Pattern: Static Late Data Integrator
Как вообще можно перегрузить данные за прошлое?
1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день
2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно
И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили😁 Потом мы шли разбираться с владельцами, и данные заливались, но позднее. Чтобы это учитывать, в моем подходе был такой алгоритм:
1. Задаем стартовую и конечную даты расчета
2. Создаем диапазон значений
3. Из меты достаем существующие партиции
4. Находим разницу
5. Итерируемся по потеряшкам
Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам
3️⃣ Pattern: Dynamic Late Data Integrator
Предлагается завести табличку с 4 полями:
🤩 партиция
🤩 время обработки
🤩 время добавления новых записей
🤩 флаг обработано или нет
Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы
🤩 Filtering
Pattern: Filter Interceptor
Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq
🌳 Fault Tolerance
Pattern: Checkpointer
Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть
Еще раз напомнили про семантики доставки:
🤩 exactly once - нужны другие паттерны, расскажу, когда дойду
🤩 at least once - чекпоинт после обработки, могут быть дубликаты при перезапуске после падения
🤩 at most once - чекпоинт до обработки, данные потеряются при падении
#depatterns
Посмотрим на 3 паттерна для работы с данными, которые пришли позже, чем ожидалось
Пока что для меня сложная и непонятная история про стриминг. В целом, книга легко читается, но требуется время, чтобы переварить. Все концепты сжаты, но очень насыщенны. Перечитаю, когда нужно будет с этим работать
Интересная мысль - "shifting the late data problem"
Пример: мы пишем партиции по времени обработки. В партицию 21:00 к нам залетел кусок данных за 20:00 и 19:00. А наши пользователи используют партиции по времени события. Тогда мы перекладываем ответственность ковыряться в этих партициях на них
Как вообще можно перегрузить данные за прошлое?
1. Создать кучу дагранов, где каждый перегружает 1 день. Если упало - перезапускаем конкретный день
2. Создать один дагран, где в коде генерируется список нужных дат. И по каждой дате запускается загрузка. Если упало - просто перезапускаем, пойдет считаться с упавшего дня. Это и есть Static Late Data Integrator. А статическое - потому что мы сами задаем 14 дней или сколько угодно
И тут я поняла, что неосознанно это и делала. У нас часто была проблема, что данные в источники просто не приходили
1. Задаем стартовую и конечную даты расчета
2. Создаем диапазон значений
full_range = pd.date_range(start=str(start_dt), end=str(end_dt)).strftime("%Y-%m-%d")
3. Из меты достаем существующие партиции
def get_existing_partitions(table_name):
partitions = (
spark.sql(f"show partitions {table_name}")
.select(F.split(F.col("partition"), "=")[1].alias("dt"))
.collect()
)
return [p[0] for p in partitions]
4. Находим разницу
lost_range = full_range.difference(existing_partitions_pdf)
5. Итерируемся по потеряшкам
for dt in lost_range:
calc_mart(dt)
Если в будущем снова будет пустая дата, нам не придется перезапускать определенный день - он пойдет считаться сам
Предлагается завести табличку с 4 полями:
Так мы запросом можем найти партиции, которые уже обрабатывались, но в которые попали новые данные. А в iceberg есть удобное свойство last_updated_at на уровне таблицы
Pattern: Filter Interceptor
Как будто это антипаттерн. Предлагается создать доп колонки с фильтрами id_is_not_null, status_is_not_failed и выводить количество отфильтрованных записей, чтобы понимать, на каком этапе ошибка в коде или в данных. Но прям пробегаться по каждой записи в датафрейме… Как будто это все-таки dq
Pattern: Checkpointer
Просто нужно создавать чекпоинты и хранить последний оффсет обработанной записи и состояние, если оно есть
Еще раз напомнили про семантики доставки:
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
❤10👍6🤔1
Мои заметки
У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные мысли, которые меня зацепили, и иногда пояснения контекста. Разделила по категориям, чтобы это был порядочный хаос
Про жизнь
🟡 Никто не обязан знать все и быть всегда правым
🟡 В большинстве случаев все решаемо
Про работу
🟣 На груминге задачи: «Мы не знаем, как это сделать, но надо как-то сделать»
🟣 Если что-то хорошо работает, оно не должно хорошо работать. Просто вы еще не знаете, что именно работает плохо
Про карьеру
🟡 «Синьор одного проекта» - не имеет достаточного опыта с процессами, с другими командами, с решением кейсов
🟡 Фидбек интервьюера на собесе: «Иногда важно не что отвечает, а как»
Про менеджерство
🔴 Микроменеджмент - недостаток доверия - ревность
На одной конфе сказали левую половину фразы. Я подумала, что ревность тоже возникает от недостатка доверия. Значит ли это, что микроменеджмент = ревность?🤔
🔴 Я не понимаю, почему в играх ты повышаешь уровень, когда все проходишь без ошибок. Ведь ты получаешь новые уровни, когда ошибаешься. Ошибка - ресурс, который конвертируется в результат
🔴 Проджект менеджер - это пастух встреч
Про технику
⭕️ База данных - всего лишь кэшированная версия журнала (т.к. в ней последние значения полей, а в журнале - все)
⭕️ В потоках сортировка слиянием неприменима, т к. сортировка невозможна на бесконечном наборе данных
⭕️ CDC - это хак, журналы низкоуровневые и до определенного момента не использовались
⭕️ В Hadoop используется концепция «складывай как есть, потом решим». Однако на практике выясняется: быстро получить данные, даже представленные в странном, трудном для применения, необработанном формате, зачастую более ценно, чем пытаться заранее выбрать идеальную модель данных
⭕️ Для разных людей по-разному могут работать сайты - для активных пользователей быстрее, для пассивных - дольше (не факт, что это реальная инфа, но мысль интересная)
Про безопасность
🟡 Менее страшно, если взломали того, где ИБ не было. Страшно, если взломали того, где ИБ было
🟡 Самая безопасная система - выключенная
🟡 Большинство не являющихся критически важными для безопасности систем выбирают дешевизну и ненадежность вместо дороговизны и надежности
Про красивое
🟣 Учусь дышать над поломанным смыслом
🟣 Инерция пылинки
Про байки
На одной из конф рассказывали байки, одна из них меня очень зацепила:
🔴 Правда или ложь: если человеку, который купил вещь, в течение недели/месяца не показывать похожие вещи в рекомендациях, то конверсия в продажи вырастет?
❤️ - правда
🤔 - ложь
У меня скопилось много заметок с митапов, из книг и разговоров. Я решила все собрать в один пост, а у себя почистить) Это будут просто рандомные мысли, которые меня зацепили, и иногда пояснения контекста. Разделила по категориям, чтобы это был порядочный хаос
Про жизнь
Про работу
Про карьеру
Про менеджерство
На одной конфе сказали левую половину фразы. Я подумала, что ревность тоже возникает от недостатка доверия. Значит ли это, что микроменеджмент = ревность?
Про технику
Про безопасность
Про красивое
Про байки
На одной из конф рассказывали байки, одна из них меня очень зацепила:
❤️ - правда
🤔 - ложь
Please open Telegram to view this post
VIEW IN TELEGRAM
❤31🤔8
Возможен ли Sort-Merge Join без шафла?
Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ). Если таблицы одинаково партиционированы и джойнятся по этому ключу, то не приходится перемещать кучу данных. Правда, это работает только в форматах, поддерживающих DataSource API V2. Например, в Iceberg, Hudi, Delta Lake
Что нужно:
🌸 Spark 3.3+
🌸 Условие равенства в джойне
🌸 У таблиц должно быть общее поле-партиция, которое входит в ключ джойна
🎈 Пример
Я взяла 2 таблицы, партицировала, заполнила данными и положила. Потом их поджойнила, и вот какой план запроса у меня сначала получился (картинка 1):
У нас есть Exchange - происходит шафл даных. Потом я накинула несколько конфигов:
План запроса чуть поменялся (картинка 2) - исчез шафл:
В одной из статей говорится, что перформанс увеличился на 45-70% на джойнах. А кто-то уже использовал на практике?
@data_engineerette
Ага, в спарке есть такая оптимизация - Storage Partition Join (SPJ). Если таблицы одинаково партиционированы и джойнятся по этому ключу, то не приходится перемещать кучу данных. Правда, это работает только в форматах, поддерживающих DataSource API V2. Например, в Iceberg, Hudi, Delta Lake
Что нужно:
Я взяла 2 таблицы, партицировала, заполнила данными и положила. Потом их поджойнила, и вот какой план запроса у меня сначала получился (картинка 1):
== Physical Plan ==
CollectLimit
+- * Project
+- * SortMergeJoin Inner
:- * Sort
: +- Exchange
: +- * Project
: +- BatchScan spark_catalog.test.table1
+- * Sort
+- Exchange
+- * Project
+- BatchScan spark_catalog.test.table2
У нас есть Exchange - происходит шафл даных. Потом я накинула несколько конфигов:
# включает Storage Partition Join
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
# сохраняет текущее партиционирование при планировании запроса
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
# убирает шафл, когда партиции между таблицами не совпадают по количеству
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
# иначе ключи джойна должны быть такие же, как партиции, в том же порядке
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
# борется с перекосом данных
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
# разрешает включать части партиций в ключ джойна
spark.conf.set("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled", "true")
# на всякий можно отключить broadcast, адаптивку и sortmerge (если это точно нужно)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
План запроса чуть поменялся (картинка 2) - исчез шафл:
== Physical Plan ==
CollectLimit
+- * Project
+- * SortMergeJoin Inner
:- * Sort
: +- * Project
: +- BatchScan spark_catalog.test.table1
+- * Sort
+- * Project
+- BatchScan spark_catalog.test.table2
В одной из статей говорится, что перформанс увеличился на 45-70% на джойнах. А кто-то уже использовал на практике?
@data_engineerette
Please open Telegram to view this post
VIEW IN TELEGRAM
❤13 7🤔1
Минутка рефлексии
Не люблю рефлексировать, потому что это всегда вгоняет меня в тоску😔 Но мем прям метчится со мной. У меня практически никогда не бывает такого, что я прыгаю от радости или злюсь и агрессирую. Мне просто норм
Вот вы пишете код, sql-запросы, копаетесь в логах ошибок, запускается пайплайны, уточняете требования, ресерчите источники данных, делаете преобразования, нажимаете на кнопочки, стучите по клавиатуре - это вам в кайф? Думаете ли вы: "Уррра, завтра я буду разбираться в проблемах с качеством данных"? "Наконец-то этот поток упал, чтобы я выяснил все проблемы и поднял его"?
Вот в конце июля опубликовали 2025 Stack Overflow Developer Survey, и там всего лишь 25% людей прям счастливы на работе. Остальным 25% вообще не нравится, 50% ок
Не люблю рефлексировать, потому что это всегда вгоняет меня в тоску
Вот вы пишете код, sql-запросы, копаетесь в логах ошибок, запускается пайплайны, уточняете требования, ресерчите источники данных, делаете преобразования, нажимаете на кнопочки, стучите по клавиатуре - это вам в кайф? Думаете ли вы: "Уррра, завтра я буду разбираться в проблемах с качеством данных"? "Наконец-то этот поток упал, чтобы я выяснил все проблемы и поднял его"?
Вот в конце июля опубликовали 2025 Stack Overflow Developer Survey, и там всего лишь 25% людей прям счастливы на работе. Остальным 25% вообще не нравится, 50% ок
Please open Telegram to view this post
VIEW IN TELEGRAM
❤16😭10👍6
Как ускорить ClickHouse?
Обсуждали с одним де перформанс систем и связку Trino + ClickHouse. Моя первая мысль - серьезно? КХ и так быстрый, куда еще к нему Trino накручивать?? Моя вторая мысль - тестим! Дисклеймер - мои тесты не претендуют на супер истину, но дают определенное представление
💻 Я взяла ноут, подняла кх с 1 нодой и трино с кх каталогом. Создала тестовые таблички с 1к, 10к, 100к, 1млн, 10млн строк. Больше уже не отрабатывало
Придумала такой запрос:
Тут есть работа со строками, агрегация, оконка и т.д. - поэтому тестим в какой-то мере тяжелую операцию
Запрос я запустила по 50 раз на каждом размере данных для кх и трино. Потом так же погоняла insert моей cte в таблицу
Изначально сходила в system.query_log и посчитала длительность, строки, байты, память в среднем по всем запускам. Потом поняла, что для трино они будут отражать только кусок работы в кх, поэтому сравнение будет некорректным
По итогу решила сравнить время: query_duration_ms vs elapsedTime
Какие выводы я могу сделать:
1. Трино поверх КХ проигрывает КХ. Так получилось при моих данных и в моих условиях. Хотя есть мнение, что на большом объеме данных он работает быстрее бд (~100гб)
2. При insert трино проигрывает еще значительнее, потому что сначала пишет данные во временную таблицу, потом перекладывает в основную:
3. В query_log по Трино не собирается часть метрик: user_aggregate_functions, used_functions, used_data_type_families. А также отличаются result_rows, query. Если это важно для мониторинга, то использовать не получится
4. Я также хотела отделить разные тесты комментами, чтобы потом удобнее искать в query_log:
Но в трино нет такого функционала, так что многие необходимые кликхаусные штучки просто не будут работать
@data_engineerette
Обсуждали с одним де перформанс систем и связку Trino + ClickHouse. Моя первая мысль - серьезно? КХ и так быстрый, куда еще к нему Trino накручивать?? Моя вторая мысль - тестим! Дисклеймер - мои тесты не претендуют на супер истину, но дают определенное представление
Придумала такой запрос:
with cte as (
select
id,
concat(cast(id, 'String'), '_', cast(id, 'String')),
sum(id),
row_number() over (order by id) as rn
from test.tbl1000
group by 1, 2
)
select * from cte
where rn in (14, 49)
order by rn desc
Тут есть работа со строками, агрегация, оконка и т.д. - поэтому тестим в какой-то мере тяжелую операцию
Запрос я запустила по 50 раз на каждом размере данных для кх и трино. Потом так же погоняла insert моей cte в таблицу
Изначально сходила в system.query_log и посчитала длительность, строки, байты, память в среднем по всем запускам. Потом поняла, что для трино они будут отражать только кусок работы в кх, поэтому сравнение будет некорректным
По итогу решила сравнить время: query_duration_ms vs elapsedTime
Какие выводы я могу сделать:
1. Трино поверх КХ проигрывает КХ. Так получилось при моих данных и в моих условиях. Хотя есть мнение, что на большом объеме данных он работает быстрее бд (~100гб)
2. При insert трино проигрывает еще значительнее, потому что сначала пишет данные во временную таблицу, потом перекладывает в основную:
INSERT INTO "test"."result1000" ("id", "str", "sum_", "rn") SELECT "id", "str", "sum_", "rn" FROM "test"."tmp_trino_c5cb30af" temp_table
3. В query_log по Трино не собирается часть метрик: user_aggregate_functions, used_functions, used_data_type_families. А также отличаются result_rows, query. Если это важно для мониторинга, то использовать не получится
4. Я также хотела отделить разные тесты комментами, чтобы потом удобнее искать в query_log:
SETTINGS log_comment = 'test'
Но в трино нет такого функционала, так что многие необходимые кликхаусные штучки просто не будут работать
@data_engineerette
Please open Telegram to view this post
VIEW IN TELEGRAM
👍24🌚1
Please open Telegram to view this post
VIEW IN TELEGRAM
😁33🔥14❤4😭2💅1
Как движки вас газлайтят?
Расскажу историю, которая на днях прозошла на нашем кластере
💋 Что надо сделать? Создать табличку и запустить пайплайн заливки данных. Казалось бы - что может быть проще?
1. Создаем таблицу в Trino:
2. Запускаем пайплайн - создается веточка:
Пайплайн падает, не успев записать данные. Но я правлю ошибку и перезапускаю - теперь все прекрасно🙌 Ужастики так-то тоже начинаются с размеренной жизни
Но в один момент я создаю таблицу через Spark:
Снова запускаю пайплайн, и он снова падает. Я правлю ошибку и перезапускаю...
Что случилось? Таблица точно такая же, код точно такой же. Просчитался, но... где?🤔
😑 Тут начинаются раскопки. Создаю табличку через трино и через спарк, иду на s3 смотреть файл с метаданными
Оказывается, что трино создает отдельный снепшот с добавлением 0 строк:
🤍 "current-snapshot-id" : 677178324561195060
🤍 таблица raw.first_table.history имеет 1 строку
А спарк такого не делает - он их создает, только когда данные меняются, компактятся:
🤍 "current-snapshot-id" : -1
🤍 таблица raw.second_table.history пустая
Теперь идем в репку айсберга, а там такое:
🩵 Какой ход событий получается:
1. Попадаем в условие if: create есть, replace есть, ветки еще нет. Ветка создается
2. Пытаемся записать данные в ветку
3. Код падает, а ветка остается(!)
4. Перезапускаем и в условие if уже не попадаем - ветка уже есть
5. Попадаем в else if (replace) - вспоминаем, что через спарк никакой снепшот не создается, поэтому snapshotId у нас null
➡️ Как решаем проблему
Перед созданием ветки нужно дропнуть существующую, если вдруг пайплайн упал и не успел почистить ветку:
Либо добавить catch/except, который дропнет ее в самом конце
Общий вывод такой: при работе с разными движками любой кусок может работать по-разному. Даже самая простейшая штука, которую вы не ожидали🥺
@data_engineerette
Расскажу историю, которая на днях прозошла на нашем кластере
1. Создаем таблицу в Trino:
CREATE TABLE iceberg.raw.first_table (
my_beautiful_column varchar
)
WITH (
format = 'PARQUET',
format_version = 2
)
2. Запускаем пайплайн - создается веточка:
ALTER TABLE raw.first_table
CREATE OR REPLACE BRANCH my_branch
SELECT *
FROM iceberg.raw."first_table$refs"
--main BRANCH
--my_branch BRANCH
Пайплайн падает, не успев записать данные. Но я правлю ошибку и перезапускаю - теперь все прекрасно
Но в один момент я создаю таблицу через Spark:
CREATE TABLE raw.second_table (
my_beautiful_column STRING
)
USING iceberg
Снова запускаю пайплайн, и он снова падает. Я правлю ошибку и перезапускаю...
Exception in thread "main" java.lang.IllegalArgumentException: Cannot complete replace branch operation on raw.second_table, main has no snapshot
Что случилось? Таблица точно такая же, код точно такой же. Просчитался, но... где?
Оказывается, что трино создает отдельный снепшот с добавлением 0 строк:
А спарк такого не делает - он их создает, только когда данные меняются, компактятся:
Теперь идем в репку айсберга, а там такое:
if (create && replace && !refExists) {
safeCreateBranch()
} else if (replace) {
Preconditions.checkArgument(snapshotId != null,
"Cannot complete replace branch operation on %s, main has no snapshot", ident)
manageSnapshots.replaceBranch(branch, snapshotId)
}
1. Попадаем в условие if: create есть, replace есть, ветки еще нет. Ветка создается
2. Пытаемся записать данные в ветку
3. Код падает, а ветка остается(!)
4. Перезапускаем и в условие if уже не попадаем - ветка уже есть
5. Попадаем в else if (replace) - вспоминаем, что через спарк никакой снепшот не создается, поэтому snapshotId у нас null
Перед созданием ветки нужно дропнуть существующую, если вдруг пайплайн упал и не успел почистить ветку:
ALTER TABLE raw.second_table
DROP BRANCH IF EXISTS my_branch
Либо добавить catch/except, который дропнет ее в самом конце
Общий вывод такой: при работе с разными движками любой кусок может работать по-разному. Даже самая простейшая штука, которую вы не ожидали
@data_engineerette
Please open Telegram to view this post
VIEW IN TELEGRAM
👍18🔥9 6💯1
Давайте подробнее про харды 🤓
Потому что это тоже важно, если вы хотите расти. Вот спросят на собесе "Что такое Lakehouse", что вы ответите?
Стоит только погуглить - почти каждая компания делает свой lakehouse. А по результатам опроса на SmartData использование Trino + K8s + S3 увеличилось на 17% по сравнению с прошлым годом
Артём @dataengineerlab как раз обозревает Big Data технологии для дата инженеров. Из интересного:
🟡 Знакомство с Lakehouse
🟡 Роадмап дата инженера
🟡 Архитектура Spark
🟡 Всё, что нужно знать о Kafka
🟡 Как устроен стриминг данных?
🟡 С чего начать свой путь в DE?
Подписываемся: @dataengineerlab
Потому что это тоже важно, если вы хотите расти. Вот спросят на собесе "Что такое Lakehouse", что вы ответите?
Стоит только погуглить - почти каждая компания делает свой lakehouse. А по результатам опроса на SmartData использование Trino + K8s + S3 увеличилось на 17% по сравнению с прошлым годом
Артём @dataengineerlab как раз обозревает Big Data технологии для дата инженеров. Из интересного:
Подписываемся: @dataengineerlab
Please open Telegram to view this post
VIEW IN TELEGRAM
👍9🔥3❤2🤔1🤷1 1
SQL Advent. Day 1
Сегодня стартовал адвент по sql! К сожалению, орг прошлогоднего адвента решил в этом году его не делать😪
Но я нашла еще один!
🤩 всего 24 дня
🤩 задания открываются в 17:00 по мск
Первый день супер легкий, погнали вместе проходить)
📍 Полезные ссылки
Адвент календарь (с впн)
Мои решения
Сегодня стартовал адвент по sql! К сожалению, орг прошлогоднего адвента решил в этом году его не делать
Но я нашла еще один!
Первый день супер легкий, погнали вместе проходить)
Адвент календарь (с впн)
Мои решения
Please open Telegram to view this post
VIEW IN TELEGRAM
❤25 7
SQL Advent. Day 2
Сегодня задание на джойн)
Заметила, что ллмка анализирует твое решение и пишет в чат моменты на улучшение:
📍 Полезные ссылки
Адвент календарь
Мои решения
Сегодня задание на джойн)
Заметила, что ллмка анализирует твое решение и пишет в чат моменты на улучшение:
To make sure you only get unique toy IDs and names (in case a toy was delivered multiple times), you might want to consider using DISTINCT. Also, ordering the results can make the output easier to read.
Адвент календарь
Мои решения
Please open Telegram to view this post
VIEW IN TELEGRAM
❤8
SQL Advent. Day 3
Уровень hard - уже похоже на задачки с собесов. На скрине одна из таких - когда нужно найти топ по каждой категории
📍 Полезные ссылки
Адвент календарь
Мои решения
Уровень hard - уже похоже на задачки с собесов. На скрине одна из таких - когда нужно найти топ по каждой категории
Адвент календарь
Мои решения
Please open Telegram to view this post
VIEW IN TELEGRAM
❤6
Тимлидский митап
Сегодня прошел тимлидский онлайн-митап, описание которого мне очень понравилось. Посмотрим, какие темы обсуждались)
1️⃣ "Поедатели времени команды"
Этот доклад я слушала особенно внимательно, он в меня попал на 100%. Я выделила по несколько моментов, которые особенно меня зацепили
➡️ затянутые стендапы
➡️ постоянное переключение контекста
Это и про мгновенный ответ на сообщения, включенные уведомления. У меня есть такая проблема, я хочу быть в курсе, даже если меня не особо касается😒 Но я хотя бы научилась игнорить красные плашки. Поэтому сейчас я живу между "прочитано сразу" и "прочитано почти никогда"
Сюда еще относится слишком много экспертизы в одних руках - это забавно)) А также несколько встреч с перерывом в 30мин/1ч. В таких случаях я чувствую, что надо и водички попить, и на сообщение ответить, и задачку доделать, и вопросы к новой встрече просмотреть, и подготовить свою часть - и вот уже надо снова брать наушники и подключаться. А время-то куда делось?
Из особо полезного:
- договориться о фокус-времени - вся команда работает с 11 до 14 и друг друга не отвлекает
- просматривать входящие по расписанию - раз в 1-2ч, а не сразу
➡️ проблема неделегируемых задач
- bus factor - что будет, если дать задачу другому?
- составить список уникальных умений и людей, выбрать наследника👑
➡️ долгие обсуждения и затянутые ревью
- если много вопросов, делить на несколько встреч
- ограничить размер PR
- нечетное количество ревьюеров - чтобы одно из полярных мнений подкрепил кто-то еще
- SLA на время реакции на ревью
Мне особенно понравился последний совет. Людям может быть лень смотреть, когда есть и свои задачи. SLA прошел, все: молчание - знак согласия👍
2️⃣ "Как удобрять инженера, чтобы вырос крепкий лид"
Тут 2 проблемные ситуации:
🤩 Есть желающие -> нет ставки
🤩 Нужен лид -> нет желающих
Сначала нужно договориться, кого отдаем, какие задачи делегируем (1:1, решение проблем команды, отслеживание метрик, отчеты по работе). После определенного срока собираем ОС. В итоге сняли немного инженерных задач, накинули менеджерских
Растить лида внутри или брать снаружи?
🤩 Сложные задачи, кризис-менеджмент - опытный с рынка/из соседних подразделений
🤩 Есть время на сопровождение, обучение - свой
Также спикер упомянула про ресурсы для взвращивания лида. Я попросила ссылочку
3️⃣ "Собираем Dream Team"
Про работу с рекрутером и как ускорить процесс найма. Картинка взята из этого доклада
4️⃣ "Сообщество как ресурс руководителя"
Как сообщества ускоряют онбординг, повышают компетенции, облегчают работу лида, унифицируют стек и процессы
Youtube
VK
@data_engineerette
Сегодня прошел тимлидский онлайн-митап, описание которого мне очень понравилось. Посмотрим, какие темы обсуждались)
Этот доклад я слушала особенно внимательно, он в меня попал на 100%. Я выделила по несколько моментов, которые особенно меня зацепили
Это и про мгновенный ответ на сообщения, включенные уведомления. У меня есть такая проблема, я хочу быть в курсе, даже если меня не особо касается
Сюда еще относится слишком много экспертизы в одних руках - это забавно)) А также несколько встреч с перерывом в 30мин/1ч. В таких случаях я чувствую, что надо и водички попить, и на сообщение ответить, и задачку доделать, и вопросы к новой встрече просмотреть, и подготовить свою часть - и вот уже надо снова брать наушники и подключаться. А время-то куда делось?
Из особо полезного:
- договориться о фокус-времени - вся команда работает с 11 до 14 и друг друга не отвлекает
- просматривать входящие по расписанию - раз в 1-2ч, а не сразу
- bus factor - что будет, если дать задачу другому?
- составить список уникальных умений и людей, выбрать наследника
- если много вопросов, делить на несколько встреч
- ограничить размер PR
- нечетное количество ревьюеров - чтобы одно из полярных мнений подкрепил кто-то еще
- SLA на время реакции на ревью
Мне особенно понравился последний совет. Людям может быть лень смотреть, когда есть и свои задачи. SLA прошел, все: молчание - знак согласия
Тут 2 проблемные ситуации:
Сначала нужно договориться, кого отдаем, какие задачи делегируем (1:1, решение проблем команды, отслеживание метрик, отчеты по работе). После определенного срока собираем ОС. В итоге сняли немного инженерных задач, накинули менеджерских
Растить лида внутри или брать снаружи?
Также спикер упомянула про ресурсы для взвращивания лида. Я попросила ссылочку
Про работу с рекрутером и как ускорить процесс найма. Картинка взята из этого доклада
Как сообщества ускоряют онбординг, повышают компетенции, облегчают работу лида, унифицируют стек и процессы
Youtube
VK
@data_engineerette
Please open Telegram to view this post
VIEW IN TELEGRAM
1❤11🔥4
Датасеты как тетрис. Как найти нужные кубики, чтобы не сломать вообще все
Данные из 1С, CRM и Excel поступают хаотично, а привести их в порядок еще сложнее. Иногда нужного набора данных приходится ждать слишком долго — как подходящего блока в тетрисе, и это замедляет работу команды. Приходите на вебинар 15 декабря в 17:00. Эксперты VK Cloud и Loginom расскажут, как построить единую аналитическую систему за один день и готовить актуальные датасеты без ручной рутины.
О чем еще будем говорить
⚫️ Соберем пазл данных на 10-ой скорости: архитектура решения на основе облачных БД VK Cloud и платформы Loginom
⚫️ Автоматизируем ETL: очистка и подготовка данных с помощью low-code инструментов
⚫️ Разберем кейсы: готовые решения для ритейла, телекома и фарминдустрии
Спикеры:
Константин Дудников,
директор центра бизнес-решений VK Cloud, VK Tech
Алексей Арустамов,
CEO Loginom Company
Участие бесплатное, но количество мест ограничено.
Зарегистрироваться
Данные из 1С, CRM и Excel поступают хаотично, а привести их в порядок еще сложнее. Иногда нужного набора данных приходится ждать слишком долго — как подходящего блока в тетрисе, и это замедляет работу команды. Приходите на вебинар 15 декабря в 17:00. Эксперты VK Cloud и Loginom расскажут, как построить единую аналитическую систему за один день и готовить актуальные датасеты без ручной рутины.
О чем еще будем говорить
Спикеры:
Константин Дудников,
директор центра бизнес-решений VK Cloud, VK Tech
Алексей Арустамов,
CEO Loginom Company
Участие бесплатное, но количество мест ограничено.
Зарегистрироваться
Please open Telegram to view this post
VIEW IN TELEGRAM
👍3🔥3 3😁1
Data Value Design Patterns
Еще одна интересная глава. Она посвящена добавлению бизнес-ценности нашим данным, хотя технические моменты тоже есть
🎨 Data Enrichment
Тут есть два подхода для обогащения данных. На самом деле в книге простые вещи говорятся сложными словами, поэтому несколько страниц текста я уложу в две строчки:
Pattern: Static Joiner - обычный джойн двух таблиц
Pattern: Dynamic Joiner - джойн двух потоков с временным окном
🤩 Data Decoration
Бывает нужно добавить к нашим данным дополнительные поля, и вот что предлагается
1️⃣ Pattern: Wrapper
У нас есть табличка, мы хотим добавить в нее метаданные/технические поля/рассчитанные поля (версия джобы, время расчета). Паттерн относится к организации таких доп. полей в таблице: можно их добавить отдельными столбцами, закинуть в структуру, вынести в другую таблицу - как вам нравится
Мы такой подход используем. Но если требования часто меняются, то я бы все доп поля вынесла в один struct/json
2️⃣ Pattern: Metadata Decorator
Мы хотим делать все то же самое, но не показывать пользователям технические поля. Тогда:
🤩 Для бд создать вьюшку без тех полей или запретить чтение определенных столбцов на уровне прав
🤩 Для s3 есть штука с тегами, их можно навесить на весь файл. Теги - это пара ключ-значение: mytag=myvalue. Как работать с тегами можно полистать тут
🤩 Для отправки в кафку писать в header
🌸 Data Aggregation
Как под капотом происходит агрегация данных
1️⃣ Pattern: Distributed Aggregator
Это про шаффлы в распределенных системах. В контексте шафла важно учитывать data skew. С ним можно бороться, например, такими способами:
🤩 добавить соль, т.е. мы подмешиваем рандомное значение, чтобы равномернее распределить поле
🤩 включить AQE - адаптивку в спарке. Ниже полезные конфиги, с которыми мы работаем:
Еще интересный поинт про шафл - когда я только пришла, то как раз в команде велись обсуждения на эту тему. По дефолту даже если на одной ноде все посчиталось, то никто не заберет ее ресурсы, пока не посчитается все. На случай, чтобы не пришлось заново решафлить, если что-то упадет
External Shuffle Service в Spark как раз хранит данные шафла отдельно. Когда посчиталось, ресурсы ноды могут отправиться другому приложению, потому что данные шафла в безопасности
2️⃣ Pattern: Local Aggregator
В контексте спарка мне показалось, что это про broadcast
Дальше в главе рассказывалось, как строить витрины с сессиями пользователей, про особые виды сортировок - думаю, к этому можно будет вернуться, когда реально понадобится, для себя пока не вижу применения
#depatterns
Еще одна интересная глава. Она посвящена добавлению бизнес-ценности нашим данным, хотя технические моменты тоже есть
Тут есть два подхода для обогащения данных. На самом деле в книге простые вещи говорятся сложными словами, поэтому несколько страниц текста я уложу в две строчки:
Pattern: Static Joiner - обычный джойн двух таблиц
Pattern: Dynamic Joiner - джойн двух потоков с временным окном
Бывает нужно добавить к нашим данным дополнительные поля, и вот что предлагается
У нас есть табличка, мы хотим добавить в нее метаданные/технические поля/рассчитанные поля (версия джобы, время расчета). Паттерн относится к организации таких доп. полей в таблице: можно их добавить отдельными столбцами, закинуть в структуру, вынести в другую таблицу - как вам нравится
Мы такой подход используем. Но если требования часто меняются, то я бы все доп поля вынесла в один struct/json
Мы хотим делать все то же самое, но не показывать пользователям технические поля. Тогда:
Как под капотом происходит агрегация данных
Это про шаффлы в распределенных системах. В контексте шафла важно учитывать data skew. С ним можно бороться, например, такими способами:
dataset.withColumn("salt", (rand() * 3).cast("int"))
.groupBy("group_key", "salt").agg(...)
.groupBy("group_key").agg(...)
spark.sql.adaptive.enabled - включает адаптивку
spark.sql.adaptive.advisoryPartitionSizeInBytes - объединяет мелкие партиции или разделяет крупные
spark.sql.adaptive.skewJoin.enabled - разделяет партиции в джойнах
spark.sql.adaptive.skewJoin.skewedPartitionFactor - кэф для определения крупной партиции
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes - то же самое, только в байтах
Еще интересный поинт про шафл - когда я только пришла, то как раз в команде велись обсуждения на эту тему. По дефолту даже если на одной ноде все посчиталось, то никто не заберет ее ресурсы, пока не посчитается все. На случай, чтобы не пришлось заново решафлить, если что-то упадет
External Shuffle Service в Spark как раз хранит данные шафла отдельно. Когда посчиталось, ресурсы ноды могут отправиться другому приложению, потому что данные шафла в безопасности
В контексте спарка мне показалось, что это про broadcast
Дальше в главе рассказывалось, как строить витрины с сессиями пользователей, про особые виды сортировок - думаю, к этому можно будет вернуться, когда реально понадобится, для себя пока не вижу применения
#depatterns
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👍4❤1
