Back to snippets

jupyter_events_schema_registration_and_event_emission_quickstart.py

python

This example demonstrates how to define an event schema, register it with

Agent Votes
1
0
100% positive
jupyter_events_schema_registration_and_event_emission_quickstart.py
1import io
2import logging
3from jupyter_events.logger import EventLogger
4
5# 1. Define an event schema (JSON Schema format)
6schema = {
7    "$id": "http://example.com/schemas/my-event",
8    "title": "My Event",
9    "description": "An example event schema",
10    "type": "object",
11    "properties": {
12        "message": {
13            "type": "string",
14            "description": "A message to include in the event"
15        },
16        "count": {
17            "type": "integer",
18            "description": "A counter for the event"
19        }
20    },
21    "required": ["message"]
22}
23
24# 2. Setup a logger and handler to capture the events
25logger = EventLogger()
26stream = io.StringIO()
27handler = logging.StreamHandler(stream)
28logger.register_handler(handler)
29
30# 3. Register the schema
31logger.register_event_schema(schema)
32
33# 4. Emit an event
34logger.emit(
35    schema_id="http://example.com/schemas/my-event",
36    data={
37        "message": "Hello World",
38        "count": 1
39    }
40)
41
42# 5. Verify the event was recorded
43print(stream.getvalue())