Examples

In all examples we use coffee bar project (See Tonga examples/coffee_bar folder)

Serializer

Serialize all avro schema to bytes, for register a new schema serializer needs an BaseRecord & BaseHandler. BaseRecord is a serialize class, BaseHandler is call by consumer for handle records.

Register BaseRecord (event / result / command)

# Import serializer
from tonga.services.serializer.avro import AvroSerializer

# Import store builder
from tonga.stores.store_builder.store_builder import StoreBuilder

# Import KafkaProducer
from tonga.services.producer.kafka_producer import KafkaProducer

# Import waiter events
from examples.coffee_bar.waiter.models.events.coffee_finished import CoffeeFinished

# Import waiter handlers
from examples.coffee_bar.waiter.models.handlers.coffee_finished_handler import CoffeeFinishedHandler

# Creates AvroSerializer
serializer = AvroSerializer(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            'examples/coffee_bar/avro_schemas'))

# Creates StoreBuilder  (More information below StoreBuilder section)
store_builder = StoreBuilder('Go to StoreBuilder documentation')

# Creates transactional producer (More information below Transactional producer section)
transactional_producer = KafkaProducer('Go to KafkaProducer documentation')

# Creates CoffeeFinishedHandler (Waiter example)
coffee_finished_handler = CoffeeFinishedHandler(store_builder, transactional_producer)

# Register CoffeeFinished (event class) / CoffeeFinishedHandler (instanced class handle all CoffeeFinished event)
serializer.register_class('tonga.waiter.event.CoffeeFinished', CoffeeFinished, coffee_finished_handler)

Register StoreRecord

This is an example with Tonga StoreRecordHandler

# Import serializer
from tonga.services.serializer.avro import AvroSerializer

# Import store builder
from tonga.stores.store_builder.store_builder import StoreBuilder

# Import StoreRecord & StoreRecordHandler
from tonga.models.store_record.store_record import StoreRecord
from tonga.models.store_record.store_record_handler import StoreRecordHandler

# Creates AvroSerializer
serializer = AvroSerializer(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                           'examples/coffee_bar/avro_schemas'))

# Creates StoreBuilder  (More information below StoreBuilder section)
store_builder = StoreBuilder('Go to StoreBuilder documentation')

# Creates StoreRecordHandler (This handler consumer store record)
store_record_handler = StoreRecordHandler(store_builder)

# Register StoreRecord (Record class) and StoreRecordHandler (instanced class which handle StoreRecord)
serializer.register_event_handler_store_record(StoreRecord, store_record_handler)

Consumer / Producer

  • Producer send BaseRecord (event / command / result / store) in a Kafka topic.

  • Consumer receive BaseRecord (event / command / result / store) from a Kafka topic.

import uvloop
from signal import signal, SIGINT

# Import KafkaProducer / KafkaConsumer
from tonga.services.consumer.kafka_consumer import KafkaConsumer
from tonga.services.producer.kafka_producer import KafkaProducer

# Import key partitioner
from tonga.services.coordinator.partitioner.key_partitioner import KeyPartitioner

cur_instance = 0

loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

# Creates KafkaProducer
producer = KafkaProducer(name=f'waiter-{cur_instance}', bootstrap_servers='localhost:9092',
                                         client_id=f'waiter-{cur_instance}', serializer=serializer,
                                         loop=waiter_app['loop'], partitioner=KeyPartitioner(),
                                         acks='all')

# Creates KafkaConsumer
consumer = KafkaConsumer(name=f'waiter-{cur_instance}', serializer=serializer,
                          bootstrap_servers='localhost:9092', client_id=f'waiter-{cur_instance}',
                          topics=['bartender-events'], loop=loop, group_id='waiter',
                          assignors_data={'instance': cur_instance,
                                          'nb_replica': nb_replica,
                                          'assignor_policy': 'only_own'}, isolation_level='read_committed')

# Ensures future of KafkaConsumer
asyncio.ensure_future(consumer.listen_event('committed'), loop=loop)

# Catch SIGINT
signal(SIGINT, lambda s, f: loop.stop())
try:
    # Runs forever
    loop.run_forever()
except Exception:
    # If an exception was raised loop was stopped
    loop.stop()

Transactional Producer

Warning

Transactional producer can’t send message on Kafka if is not in a transaction.

import uvloop
from signal import signal, SIGINT

# Import KafkaProducer / KafkaConsumer
from tonga.services.consumer.kafka_consumer import KafkaConsumer
from tonga.services.producer.kafka_producer import KafkaProducer

# Import key partitioner
from tonga.services.coordinator.partitioner.key_partitioner import KeyPartitioner

cur_instance = 0

# Creates event loop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

# Creates transactional KafkaProducer
producer = KafkaProducer(name=f'waiter-{cur_instance}', bootstrap_servers='localhost:9092',
                                         client_id=f'waiter-{cur_instance}', serializer=serializer,
                                         loop=waiter_app['loop'], partitioner=KeyPartitioner(),
                                         acks='all', transactional_id=f'waiter')

Make transaction

Transaction example from waiter project (tonga/example/coffee-bar/waiter)

from aiokafka import TopicPartition

# Import BaseCommandHandler
from tonga.models.handlers.command.command_handler import BaseCommandHandler
# Import BaseCommand
from tonga.models.records.command.command import BaseCommand
# Import BaseProducer
from tonga.services.producer.base import BaseProducer

from typing import Union
# Import MakeCoffeeResult / CoffeeStarted event
from examples.coffee_bar.coffeemaker.models.results.make_coffee_result import MakeCoffeeResult
from examples.coffee_bar.coffeemaker.models.events.coffee_started import CoffeeStarted


class MakeCoffeeHandler(BaseCommandHandler):
    _transactional_producer: BaseProducer

    def __init__(self, transactional_producer: BaseProducer, **kwargs) -> None:
        super().__init__(**kwargs)
        self._transactional_producer = transactional_producer

    async def execute(self, command: BaseCommand, tp: TopicPartition, group_id: str, offset: int) -> Union[str, None]:

       if not self._transactional_producer.is_running():
           await self._transactional_producer.start_producer()

        async with self._transactional_producer.init_transaction():
            # Creates commit_offsets dict

           commit_offsets = {tp: offset + 1}
             # Creates CoffeeStarted event and MakeCoffeeResult result
            coffee_started = CoffeeStarted(command.uuid, context=command.context)
            make_coffee_result = MakeCoffeeResult(command.uuid, context=command.context)

             # Sends CoffeeFinished event
            await self._transactional_producer.send_and_await(coffee_started, 'coffee-maker-events')
            await self._transactional_producer.send_and_await(make_coffee_result, 'coffee-maker-results')

             # End transaction
            await self._transactional_producer.end_transaction(commit_offsets, group_id)
        return 'transaction'

    @classmethod
    def handler_name(cls) -> str:
        return 'tonga.coffeemaker.command.MakeCoffee'

StoreBuilder

Store builder need an AvroSerializer which contains a StoreRecord and StoreRecordHandler

import uvloop
import asyncio
from kafka import KafkaAdminClient
from kafka.cluster import ClusterMetadata

# Import local & global store memory
from tonga.stores.local.memory import LocalStoreMemory
from tonga.stores.globall.memory import GlobalStoreMemory
# Import store builder
from tonga.stores.store_builder.store_builder import StoreBuilder

cur_instance = 0
nb_replica = 2

# Creates event loop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

# Creates local store memory / global store memory
local_store = LocalStoreMemory(name=f'waiter-{cur_instance}-local-memory')
global_store = GlobalStoreMemory(name=f'waiter-{cur_instance}-global-memory')

cluster_admin = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id=f'waiter-{cur_instance}')
cluster_metadata = ClusterMetadata(bootstrap_servers='localhost:9092')

# Creates store builder
store_builder = StoreBuilder(name=f'waiter-{cur_instance}-store-builder', current_instance=cur_instance,
                            nb_replica=nb_replica, topic_store='waiter-stores', serializer=serializer,
                            local_store=local_store, global_store=global_store,
                            bootstrap_server='localhost:9092', cluster_metadata=cluster_metadata,
                            cluster_admin=cluster_admin, loop=loop, rebuild=True, event_sourcing=False)

# Ensures future of KafkaConsumer store builder
store_builder.return_consumer_task()

# Catch SIGINT
signal(SIGINT, lambda s, f: loop.stop())
try:
   # Runs forever
   loop.run_forever()
except Exception:
   # If an exception was raised loop was stopped
   loop.stop()