Top.Mail.Ru

Kafka Streams — непростая жизнь в production

Вокруг меня сформировался позитивный информационный фон на тему обработки событий через Kafka Streams. Этот инструмент привлекает множеством видео-докладов и статей на Хабре, подробной документацией, понятным API и красивой архитектурой. Некоторые мои знакомые и коллеги разрабатывают с его помощью свои системы. Но что происходит в реальной жизни, когда эти системы уходят в production?

В этой статье я опущу введение в Kafka Streams, предполагая, что читатель уже знаком с ней, и расскажу о нашем опыте жизни с этой библиотекой на примере достаточно нагруженной системы.

Коротко о проекте

Внутренней командой вместе с партнерами мы работаем над биржей Ad Exchange, которая помогает перепродавать рекламный трафик. Специфику подобных инструментов мы уже когда-то описывали. По мере роста числа партнеров среди SSP и DSP, нагрузка на сервера биржи растет. А для повышения ценности самой биржи мы должны собирать с этого трафика развернутую аналитику. Тут-то мы и попытались применить Kafka Streams.

Внедрить новый инструмент у себя на продакшене — это всегда некоторый риск. Мы его осознавали, но смирились с этим, поскольку в целом Kafka Streams концептуально должен был хорошо подойти для расчета агрегатов. И хотя первое впечатление было отличным, далее я расскажу о проблемах, к которым это привело.

Статья не претендует на объективность и описывает только лишь наш опыт и проблемы, с которыми мы столкнулись, работая с этой технологией. Надеюсь, кому-то она поможет избежать ошибок при использовании Kafka Streams. А может быть даст повод посмотреть по сторонам.

Агрегаты

В нашу систему приходят десятки и сотни тысяч сообщений в секунду. Первое, что нам необходимо было сделать, — агрегации по различным ключам. Например, идёт поток событий «Показ рекламы». Для него надо считать на лету общую стоимость и количество, группируя по различным ключам: стране, партнёру, рекламной площадке и т.д.

Вроде бы стандартный кейс для Kafka Streams: используем функции groupBy и aggregate и получаем нужный результат. Всё работает именно так, причем с отличной скоростью за счёт внутреннего кэша: несколько последовательных изменений по одному и тому же ключу сначала выполняются в кэше и только в определённые моменты отправляются в changelog-топик. Далее Kafka в фоне удаляет устаревшие дублирующиеся ключи через механизм log compaction. Что здесь может пойти не так?

Репартиционирование

Если ваш ключ группировки отличается от ключа, под которым изначально пришло событие, то Kafka Streams создаёт специальный repartition-топик, отправляет в него событие уже под новым ключом, а потом считывает его оттуда и только после этого проводит агрегацию и отправку в changelog-топик. В нашем примере вполне может быть, что событие «Показ рекламы» пришло с ключом в виде UUID. Почему нет? Если вам надо сделать группировку, например по трём другим ключам, то это будет три разных repartition-топика. На каждый топик будет одно дополнительное чтение и одна дополнительная запись в Kafka. Чувствуете, к чему я веду?

Предположим, на входе у вас 100 тысяч показов рекламы в секунду. В нашем примере вы создадите дополнительную нагрузку на брокер сообщений в размере +600 тысяч сообщений в секунду (300 на запись и 300 на чтение). И ведь не только на брокер. Для таких объёмов надо добавлять дополнительные сервера с сервисами Kafka Streams. Можете посчитать, во сколько тысяч долларов обойдется такое решение с учётом цен на железо.

Для читателей, не очень хорошо знакомых с механизмом репартиционирования, я поясню один момент. Это не баг или недоработка Kafka Streams. С учётом её идеологии и архитектуры это единственное возможное поведение — его нельзя просто взять и отключить. Когда у сообщения меняется ключ, этот новый ключ надо равномерно «размазать» по кластеру так, чтобы каждый инстанс Kafka Streams имел собственный набор ключей. Для этого и служат дополнительные запись/чтение в repartition-топик. При этом если инстанс А записал в топик сообщение, то не факт, что он же его и прочитает. Это может сделать инстанс Б, В и т.д. в зависимости от того, кто какую партицию читает. В итоге каждый ключ группировки у вас будет более-менее равномерно распределён по серверам кластера (если вы его хэшируете, конечно).

Запросы к агрегатам

Данные пишут для того, чтобы с ними потом можно было работать. Например делать к ним запросы. И здесь наши возможности оказались серьезно ограничены. Исчерпывающий список того, как мы можем запрашивать данные у Kafka Streams, есть здесь или здесь для оконных агрегатов. Если используется стандартная реализация state-store на основе RocksDB (а скорее всего это так), фактически данные можно получать только по ключам.

Предположу, что первое, что вам хочется сделать с полученными данными, — это фильтрация, сортировка и пагинация. Но здесь таких возможностей нет. Максимум, что вы можете, — это считать всё, что есть, используя метод all(), а потом вручную делать нужные операции. Вам повезет, если ключей не слишком много и данные уместятся в оперативной памяти. Нам не повезло. Пришлось писать дополнительный модуль, который по ночам выгружал данные из RocksDB и отправлял в Postgres.

Ещё надо помнить, что на каждом отдельно взятом сервисе находится только часть ключей. Что если вам необходимо отдавать ваши агрегаты, скажем, по HTTP? Вот кто-то сделал запрос к одному из сервисов Kafka Streams: мол, дай мне такие-то данные. Сервис смотрит у себя локально — данных нет. Или есть, но какая-то часть. Другая часть может быть на другом сервере. Или на всех. Что делать? Документация Kafka Streams говорит, что это наши проблемы.

Стабильность Kafka Streams

У нас бывают проблемы с хостинг провайдером. Иногда выходит из строя оборудование, иногда человеческий фактор, иногда и то, и другое. Если по какой-то причине теряется связь с Kafka, то Kafka Streams переводит все свои потоки в состояние DEAD и ждёт, когда мы проснёмся и сделаем ей рестарт. При этом рядом стоят соседние сервисы, которые работают с той же Kafka через Spring и @KafkaListener. Они восстанавливаются сами, как ни в чём не бывало.

Kafka Streams может умереть и по другой причине: не пойманные исключения в вашем коде. Предположим, вы делаете какой-то внешний вызов внутри потоков Kafka Streams. Пусть это будет обращение к базе данных. Когда база данных падает, у вас два варианта. Первый — вы ловите эту ошибку и продолжаете работать дальше. Но если база будет лежать долго, а работа с ней — критична, вы будете считывать большое количество сообщений из топиков и некорректно их обрабатывать. Второй вариант кажется лучше — совсем не ловить исключение и дать Kafka Streams умереть. Дальше как обычно: ночной подъём, рестарт всех серверов с Kafka Streams. «Из коробки» у вас нет варианта подождать, пока база восстановится, и продолжить работу.

Нам пришлось дописать в каждый сервис Kafka Streams дополнительный модуль, который работает как watchdog: поднимает Kafka Streams, если видит, что она умерла.

Кстати, если вы работаете с Kafka Streams через Spring, то не забудьте переопределить стандартный StreamsBuilderFactoryBean, указав в нём свой CleanupConfig. Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB. Напомню, что это приведёт к тому, что при каждом рестарте все сервера начнут активно считывать данные из changelog-топика. Поверьте, вам это не нужно.

KStream-KStream Join

Здесь можно было бы обойтись одной фразой: никогда это не используйте. Джоин двух потоков создаёт десятки топиков в Kafka и огромную нагрузку на всю систему. Просто не делайте этого. Ну или хотя бы проверьте все под нагрузкой, прежде чем ставить в production.

Вообще Kafka Streams любит создавать топики под различные свои нужды. Если вы не изучили под лупой документацию и не протестировали, как это работает, то это может стать для вас и ваших DevOps неприятным сюрпризом. Чем больше топиков, тем сложнее их администрировать.

Масштабируемость

Любой человек, знакомый с Kafka, скажет, что масштабируемость — одна из главных её преимуществ: «В чём проблема? Добавляем партиций и радуемся». Всё так. Но не для Kafka Streams.

Если вы используете джоины, то все ваши топики должны быть ко-партиционированы (co-partitioning), что, в числе прочего, означает, что у них должно быть одинаковое количество партиций. Так в чём же проблема?

Представим, что вы сделали несколько топологий: с агрегатами, джоинами, всё как положено. Поставили в production, оно работает, вы радуетесь. Дальше бизнес пошел в гору, нагрузка начала расти, вы добавили серверов и хотите увеличить количество партиций, чтобы загрузить эти новые сервера. Но Kafka Streams наплодил десятки топиков. И у всех надо поднимать количество партиций, иначе следующий рестарт ваших сервисов может стать последним. Если Kafka Streams увидит, что топики не ко-партиционированы, она выдаст ошибку и просто не запустится.

На вопрос, что с этим делать, у меня сегодня ответа нет. Вероятно, можно потушить все рабочие инстансы Kafka Streams, потом поднять число партиций на всех причастных топиках, затем поднять Kafka Streams обратно и молиться. А может быть последовать совету отсюда: Matthias J. Sax пишет, что это нужно делать, создавая новый топик с новым количеством партиций и подключать к нему Kafka Streams с новым application.id. Там же есть совет, что если вы знаете заранее, что нагрузка будет большая, то лучше сделать партиций с запасом.

Заключение

Если у вас в системе нет и не планируется высоких нагрузок, то со всеми этими проблемами вы скорее всего не столкнётесь. Всё будет работать отлично. В противном случае стоит серьёзно задуматься, нужно ли вам это, особенно если планируете использовать следующий уровень абстракции — KSQL.

Автор статьи: Андрей Буров, Максилект.

Наши статьи по теме:

Все статьи

Связаться с нами

Мы свяжемся с вами в течение 24 часов.