From 6903d4ac78146af5eb3b963881630122e07ec2ac Mon Sep 17 00:00:00 2001 From: Andriy Levchenko Date: Wed, 29 Nov 2017 16:57:00 +0200 Subject: [PATCH] Added kafka producer to storage service and consumer to ratings service --- rating-service/pom.xml | 5 +++ .../service/ApartmentRecordListener.java | 35 ++++++++++++++++++ .../src/main/resources/application.yml | 9 +++++ storage-service/pom.xml | 5 +++ .../ApartmentRepositoryEventListener.java | 36 +++++++++++++++++++ 5 files changed, 90 insertions(+) create mode 100644 rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java create mode 100644 rating-service/src/main/resources/application.yml create mode 100644 storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java diff --git a/rating-service/pom.xml b/rating-service/pom.xml index 25862a6..3887788 100644 --- a/rating-service/pom.xml +++ b/rating-service/pom.xml @@ -47,6 +47,11 @@ lombok 1.16.16 + + org.springframework.kafka + spring-kafka + 1.2.2.RELEASE + org.springframework.boot spring-boot-starter-test diff --git a/rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java b/rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java new file mode 100644 index 0000000..f1bdafb --- /dev/null +++ b/rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java @@ -0,0 +1,35 @@ +package com.lohika.jclub.rating.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.math.BigDecimal; + +/** + * @author Andriy Levchenko + */ +@Slf4j +@Component +@EnableKafka +public class ApartmentRecordListener { + + @Autowired + private RatingService ratingService; + + @Autowired + private ObjectMapper objectMapper; + + @KafkaListener(topics = "${kafka.topic.boot}") + public void receive(ConsumerRecord record) throws IOException { + log.info("Received message: " + record); + Apartment apartment = objectMapper.readValue(record.value(), Apartment.class); + BigDecimal rating = ratingService.calculateRating(apartment); + log.info("Rating : {}", rating); + } +} diff --git a/rating-service/src/main/resources/application.yml b/rating-service/src/main/resources/application.yml new file mode 100644 index 0000000..f8f441c --- /dev/null +++ b/rating-service/src/main/resources/application.yml @@ -0,0 +1,9 @@ +spring: + kafka: + consumer: + auto-offset-reset: earliest + group-id: boot + +kafka: + topic: + boot: apartments \ No newline at end of file diff --git a/storage-service/pom.xml b/storage-service/pom.xml index 230df06..059cf79 100644 --- a/storage-service/pom.xml +++ b/storage-service/pom.xml @@ -55,6 +55,11 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka + 1.2.2.RELEASE + diff --git a/storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java b/storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java new file mode 100644 index 0000000..b8e8677 --- /dev/null +++ b/storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java @@ -0,0 +1,36 @@ +package com.lohika.jclub.storage.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.rest.core.event.AbstractRepositoryEventListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +/** + * @author Andriy Levchenko + */ +@Slf4j +@Component +public class ApartmentRepositoryEventListener extends AbstractRepositoryEventListener { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private ObjectMapper objectMapper; + + @Override + protected void onAfterCreate(ApartmentRecord apartmentRecord) { + log.info("Creating message for apartmentRecord " + apartmentRecord.toString()); + String json; + try { + json = objectMapper.writeValueAsString(apartmentRecord); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error", e); + } + kafkaTemplate.send("apartments", json); + log.info("Message sent for apartmentRecord " + apartmentRecord.toString()); + } +}