Architecture

메세지 브로커로 동시성 이슈 트러블 슈팅하기 (feat. AWS SQS)

JeongKyun 2024. 5. 10.
반응형

이번 글에서는 B2B SaaS 제품을 만들면서 발생한 동시성 제어 이슈를 트러블 슈팅한 이야기를 정리해보려한다.
 

Background

  • 도메인 서비스들은 MSA 환경으로 이루어져있다.
  • 스케줄(schedule), 수납(purchase), 시술권(ticket)이라는 Bounded Context가 존재한다.
  • 각 B.C간 의존 관계는 다음과 같다.
    • 스케줄 -> 수납
    • 스케줄 -> 시술권
    • 시술권 -> 수납
  • 수납을 하면, "수납됨" 이벤트에 속한 구매 항목 기반으로 시술권 그룹이 생성된다.
    • (Purchase B.C.) Purchased Event Publish
    • (Ticket B.C.) Purchased Event Consume -> Handle Event -> Process Event -> Create Ticket
  • 생성된 시술권은 스케줄에 등록하여 여러장을 사용/ 취소 처리할 수 있다. (이 프로세스에서 동시성 문제 발생❗)
    • (Schedule B.C.) UseTicketCommand or CancelTicketCommand -> TicketChanged Event Publish
      • 스케줄에 추가한 시술을 사용 or 취소 처리할 수 있다.
      • sqs fifo로 명령 메세지를 전달하고있다.
    • (Ticket B.C.) TicketChanged Event Consume -> Handle Event -> Process Event -> Change Ticket Status
  • 스케줄 애그리것
    • Schedule (Aggreagate)
      • Id
      • Tasks
        • title
        • ticketId
      • ...
  • 시술권 그룹 애그리것
    • TicketGroup (Aggregate)
      • Id
      • Tickets
        • ticketId
        • status (USED, CANCELED, AVAILABLE)
        • title
      • ...

 
 

Problem

위 배경에 작성한 "생성된 시술권은 스케줄에 등록하여 사용/ 취소 처리할 수 있다." 프로세스에서 동시성 문제가 발생했다.
더 정확히는, 스케줄 시술에 등록된 여러 시술권들을 사용 요청을 보냈을 때 동시성 문제가 발생했다.
현재 동시성 이슈가 있는 프로세스를 도식화해보면 다음과 같다.

 
위에서 Critical Section으로 표시해놓은 영역인 ChangeTicketStatusCommandExecutor에서 동시성 문제가 발생한다.
해당 명령 실행기 로직을 의사코드로 작성하면 이렇다.

class ChangeTicketStatusCommandExecutor(
	private val ticketRepository: ITicketRepository
) {

    fun execute(command: ChangeTicketStatusCommand) {
            when (command.status) {
                TicketStatus.USED -> ticketRepository.update(
                    tenantId,
                    command.ticketId // TicketGroup 애그리것의 하위 엔터티인 ticketId로 데이터 변경을 하고있음
                ) { ticket ->
                    ticket.copy(
                        status = TicketStatus.USED,
                        usedDateTimeUtc = OffsetDateTime.now(ZoneOffset.UTC),
                        updatedDateTimeUtc = OffsetDateTime.now(ZoneOffset.UTC)
                    )
                }

                //...

                else -> {
                    log.error { "Can`t handle ticket status as ${command.ticket.status}" }
                }
            }	
	}
}

interface ITicketRepository {
    fun update(tenantId: String, ticketId: String, modifier: (Ticket) -> Ticket)
}

 
위 코드에서 왜 동시성 문제가 발생할까?
 
위에 정의한 시술권 그룹 애그리것 정의부를 가져와보면,
시술권 그룹 애그리것

  • TicketGroup (Aggregate)
    • Id
    • Tickets
      • ticketId
      • status (USED, CANCELED, AVAILABLE)
      • title

시술권(ticket)은 시술권 그룹 애그리것의 엔터티이다.
결국 애그리것의 전역 식별자(ticket group id)를 이용하여 제어하는것이 아닌 자식 엔터티의 식별자(ticket id)를 통해 애그리것을 제어하면서 발생한 동시성 문제이다.
 
조금 더 구체적으로 정리해보자면,
TicketStatusChangedEvent는 ticketId를 groupId로 갖는 sqs fifo 형식이기에, 변경된 ticketId의 요청은 순서 보장이 될것이라 생각했다. 그러나 ticketId는 서로 다른 uuid를 갖기에, partition key가 달라서 처리 순서 보장이 되지 않고있었다.
이에 따라, message를 consume했을 때, "ticketId로 ticketGroup을 map 돌면서 ticket을 식별하고 update" 로직을 동시에 여러 스레드가 처리하면서 동시성 문제가 발생한 것이다.
 
이를 해결하기 위해선 몇 가지 방법들은 다음과 같았다.
1. 외부 B.C.에 ticketId가 아닌 ticketGroup 애그리것 루트의 전역 식별자(guid)를 전파한다.
현재 스케줄 B.C.에서는 ticketGroupId가 아닌 각 시술(task)별로 ticketId만 존재한다.
여기서 ticketGroup context를 전파하기 위해선, ticketId만 가지고있는 곳에 ticketGroupId 프로퍼티 추가 적용이 필요하다.
 
2. 시술권 B.C에서 내부적으로 ticketId로 제어하는게 아닌 ticketGroupId로 제어할 수 있도록 논리를 수정한다.
이때 1번 항목은 스케줄 B.C에 변경을 전파하는것보다는 2번 항목의 시술권 B.C 내부에서 이슈를 해결하는게 맞다고 판단했고, 해결은 Solution에 작성한 내용대로 진행했다.
 

Solution

To Be의 도식화로 설명을 시작해보려한다.
중요한 변경 사실은 루트 애그리거트의 하위 엔터티인 시술권이 루트 애그리거트의 변경사항에 침투하는 것을 Requester 구간을 한번 더 두어 처리한 부분이다. 이를 좀 더 구체적으로 적어보면 아래와 같다.
 
--
1. 스케줄 B.C.에게 시술권 상태 변경 요청을 보낸다
 
2. 시술권 B.C. 에서는 해당 시술권 ID로 시술권 그룹의 전역 식별자를 찾은 후 SQS FIFO의 GroupId를 해당 식별자로 설정하여 메세지를 요청한다.
 
3. 해당 이벤트를 소비하여 루트 애그리거트의 전역 식별자로 하위 엔터티인 시술권 상태를 변경한다.
--
 
위 프로세스를 정리해보면 해당 동시성 이슈는 하위 엔터티가 루트 애그리거트의 변경을 실행하는 과정에서 발생한 문제였고, 이를 루트 애그리거트의 전역 식별자로만 수정할 수 있도록 변경한 흐름이다.
 
아래는 To Be의 명령 실행기 의사코드이다.

class ChangeTicketStatusOfTicketGroupCommandExecutor(
    private val ticketGroupRepository: TicketGroupRepository
) {

    fun execute(tenantId: String, command: ChangeTicketStatusOfTicketGroupCommand) {
        when (command.ticket.status) {
            TicketStatus.USED -> ticketGroupRepository.update(
                tenantId,
                command.ticketGroupId // ticketGroup 애그리것의 전역 식별자 Id로 제어하도록 변경됨 !
            ) { ticketGroup ->
                ticketGroup.copy(
                    tickets = ticketGroup.tickets.map {
                        if (it.id == command.ticket.id)
                            it.copy(
                                status = TicketStatus.USED,
                                usedDateTimeUtc = OffsetDateTime.now(ZoneOffset.UTC),
                                updatedDateTimeUtc = OffsetDateTime.now(ZoneOffset.UTC)
                            )
                        else
                            it
                    }
                )
            }
            
			//...
            
            else -> {
                log.error { "Can`t handle ticket status as ${command.ticket.status}" }
                return
            }
        }
    }
}

이제 최종으로 위 명령 실행기가 성공되었을 때, 다음 큐에 있는 명령이 실행될것이다.
따라서, 여러 스레드가 ticketId로 ticketGroup의 ticket status를 변경하지않고, 순서에 맞추어 ticketGroupId로 변경하는 방식으로 변경되어 동시성 이슈가 해결된다.
 
(* 위 과정에서 ticketGroupId가 현재 SQS FIFO의 GroupId로 변경되었기에 메세지 처리의 순서 보장 가능하다.)
 

마무리

이번 글에서는 다수의 티켓 사용/ 취소 처리하면서 발생한 동시성 이슈에 대해 작성해보았는데, 이슈가 발생한 가장 큰 이유는 하위 엔터티가 루트 애그리것의 변경을 명령하면서 발생한것이 가장 큰 골자라고 생각한다. 
 
문제 해결과정에서 순서 보장과 파티셔닝 처리를 위해 SQS FIFO를 적극 사용하였는데, 다른 Redis의 분산락 또는 Database Lock을 이용하여 처리할 수 있었으나 추가적인 인프라 비용 또는 데이터베이스 비용을 들이는것보다는 이미 준비되어있는 인프라 비용을 사용하여 빠르게 해결하는게 낫다고 판단하였다.
 
당시 이슈가 발생했을 때 정말 예상치 못한 케이스라 문제 원인을 찾는것도 어려웠고 뜨거운 식은땀을 흘렸었다

댓글

💲 많이 본 글