profile image

L o a d i n g . . .

프로젝트에 사용된 코드

 

GitHub - examples-seonwkim/spring-boot-chat

Contribute to examples-seonwkim/spring-boot-chat development by creating an account on GitHub.

github.com

 

 

분산  시스템에서 채팅 애플리케이션을 개발하기 위해서는 서로 다른 서버에 접속한 유저에게 메시지를 송신할 수 있도록 Redis 등의 메시지 전송을 위한 미들웨어가 필요합니다. 다른 방법으로는 서버를 클러스터로 묶고 서로 통신할 수 있도록 하는 방법이 있는데요, 요 포스팅에서는 액터 클러스터링 기능을 활용해서 후자의 방법으로 채팅 애플리케이션을 개발해 보겠습니다. 

 

Java 진영의 대표적인 액터 라이브러리로는 Pekko가 있습니다. 다만 Spring Boot를 사용하는 경우 Pekko와의 통합이 복잡하기 때문에 이를 단순화시킨 spring-boot-starter-actor 라이브러리를 활용하겠습니다. 

plugins {
    kotlin("jvm") version "1.9.25"
    kotlin("plugin.spring") version "1.9.25"
    id("org.springframework.boot") version "3.3.12"
    id("io.spring.dependency-management") version "1.1.7"
}

group = "io.github.seonwkim"
version = "0.0.1-SNAPSHOT"

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

repositories {
    mavenCentral()
}


dependencyManagement {
    imports {
        // pekko-serialization-jackson_3 require minimum 2.17.3 version of jackson
        mavenBom("com.fasterxml.jackson:jackson-bom:2.17.3")
    }
}

dependencies {
    implementation("io.github.seonwkim:spring-boot-starter-actor_3:0.0.25")

    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-websocket")

    implementation("org.springframework.boot:spring-boot-starter-thymeleaf")
    implementation("io.micrometer:micrometer-registry-prometheus")
    implementation("com.fasterxml.jackson.core:jackson-databind")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testRuntimeOnly("org.junit.platform:junit-platform-launcher")

}

kotlin {
    compilerOptions {
        freeCompilerArgs.addAll("-Xjsr305=strict")
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

 

프로젝트는 spring boot 버전 3을 활용하겠습니다. 추가로 웹소켓을 위해 spring-boot-starter-websocket을, 비동기 서버를 기본으로 사용하기 위해 spring-boot-starter-webflux를 추가하겠습니다. 

 

웹 소켓 설정 추가 

프런트에서 웹 소켓 세션을 생성할 수 있는 endpoint를 설정하는 코드를 우선 작성하겠습니다. 

import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry

@Configuration
@EnableWebSocket
class WebSocketConfig(
    private val chatWebSocketHandler: ChatWebSocketHandler
) : WebSocketConfigurer {

    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        registry
            .addHandler(chatWebSocketHandler, "/ws/chat")
            .setAllowedOrigins("*") // For development; restrict in production
    }
}

 

"/ws/chat" endpoint를 통해 웹 소켓 세션이 생성되고 해당 세션은 chatWebSocketHandler에 의해 처리됩니다. 다음으로는 웹 소켓과 관련된 로직을 처리하는 ChatWebSocketHandler을 추가하겠습니다. 

 

ChatWebSocketHandler 구현에 앞서 채팅 시스템의 아키텍처를 우선 생각해 보겠습니다. 유저가 웹소켓 커넥션을 생성하면 해당 커넥션을 담당하는 액터를 생성할 건데 해당 액터를 UserActor라고 명명하겠습니다. 그리고 특정 채팅방에서 발생하는 모든 이벤트를 처리하는 액터를 ChatRoomActor라고 명명하면 전체적인 아키텍처는 아래와 같습니다. 

채팅 애플리케이션 아키텍처

 

 

UserActor 

그럼 유저의 웹 소켓과 관련된 로직을 담당하는 UserActor을 구현하겠습니다. 

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.github.seonwkim.core.SpringActor
import io.github.seonwkim.core.SpringActorContext
import io.github.seonwkim.core.SpringActorSystem
import io.github.seonwkim.core.SpringShardedActorRef
import io.github.seonwkim.core.serialization.JsonSerializable
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.javadsl.ActorContext
import org.apache.pekko.actor.typed.javadsl.Behaviors
import org.springframework.stereotype.Component
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import java.io.IOException

@Component
class UserActor : SpringActor {

    interface Command : JsonSerializable

    class Connect : Command

    class Stop : Command

    data class JoinRoom(val roomId: String) : Command

    class LeaveRoom : Command

    data class SendMessage(val message: String) : Command

    data class JoinRoomEvent(val userId: String) : Command

    data class LeaveRoomEvent(val userId: String) : Command

    data class SendMessageEvent(
        val userId: String,
        val message: String
    ) : Command

    override fun commandClass(): Class<*> = Command::class.java

    class UserActorContext(
        val actorSystem: SpringActorSystem,
        val objectMapper: ObjectMapper,
        val userId: String,
        val session: WebSocketSession
    ) : SpringActorContext {
        override fun actorId(): String = userId
    }

    override fun create(actorContext: SpringActorContext): Behavior<Command> {
        val userActorContext = actorContext as? UserActorContext
            ?: throw IllegalStateException("Must be UserActorContext")

        return Behaviors.setup { context ->
            UserActorBehavior(
                context,
                userActorContext.actorSystem,
                userActorContext.objectMapper,
                userActorContext.userId,
                userActorContext.session
            ).create()
        }
    }

    private class UserActorBehavior(
        private val context: ActorContext<Command>,
        private val actorSystem: SpringActorSystem,
        private val objectMapper: ObjectMapper,
        private val userId: String,
        private val session: WebSocketSession
    ) {
        private var currentRoomId: String? = null

        fun create(): Behavior<Command> = Behaviors.receive(Command::class.java)
            .onMessage(Connect::class.java, ::onConnect)
            .onMessage(Stop::class.java, ::onStop)
            .onMessage(JoinRoom::class.java, ::onJoinRoom)
            .onMessage(LeaveRoom::class.java, ::onLeaveRoom)
            .onMessage(SendMessage::class.java, ::onSendMessage)
            .onMessage(JoinRoomEvent::class.java, ::onJoinRoomEvent)
            .onMessage(LeaveRoomEvent::class.java, ::onLeaveRoomEvent)
            .onMessage(SendMessageEvent::class.java, ::onSendMessageEvent)
            .build()

        private fun onConnect(connect: Connect): Behavior<Command> {
            sendEvent("connected") {
                put("userId", userId)
            }
            return Behaviors.same()
        }

        private fun onStop(stop: Stop): Behavior<Command> {
            if (currentRoomId == null) {
                context.log.info("$userId user has not joined any room.")
                return Behaviors.same()
            }

            val roomActor = getRoomActor()
            roomActor.tell(ChatRoomActor.LeaveRoom(userId))

            return Behaviors.same();
        }

        private fun onJoinRoom(command: JoinRoom): Behavior<Command> {
            currentRoomId = command.roomId
            val roomActor = getRoomActor()
            sendEvent("joined") {
                put("roomId", currentRoomId)
            }

            roomActor.tell(ChatRoomActor.JoinRoom(userId, context.self))
            return Behaviors.same()
        }

        private fun onLeaveRoom(command: LeaveRoom): Behavior<Command> {
            if (currentRoomId == null) {
                context.log.info("$userId user has not joined any room.")
                return Behaviors.same()
            }

            sendEvent("left") {
                put("roomId", currentRoomId)
            }

            val roomActor = getRoomActor()
            roomActor.tell(ChatRoomActor.LeaveRoom(userId))

            return Behaviors.same()
        }

        private fun onSendMessage(command: SendMessage): Behavior<Command> {
            if (currentRoomId == null) {
                context.log.info("$userId user has not joined any room.")
                return Behaviors.same()
            }

            val roomActor = getRoomActor()
            roomActor.tell(ChatRoomActor.SendMessage(userId, command.message))

            return Behaviors.same()
        }

        private fun onJoinRoomEvent(event: JoinRoomEvent): Behavior<Command> {
            sendEvent("user_joined") {
                put("userId", event.userId)
                put("roomId", currentRoomId)
            }
            return Behaviors.same()
        }

        private fun onLeaveRoomEvent(event: LeaveRoomEvent): Behavior<Command> {
            sendEvent("user_left") {
                put("userId", event.userId)
                put("roomId", currentRoomId)
            }
            return Behaviors.same()
        }

        private fun onSendMessageEvent(event: SendMessageEvent): Behavior<Command> {
            sendEvent("message") {
                put("userId", event.userId)
                put("message", event.message)
                put("roomId", currentRoomId)
            }
            return Behaviors.same()
        }

        private fun getRoomActor(): SpringShardedActorRef<ChatRoomActor.Command> =
            actorSystem.entityRef(ChatRoomActor.Companion.TYPE_KEY, currentRoomId!!)

        private fun sendEvent(type: String, builder: ObjectNode.() -> Unit) {
            try {
                val eventNode = objectMapper.createObjectNode().apply {
                    put("type", type)
                    builder()
                }

                if (session.isOpen) {
                    session.sendMessage(TextMessage(objectMapper.writeValueAsString(eventNode)))
                }
            } catch (e: IOException) {
                context.log.error("Failed to send message to WebSocket", e)
            }
        }
    }
}

 

UserActor가 처리하는 이벤트에 대한 설명은 다음과 같습니다. 

  • Connect: 처음 웹 소켓 커넥션이 생성됐을 때 발생 
  • Stop: 웹 소켓 커넥션이 끊긴 경우 
  • JoinRoom: 유저가 특정 채팅방에 입장했을 때 발생 
  • LeaveRoom: 유저가 현재 접속한 채팅방을 퇴장할 때 발생 
  • SendMessage: 유저가 채팅 메시지를 전송한 경우 발생 
  • JoinRoomEvent: 유저가 접속한 채팅방에 다른 유저가 접속한 경우. 
  • LeaveRoomEvent: 유저가 접속한 채팅방에 다른 유저가 퇴장하는 경우 
  • SendMessagEvent: 유저가 접속한 채팅방에서 채팅 메시지가 추가된 경우 

 

각 이벤트의 처리는 UserActorBehavior에 정의되어 있습니다. 

 

ChatRoomActor 

다음으로는 채팅방과 관련된 로직을 담당하는 ChatRoomActor을 구현하겠습니다. ChatRoomActor의 특징은 클러스터 내에서 유일해야 한다는 점인데요, 예를 들어 roomId가 1인 채팅방을 나타내는 ChatRoomActor 액터는 전체 클러스터에서 유일한 존재여야 합니다. 이를 위해 ShardedActor을 사용하겠습니다. 

package io.github.seonwkim.springbootchat

import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
import io.github.seonwkim.core.serialization.JsonSerializable
import io.github.seonwkim.core.shard.DefaultShardingMessageExtractor
import io.github.seonwkim.core.shard.ShardEnvelope
import io.github.seonwkim.core.shard.ShardedActor
import org.apache.pekko.actor.typed.ActorRef
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.javadsl.Behaviors
import org.apache.pekko.cluster.sharding.typed.ShardingMessageExtractor
import org.apache.pekko.cluster.sharding.typed.javadsl.EntityContext
import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey
import org.springframework.stereotype.Component

@Component
class ChatRoomActor : ShardedActor<ChatRoomActor.Command> {

    companion object {
        val TYPE_KEY: EntityTypeKey<Command> = EntityTypeKey.create(Command::class.java, "ChatRoomActor")
    }

    interface Command : JsonSerializable

    data class JoinRoom @JsonCreator constructor(
        @JsonProperty("userId") val userId: String,
        @JsonProperty("userRef") val userRef: ActorRef<UserActor.Command>
    ) : Command

    data class LeaveRoom @JsonCreator constructor(
        @JsonProperty("userId") val userId: String
    ) : Command

    data class SendMessage @JsonCreator constructor(
        @JsonProperty("userId") val userId: String,
        @JsonProperty("message") val message: String
    ) : Command

    override fun typeKey(): EntityTypeKey<Command> = TYPE_KEY

    override fun create(ctx: EntityContext<Command>): Behavior<Command> = Behaviors.setup {
        val roomId = ctx.entityId
        chatRoom(roomId, HashMap())
    }

    private fun chatRoom(
        roomId: String,
        connectedUsers: MutableMap<String, ActorRef<UserActor.Command>>
    ): Behavior<Command> = Behaviors.receive(Command::class.java)
        .onMessage(JoinRoom::class.java) { msg ->
            // Add the user to the connected users
            connectedUsers[msg.userId] = msg.userRef

            // Notify all users that a new user has joined
            val joinRoomEvent = UserActor.JoinRoomEvent(msg.userId)
            broadcastCommand(connectedUsers, joinRoomEvent)

            chatRoom(roomId, connectedUsers)
        }
        .onMessage(LeaveRoom::class.java) { msg ->
            // Remove the user from connected users
            val userRef = connectedUsers.remove(msg.userId)

            userRef?.let {
                // Notify the user that they left the room
                val leaveRoomEvent = UserActor.LeaveRoomEvent(msg.userId)
                it.tell(leaveRoomEvent)

                // Notify all remaining users that a user has left
                broadcastCommand(connectedUsers, leaveRoomEvent)
            }

            chatRoom(roomId, connectedUsers)
        }
        .onMessage(SendMessage::class.java) { msg ->
            // Create a message received command
            val receiveMessageCmd = UserActor.SendMessageEvent(msg.userId, msg.message)

            // Broadcast the message to all connected users
            broadcastCommand(connectedUsers, receiveMessageCmd)

            Behaviors.same()
        }
        .build()

    private fun broadcastCommand(
        connectedUsers: Map<String, ActorRef<UserActor.Command>>,
        command: UserActor.Command
    ) {
        connectedUsers.values.forEach { it.tell(command) }
    }

    override fun extractor(): ShardingMessageExtractor<ShardEnvelope<Command>, Command> =
        DefaultShardingMessageExtractor(3)
}

 

ChatRoomActor에서 처리하는 이벤트는 다음과 같습니다. 

  • JoinRoom: 해당 채팅방에 특정 유저가 입장한 경우 
  • LeaveRoom: 해당 채팅방에서 특정 유저가 퇴장한 경우 
  • SendMessage: 해당 채팅방에서 메시지가 생성된 경우 

 

ChatRoomActor는 채팅방에 접속한 유저들에게 이벤트를 브로드캐스팅 방식을 통해 전달합니다(broadcastCommand 함수 참고). 

 

ChatWebSocketHandler 

자, 그럼 대망의 웹 소켓 핸들링 로직을 작성해 보겠습니다. 

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.github.seonwkim.core.SpringActorRef
import io.github.seonwkim.core.SpringActorSystem
import org.springframework.stereotype.Component
import org.springframework.web.socket.CloseStatus
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import java.io.IOException
import java.util.*
import java.util.concurrent.ConcurrentHashMap

@Component
class ChatWebSocketHandler(
    private val objectMapper: ObjectMapper,
    private val actorSystem: SpringActorSystem
) : TextWebSocketHandler() {

    private val userActors = ConcurrentHashMap<String, SpringActorRef<UserActor.Command>>()

    override fun afterConnectionEstablished(session: WebSocketSession) {
        val userId = UUID.randomUUID().toString()
        session.attributes["userId"] = userId

        val userActorContext = UserActor.UserActorContext(
            actorSystem = actorSystem,
            objectMapper = objectMapper,
            userId = userId,
            session = session
        )

        actorSystem.spawn(UserActor.Command::class.java, userActorContext)
            .thenAccept { userActor ->
                userActors[userId] = userActor
                userActor.tell(UserActor.Connect())
            }
    }


    override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
        val userId = session.attributes["userId"] as String?
        val userActor = getUserActor(userId)

        if (userId != null && userActor != null) {
            userActor.tell(UserActor.Stop())
            actorSystem.stop(UserActor.Command::class.java, userId)
            userActors.remove(userId)
        }
    }

    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val userId = session.attributes["userId"] as String
        val payload = objectMapper.readTree(message.payload)

        when (payload.get("type").asText()) {
            "join" -> handleJoinRoom(userId, payload)
            "leave" -> handleLeaveRoom(userId)
            "message" -> handleChatMessage(userId, payload)
            else -> sendErrorMessage(session, "Unknown message type: ${payload.get("type").asText()}")
        }
    }

    private fun handleJoinRoom(userId: String, payload: JsonNode) {
        val roomId = payload.get("roomId").asText()
        val userActor = getUserActor(userId)

        if (roomId != null && userActor != null) {
            userActor.tell(UserActor.JoinRoom(roomId))
        }
    }

    private fun handleLeaveRoom(userId: String) {
        getUserActor(userId)?.tell(UserActor.LeaveRoom())
    }

    private fun handleChatMessage(userId: String, payload: JsonNode) {
        val userActor = getUserActor(userId)
        val messageText = payload.get("message").asText()

        if (userActor != null && messageText != null) {
            userActor.tell(UserActor.SendMessage(messageText))
        }
    }

    private fun getUserActor(userId: String?): SpringActorRef<UserActor.Command>? {
        return userId?.let { userActors[it] }
    }

    private fun sendErrorMessage(session: WebSocketSession, errorMessage: String) {
        try {
            if (session.isOpen) {
                val response = objectMapper.createObjectNode().apply {
                    put("type", "error")
                    put("message", errorMessage)
                }
                session.sendMessage(TextMessage(objectMapper.writeValueAsString(response)))
            }
        } catch (e: IOException) {
            e.printStackTrace()
        }
    }
}

 

각 함수에서 어떤 동작을 수행하는지는 아래와 같습니다. 

  • afterConnectionEstablished: UserActor을 생성. UserActor는 userId와 해당 유저의 웹 소켓 세션을 들고있습니다.  
  • afterConnectionClosed: UserActor을 시스템에서 종료합니다. 
  • handleTextMessage: 채팅방 접속, 퇴장, 메시지 생성과 관련된 로직을 처리합니다. 

 

전체 로직을 완성을 했으니 클러스터 실행을 위한 스크립트를 작성하겠습니다. 

#!/bin/bash

set -e

BASE_PORT=$1
BASE_PEKKO_PORT=$2
INSTANCE_COUNT=$3

if [[ -z "$BASE_PORT" || -z "$BASE_PEKKO_PORT" || -z "$INSTANCE_COUNT" ]]; then
  echo "Usage: $0 <basePort> <basePekkoPort> <instanceCount>"
  exit 1
fi

run_application() {
  local instance=$1
  local port=$((BASE_PORT + instance))
  local pekko_port=$((BASE_PEKKO_PORT + instance))

  echo "Starting instance $instance: port=${port}, pekko_port=${pekko_port}"

  ./gradlew bootRun \
    --args="--server.port=${port} \
            --spring.actor.pekko.remote.artery.canonical.port=${pekko_port}" \
    -PmainClass=${MAIN_CLASS} \
    > "log_${port}.txt" 2>&1 &
}

for ((i=0; i<INSTANCE_COUNT; i++)); do
  run_application $i
done

 

위 스크립트 파일명을 cluster-start.sh로 작성하면 "sh cluster-start.sh 8080 2551 3" 커맨드로 3개의 서버를 지닌 클러스터를 실행할 수 있습니다. 그럼 브라우저 3개를 띄우고 각각 localhost:8080, localhost:8081, localhost:8082를 접속하면 채팅 기능이 정상 동작하는 것을 확인할 수 있습니다. 

 

결과

 

spring-boot-starter-actor을 사용하면 spring boot와 pekko의 기능을 모두 편리하게 사용할 수 있습니다. 또한 pekko의 클러스터링을 활용하면 미들웨어 없이도 stateful 기능을 손쉽게 구현할 수 있습니다. 개인적으로 액터 모델은 무궁무진한 가능성이 있는 모델이라고 생각하기에 위 라이브러리를 production ready 상태로 만드는 것이 v1.0.0의 목표입니다. 언제나 컨트리뷰션은 환영입니다(편하게 의견 주셔도 좋습니다) 

 

 

GitHub - seonWKim/spring-boot-starter-actor: Actors kindly introduced to Spring

Actors kindly introduced to Spring . Contribute to seonWKim/spring-boot-starter-actor development by creating an account on GitHub.

github.com

 

복사했습니다!