Quickstart¶
Basic usage
KafkaProducer¶
KafkaProducer is a high-level, asynchronous message producer.
import uvloop
import asyncio
import uuid
import os
from recipes.setup_color_logger import setup_logger
from tonga.services.producer.kafka_producer import KafkaProducer
from tonga.services.serializer.avro import AvroSerializer
from tonga.services.coordinator.partitioner.key_partitioner import KeyPartitioner
from examples.coffee_bar.waiter.models.events.coffee_ordered import CoffeeOrdered
from examples.coffee_bar.waiter.models.handlers.coffee_ordered_handler import CoffeeOrderedHandler
async def send_one(kafka_producer):
coffee_uuid = uuid.uuid4().hex
coffee_order = CoffeeOrdered(partition_key=coffee_uuid, uuid=coffee_uuid, coffee_for='toto',
coffee_type='Classic', cup_type='Venti', amount=2.5)
try:
await kafka_producer.send_and_await(coffee_order, 'waiter-events')
finally:
await kafka_producer.stop_producer()
cur_instance = 0
logger = setup_logger()
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
# Creates AvroSerializer
serializer = AvroSerializer(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'examples/coffee_bar/avro_schemas'))
coffee_ordered_handler = CoffeeOrderedHandler()
# Register CoffeeFinished (event class) / CoffeeFinishedHandler (instanced class handle all CoffeeFinished event)
serializer.register_class('tonga.waiter.event.CoffeeOrdered', CoffeeOrdered, coffee_ordered_handler)
# Creates KafkaProducer
producer = KafkaProducer(name=f'waiter-{cur_instance}', bootstrap_servers='localhost:9092',
client_id=f'waiter-{cur_instance}', serializer=serializer,
loop=loop, partitioner=KeyPartitioner(), acks='all')
loop.run_until_complete(send_one(producer))
KafkaConsumer¶
import uvloop
import asyncio
import uuid
import os
from signal import signal, SIGINT
from recipes.setup_color_logger import setup_logger
from tonga.services.consumer.kafka_consumer import KafkaConsumer
from tonga.services.serializer.avro import AvroSerializer
from examples.coffee_bar.waiter.models.events.coffee_ordered import CoffeeOrdered
from examples.coffee_bar.waiter.models.handlers.coffee_ordered_handler import CoffeeOrderedHandler
cur_instance = 0
nb_replica = 2
logger = setup_logger()
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
# Creates AvroSerializer
serializer = AvroSerializer(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'examples/coffee_bar/avro_schemas'))
coffee_ordered_handler = CoffeeOrderedHandler()
# Register CoffeeFinished (event class) / CoffeeFinishedHandler (instanced class handle all CoffeeFinished event)
serializer.register_class('tonga.waiter.event.CoffeeOrdered', CoffeeOrdered, coffee_ordered_handler)
consumer = KafkaConsumer(name=f'waiter-{cur_instance}', serializer=serializer,
bootstrap_servers='localhost:9092', client_id=f'waiter-{cur_instance}',
topics=['waiter-events'], loop=loop,
assignors_data={'instance': cur_instance,
'nb_replica': nb_replica,
'assignor_policy': 'only_own'}, isolation_level='read_committed')
# Ensures future of KafkaConsumer
asyncio.ensure_future(consumer.listen_event('earliest'), loop=loop)
# Catch SIGINT
signal(SIGINT, lambda s, f: loop.stop())
try:
# Runs forever
loop.run_forever()
except Exception:
# If an exception was raised loop was stopped
loop.stop()