RabbitMQ


Requirements#


  • Docker
    • DockerHub Access
  • 2 Spring Projects
    • Project Gradle
    • Language Kotlin
    • Spring-Boot Default
    • Packaging Jar
    • Java e.g. 11

Presets#


RabbitMQ Docker-Container#


1. Save compose to any directory.#

# docker-compose.yaml
version: '3.7'
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672

2. Start compose#

cd /path/to/your/compose
docker-compose up -d

3. Verify container up#

Open Docker-Dashboard (Taskbar > Right > Docker-Icon > Right-Click > Dashboard)

OR

Open RabbitMQ UI

You can always check if your messages delivered correctly via RabbitMQ UI

Project Configuration#

1. Add Gradle dependency#

// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-amqp")
// (...)
}

2. Setup Connection#

# application.yml
spring:
rabbitmq:
port: 5672
host: docker-machine

2. Create config#

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
/**
* @author Nicholas Dietz
**/
@Configuration
class RabbitMqConfig {
// Autowire your global jackson object-mapper or use default
@Autowired
private lateinit var objectMapper: ObjectMapper
// Use this one if you get generic message data
@Bean
fun simpleConverter(): SimpleMessageConverter {
return SimpleMessageConverter()
}
@Bean
fun rabbitMqTemplate(connectionFactory: ConnectionFactory, jsonMessageConverter: Jackson2JsonMessageConverter): RabbitTemplate {
return RabbitTemplate(connectionFactory).apply {
messageConverter = simpleConverter()
}
}
}

Concept#

You have to connect 2 services with each other via Exchange nicos-dev.user-exchange. There'll be 2 Queues nicos-dev.user-service and nicos-dev.cache-service The use-case is that you must synchronize data (users) from User Service to Cache Service


Legend#

Service - Queue - Exchange - Routing-Key

Message Data (User)#

This is the default JSON-Format our User Object:

{
"id": "String",
"firstName": "String",
"lastName": "String",
"email": "String"
}


Events#

Event names are Routing-Keys

Event - nicos-dev.user.sync-request#

Cache Service âž” Message with nicos-dev.user.sync-request âž” nicos-dev.user-exchange

Event - nicos-dev.user.sync-response#

User Service âž” Message with nicos-dev.user.sync-response âž” nicos-dev.user-exchange

Event - nicos-dev.user.created#

User Service âž” Message with nicos-dev.user.created âž” nicos-dev.user-exchange

Event - nicos-dev.user.deleted#

User Service âž” Message with nicos-dev.user.deleted âž” nicos-dev.user-exchange

Event - nicos-dev.data.updated#

User Service âž” Message with nicos-dev.user.updated âž” nicos-dev.user-exchange

Sender#

Messages are send to a Exchange with a specific Routing-Key, but without defining a specific Queue.

import com.nicos-dev.usermanagement.config.amqp.RabbitMqConfig
import org.slf4j.LoggerFactory
import org.springframework.amqp.core.Message
import org.springframework.amqp.core.MessageProperties
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
/**
* @author Nicholas Dietz
**/
@Service
class UserRabbitMqSenderServiceImp: UserRabbitMqSenderService {
@Autowired
private lateinit var rabbitMqTemplate: RabbitTemplate
override fun requestUsers() {
rabbitMqTemplate.send(
"nicos-dev.user-exchange", // Exchange
"nicos-dev.user.sync-request", // Routing Key
Message("".toByteArray(), MessageProperties()) // Empty message
)
}
}

Receiver#

To receive any messages you have to declare bindings. Because Messages are send to nicos-dev.user-exchange you must define which Routing-Keys should be forwarded to your Queue in order to receive any messages.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
import org.slf4j.LoggerFactory
import org.springframework.amqp.core.ExchangeTypes
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.Exchange
import org.springframework.amqp.rabbit.annotation.Queue
import org.springframework.amqp.rabbit.annotation.QueueBinding
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.amqp.support.AmqpHeaders
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Service
/**
* @author Nicholas Dietz
**/
@Service
class UserRabbitMqReceiver {
@Autowired
private lateinit var themeRepository: ThemeRepository
@Autowired
private lateinit var objectMapper: ObjectMapper
@RabbitListener(
bindings = [
QueueBinding(
value = Queue(value = "nicos-dev.cache-service", durable = "true"),
exchange = Exchange(value = "nicos-dev.user-exchange", type = ExchangeTypes.TOPIC),
key = [
"nicos-dev.user.sync-response",
"nicos-dev.user.created",
"nicos-dev.user.updated",
"nicos-dev.user.deleted"
]
)
]
)
fun messageArrived(@Payload value: String, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) key: String) {
when (key) {
"nicos-dev.user.sync-response" -> syncUsers(value)
"nicos-dev.user.created" -> userCreated(value)
"nicos-dev.user.updated" -> userUpdated(value)
"nicos-dev.user.deleted" -> userDeleted(value)
}
}
fun userCreated(value: String) {
val user = objectMapper.readValue(value, UserDto::class.java)
TODO("Do something")
}
fun userUpdated(value: String) {
val user = objectMapper.readValue(value, UserDto::class.java)
TODO("Do something")
}
fun userDeleted(value: String) {
val user = objectMapper.readValue(value, UserDto::class.java)
TODO("Do something")
}
fun syncUsers(value: String) {
val users = objectMapper.readValue(value, jacksonTypeRef<List<UserDto>>())
TODO("Do something")
}
}