1- import typing
1+ from typing import Annotated , Literal
22
33import pydantic
4- from pydantic import ConfigDict
5-
6- from domains .common .cloudevent_base import (
7- BaseEvent ,
8- dataschema_field ,
9- type_field ,
10- )
4+ from cloudevents_pydantic .events import CloudEvent
5+ from cloudevents_pydantic .events .fields import metadata
6+ from cloudevents_pydantic .events .fields .types import URI , URIReference
7+ from pydantic import ConfigDict , Field
118
129
1310class BookCreatedV1Data (pydantic .BaseModel ):
@@ -16,22 +13,76 @@ class BookCreatedV1Data(pydantic.BaseModel):
1613 author_name : str
1714
1815
19- class BookCreatedV1 (BaseEvent ):
20- type : typing .Literal ["book.created.v1" ] = type_field ("book.created.v1" )
21- dataschema : str = dataschema_field ("book.created.v1" )
16+ def _dataschema_url (value : str ) -> str :
17+ return f"https://this_service/dataschemas/{ value } "
18+
19+
20+ class BookCreatedV1 (CloudEvent ):
21+ source : Annotated [
22+ URIReference ,
23+ Field (default = "/book_service" , validate_default = True ),
24+ metadata .FieldSource ,
25+ ]
26+ type : Annotated [
27+ Literal ["book.created.v1" ], Field (default = "book.created.v1" ), metadata .FieldType
28+ ]
29+ dataschema : Annotated [
30+ URI ,
31+ Field (default = _dataschema_url ("book.created.v1" ), validate_default = True ),
32+ metadata .FieldDataSchema ,
33+ ]
2234
2335 data : BookCreatedV1Data
2436
2537 # The first example is used to generate the OpenAPI documentation!
26- # Examples ate good! Add examples!
38+ # Examples are good! Add examples!
2739 model_config = ConfigDict (
2840 json_schema_extra = {
2941 "examples" : [
3042 {
3143 "source" : "this.service.url.here" ,
3244 "type" : "book.created.v1" ,
33- "dataschema" : "book.created.v1/some_event" ,
34- "datacontenttype" : "text/xml" ,
45+ "dataschema" : "/dataschemas/book.created.v1" ,
46+ "datacontenttype" : "application/json" ,
47+ "subject" : "123" ,
48+ "data" : {"book_id" : 0 , "title" : "string" , "author_name" : "string" },
49+ "id" : "A234-1234-1234" ,
50+ "specversion" : "1.0" ,
51+ "time" : "2018-04-05T17:31:00Z" ,
52+ }
53+ ]
54+ }
55+ )
56+
57+
58+ class BookUpdatedV1 (CloudEvent ):
59+ source : Annotated [
60+ URIReference ,
61+ Field (default = "/book_service" , validate_default = True ),
62+ metadata .FieldSource ,
63+ ]
64+ type : Annotated [
65+ Literal ["book.updated.v1" ], Field (default = "book.updated.v1" ), metadata .FieldType
66+ ]
67+ dataschema : Annotated [
68+ URI ,
69+ Field (default = _dataschema_url ("book.updated.v1" ), validate_default = True ),
70+ metadata .FieldDataSchema ,
71+ ]
72+
73+ # This is just an example, too lazy to use a different data class
74+ data : BookCreatedV1Data
75+
76+ # The first example is used to generate the OpenAPI documentation!
77+ # Examples are good! Add examples!
78+ model_config = ConfigDict (
79+ json_schema_extra = {
80+ "examples" : [
81+ {
82+ "source" : "this.service.url.here" ,
83+ "type" : "book.updated.v1" ,
84+ "dataschema" : "/dataschemas/book.updated.v1" ,
85+ "datacontenttype" : "application/json" ,
3586 "subject" : "123" ,
3687 "data" : {"book_id" : 0 , "title" : "string" , "author_name" : "string" },
3788 "id" : "A234-1234-1234" ,
0 commit comments