continue

# Обработка сообщения

order = json.loads(msg.value().decode('utf-8'))

total_sales += order['price']

print(f"Обработан заказ: {order['order_id']}, текущая сумма: {total_sales}")

except KeyboardInterrupt:

print(f"Общая сумма всех заказов: {total_sales}")

finally:

consumer.close()

```

Преимущества использования Kafka

1. Высокая производительность. Kafka поддерживает миллионы событий в секунду благодаря своей архитектуре и использованию партиций.

2. Надежность. Данные хранятся в Kafka до тех пор, пока их не обработают все подписчики.

3. Масштабируемость. Kafka легко масштабируется путем добавления новых брокеров.

4. Универсальность. Kafka поддерживает интеграцию с большинством современных инструментов обработки данных.

Apache Kafka предоставляет мощный набор инструментов для потоковой обработки данных. Используя Python, вы можете легко настроить передачу данных, их обработку и анализ в реальном времени. Это особенно полезно для систем, где требуется высокая производительность и минимальная задержка при обработке больших потоков данных.


Задачи для практики

Задача 1: Фильтрация событий по условию

Описание:

У вас есть топик `clickstream`, содержащий события о кликах на веб-сайте. Каждое событие содержит следующие поля:

– `user_id` – идентификатор пользователя.

– `url` – URL-адрес, на который был клик.

– `timestamp` – время клика.

Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово "product", и сохранять их в новый топик `filtered_clicks`.

Решение:

```python

from confluent_kafka import Producer, Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание продюсера для записи в новый топик

producer = Producer({'bootstrap.servers': broker})

def produce_filtered_event(event):

producer.produce('filtered_clicks', value=json.dumps(event))

producer.flush()

# Создание консьюмера для чтения из исходного топика

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'clickstream-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['clickstream'])

# Чтение и фильтрация событий

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение из Kafka в Python-объект

event = json.loads(msg.value().decode('utf-8'))

# Фильтруем события с URL, содержащими "product"

if 'product' in event['url']:

print(f"Фильтруем событие: {event}")

produce_filtered_event(event)

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает события из топика `clickstream`.

– Каждое сообщение проверяется на наличие слова "product" в поле `url`.

– Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.


Задача 2: Подсчет количества событий в реальном времени

Описание:

Топик `log_events` содержит логи системы. Каждое сообщение содержит:

– `log_level` (например, "INFO", "ERROR", "DEBUG").

– `message` (текст лога).

Ваша задача: написать программу, которая считает количество событий уровня "ERROR" в реальном времени и каждые 10 секунд выводит их общее количество.

Решение:

```python

from confluent_kafka import Consumer

import time

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'log-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['log_events'])

error_count = 0

start_time = time.time()

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект