A comprehensive Avro schema library for the CodedStreams real-time crypto risk detection demonstration system. This library provides standardized data contracts and schema definitions for building scalable, evolvable risk management platforms.
```mermaid
graph TB
    A[TradeEvent] --> D[RiskAlert]
    B[MarketDataEvent] --> D
    C[RiskRule] --> D
    A --> E[PositionUpdate]
    B --> E

    subgraph "Core Events"
        A
        B
    end

    subgraph "Risk Domain"
        C
        D
        E
    end
```
| Schema | Purpose | Key Features |
|---|---|---|
| TradeEvent | Trading activity | Multi-exchange support, cancellation tracking |
| MarketDataEvent | Price and volume data | Bid/ask spreads, 24h metrics |
| RiskAlert | Risk detection outputs | Severity scoring, pattern metadata |
| RiskRule | Dynamic rules | Hot-reloadable, versioned configurations |
| PositionUpdate | Real-time exposures | PnL tracking, position aggregation |
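
Only the TradeEvent and RiskAlert contracts are reproduced in full below. For orientation, a MarketDataEvent along the lines described in the table might look like the following sketch (the field names here are illustrative guesses, not the library's actual contract):

```json
{
  "type": "record",
  "name": "MarketDataEvent",
  "namespace": "com.codedstreams.schemas",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "exchange", "type": "string"},
    {"name": "bidPrice", "type": "double"},
    {"name": "askPrice", "type": "double"},
    {"name": "lastPrice", "type": "double"},
    {"name": "volume24h", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
```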
Install with Maven:

```xml
<dependency>
    <groupId>com.codedstreams</groupId>
    <artifactId>codedstreams-risko-library</artifactId>
    <version>1.0.0</version>
</dependency>
```

Or with Gradle:

```kotlin
implementation("com.codedstreams:codedstreams-risko-library:1.0.0")
```

To build from source:

```bash
git clone https://github.com/codedstreams/risko-library.git
cd risko-library
mvn clean install
```

TradeEvent:

```json
{
  "type": "record",
  "name": "TradeEvent",
  "namespace": "com.codedstreams.schemas",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "traderId", "type": "string"},
    {"name": "exchange", "type": {"type": "enum", "name": "Exchange", "symbols": ["BINANCE", "COINBASE", "KRAKEN"]}},
    {"name": "symbol", "type": "string"},
    {"name": "tradeType", "type": {"type": "enum", "name": "TradeType", "symbols": ["MARKET", "LIMIT"]}},
    {"name": "side", "type": {"type": "enum", "name": "OrderSide", "symbols": ["BUY", "SELL"]}},
    {"name": "price", "type": "double"},
    {"name": "quantity", "type": "double"},
    {"name": "timestamp", "type": "long"},
    {"name": "orderId", "type": "string"},
    {"name": "isCancel", "type": "boolean", "default": false},
    {"name": "parentOrderId", "type": ["null", "string"], "default": null}
  ]
}
```
RiskAlert:

```json
{
  "type": "record",
  "name": "RiskAlert",
  "namespace": "com.codedstreams.schemas",
  "fields": [
    {"name": "alertId", "type": "string"},
    {"name": "traderId", "type": "string"},
    {"name": "alertType", "type": {"type": "enum", "name": "AlertType", "symbols": ["WASH_TRADE", "SPOOFING", "POSITION_LIMIT", "RATE_LIMIT", "MARKET_ABUSE"]}},
    {"name": "pattern", "type": "string"},
    {"name": "severity", "type": "double"},
    {"name": "description", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}},
    {"name": "ruleId", "type": ["null", "string"], "default": null}
  ]
}
```
```java
// Creating a trade event
TradeEvent trade = TradeEvent.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setTraderId("trader-123")
    .setExchange(Exchange.BINANCE)
    .setSymbol("BTC-USDT")
    .setTradeType(TradeType.LIMIT)
    .setSide(OrderSide.BUY)
    .setPrice(45000.50)
    .setQuantity(1.5)
    .setTimestamp(System.currentTimeMillis())
    .setOrderId("order-456")
    .setIsCancel(false)
    .setParentOrderId(null)
    .build();

// Creating a risk alert
RiskAlert alert = RiskAlert.newBuilder()
    .setAlertId(UUID.randomUUID().toString())
    .setTraderId("trader-123")
    .setAlertType(AlertType.WASH_TRADE)
    .setPattern("BUY-SELL-CANCEL")
    .setSeverity(0.85)
    .setDescription("Potential wash trading detected")
    .setTimestamp(System.currentTimeMillis())
    .setMetadata(Map.of("buyOrderId", "order-456", "sellOrderId", "order-457"))
    .setRuleId("rule-wash-trade-v1")
    .build();
```

Producing with the Confluent Avro serializer:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, TradeEvent> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("trades", trade.getTraderId(), trade));
```

Consuming with the Avro deserializer:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "risk-engine");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
KafkaConsumer<String, RiskAlert> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("risk-alerts"));
```

The library supports backward and forward compatibility through Avro schema evolution:
Adding a new field is backward compatible as long as it declares a default value:

```json
{
  "name": "newField",
  "type": "string",
  "default": "default_value"
}
```

A field can later be removed safely only if it carried a default, so readers still on the old schema can fall back to the default when decoding new data:

```json
{
  "name": "oldField",
  "type": "string",
  "default": "old_default"
}
```
```mermaid
sequenceDiagram
    participant P as Producer
    participant SR as Schema Registry
    participant K as Kafka
    participant C as Consumer
    P->>SR: Register/Validate Schema
    SR->>P: Schema ID
    P->>K: Send Avro + Schema ID
    C->>K: Poll Messages
    C->>SR: Get Schema by ID
    SR->>C: Return Schema
    C->>C: Deserialize Avro
```
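
For illustration, a minimal poll loop matching this flow, continuing the consumer configured earlier (a sketch; error handling and offset commits are omitted):

```java
// Each poll returns Avro-encoded records; the deserializer resolves the
// writer schema from the registry via the schema ID embedded in the message.
while (true) {
    ConsumerRecords<String, RiskAlert> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, RiskAlert> record : records) {
        RiskAlert alert = record.value();
        System.out.printf("alert=%s severity=%.2f%n",
            alert.getAlertId(), alert.getSeverity());
    }
}
```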
Example unit test:

```java
public class TradeEventTest {
    @Test
    public void testTradeEventCreation() {
        TradeEvent trade = TradeEvent.newBuilder()
            .setEventId("test-123")
            .setTraderId("trader-1")
            .setExchange(Exchange.BINANCE)
            // ... set other fields
            .build();
        assertEquals("trader-1", trade.getTraderId());
        assertEquals(Exchange.BINANCE, trade.getExchange());
    }
}
```

To validate schemas from the build:

```bash
# Test schema compatibility
mvn avro:schema
mvn avro:test-compatibility
```
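
The `schema` goal above comes from the avro-maven-plugin; a typical configuration looks roughly like this (a sketch: the version and paths are assumptions, and the library's actual pom may differ):

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```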
Flink integration:

```java
// Flink Avro deserialization
KafkaSource<TradeEvent> source = KafkaSource.<TradeEvent>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("trades")
    .setValueOnlyDeserializer(
        ConfluentRegistryAvroDeserializationSchema.forSpecific(TradeEvent.class, schemaRegistryUrl))
    .build();
```
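
The source then plugs into a job as usual (a sketch; `env` is an assumed existing StreamExecutionEnvironment):

```java
// Attach the Kafka source and run a trivial pipeline
DataStream<TradeEvent> trades =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "trades");
trades.print();
env.execute("trade-ingest");
```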
The library supports integration with monitoring systems through structured logging and metrics:

```java
// Structured logging example
logger.info("Risk alert generated",
    "alertId", alert.getAlertId(),
    "traderId", alert.getTraderId(),
    "severity", alert.getSeverity(),
    "pattern", alert.getPattern());
```
To contribute a schema change:

- Fork the repository
- Create a feature branch: `git checkout -b feature/new-schema`
- Update schemas in `src/main/avro/`
- Regenerate Java classes: `mvn compile`
- Submit a pull request
```mermaid
flowchart LR
    A[Propose Change] --> B[Update .avsc]
    B --> C[Regenerate Java]
    C --> D[Test Compatibility]
    D --> E[Submit PR]
    E --> F[Review & Merge]
```
Apache License 2.0 - See LICENSE file for details.
- Documentation
- Issue Tracker
- Discussions
This library follows Semantic Versioning:
- MAJOR - Breaking schema changes
- MINOR - New features, backward compatible
- PATCH - Bug fixes, backward compatible
Built with ❤️ by CodedStreams for passionate data streamers and risk mitigation engineers.