Reactive DB · R2DBC & Reactive Mongo
한 줄 소개 — WebFlux 로 논블로킹을 만들어도 DB 접근이 블로킹(JDBC)이면 거기서 막힌다. 그래서 DB 까지 논블로킹으로 — R2DBC(PostgreSQL) 와 Reactive MongoDB 두 갈래로 직접 CRUD API 를 구현했다. 직접 만든 레포지토리 → github.com/taehyuklee/RxProgramming (Spring Cloud Gateway + WebFlux 리액티브 API). 이 글은 그 rxApis 패키지(mongoRxApi · postgreRxApi · multidbRxApi)의 설계를 정리한 것이다.
1 · 무엇을 만들었나
레포는 크게 두 덩어리다 — gatewayMsa(API 게이트웨이 → 마이크로서비스 라우팅)와 rxApis(WebFlux 기반 논블로킹 REST API). 이 글의 주제는 rxApis 의 리액티브 DB 다.
| 모듈 | DB | 핵심 |
|---|---|---|
postgreRxApi | R2DBC · PostgreSQL | 관계형 + 논블로킹. R2DBC 의 @Id 생성 규칙이 JPA 와 달라 주의 필요 |
mongoRxApi | Reactive MongoDB | 문서형 + 논블로킹. 연관 데이터를 임베딩 |
multidbRxApi | 둘 다 (yaml 선택) | auto-config 를 모두 끄고, yaml 설정으로 Postgres/Mongo 를 전환 |
2 · 두 드라이버 — 의존성과 설정값
// 의존성 (build.gradle)
// R2DBC + PostgreSQL
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
runtimeOnly 'org.postgresql:r2dbc-postgresql'
// Reactive MongoDB
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'# application.yaml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/postgres
username: postgres
password: ********
pool:
max-size: 100
data:
mongodb:
uri: mongodb://localhost:27017/
database: RxTest3 · DB 설정 (수동 Config)
각 드라이버를 Java Config 로 직접 구성했다. Reactive Mongo 는 AbstractReactiveMongoConfiguration 을 상속해 MongoClient·ReactiveMongoTemplate 을, R2DBC 는 ConnectionFactory 로부터 R2dbcEntityTemplate·DatabaseClient 를 만든다.
// Reactive MongoDB
@Configuration
@ConditionalOnProperty(value = "common.db", havingValue = "mongo") // yaml 로 켜고 끔
@EnableReactiveMongoRepositories(basePackages = "...person.repository")
public class RxMongoConfig extends AbstractReactiveMongoConfiguration {
@Override public MongoClient reactiveMongoClient() { return MongoClients.create(); }
@Override protected String getDatabaseName() { return "TestDB"; }
@Bean public ReactiveMongoTemplate reactiveMongoTemplate() {
return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
}
}
// R2DBC (PostgreSQL)
@Configuration
@ConditionalOnProperty(value = "common.db", havingValue = "rdb")
@EnableR2dbcRepositories(basePackages = "...person.repository")
public class R2dbcTemplate {
@Bean public R2dbcEntityTemplate r2dbcEntityTemplate(ConnectionFactory cf) {
return new R2dbcEntityTemplate(cf);
}
@Bean public DatabaseClient databaseClient(ConnectionFactory cf) {
return DatabaseClient.builder().connectionFactory(cf).build();
}
}4 · 멀티 DB 전환 (multidbRxApi)
하나의 코드베이스로 Postgres·Mongo 를 골라 쓰려면, 스프링이 자동으로 켜는 auto-config 를 전부 끄고 위의 Java Config 를 @ConditionalOnProperty(common.db) 로 한쪽만 활성화한다.
# application.yaml — 자동설정 제외 + 스위치
spring:
autoconfigure:
exclude:
- org...data.r2dbc.R2dbcAutoConfiguration
- org...data.r2dbc.R2dbcRepositoriesAutoConfiguration
- org...data.mongo.MongoReactiveDataAutoConfiguration
- org...data.mongo.MongoReactiveRepositoriesAutoConfiguration
common:
db: mongo # ← "mongo" 또는 "rdb" 로 전환5 · R2DBC 의 함정 ① — @Id (insert vs update)
R2DBC 는 JPA 와 @Id 생성·식별 원리가 다르다. JPA 영속성 컨텍스트가 없어서, ID 가 채워진 엔티티를 save() 할 때 이게 새 INSERT 인지 기존 UPDATE 인지 스스로 판단하지 못한다(직접 부여한 String id 면 더더욱). 그래서 Persistable<ID> 을 구현해 isNew() 로 직접 알려준다.
@Data @Accessors(chain = true)
@Table("Person") // R2DBC (관계형 테이블)
@Document // Reactive Mongo (문서) — 같은 엔티티를 양쪽에 매핑
public class Person extends AuditEntity implements Persistable {
@Transient private boolean isNew; // 영속 여부 플래그
@Id private String id;
// private String teamId; // mongo 면 불필요(임베딩), rdb 면 FK 로 주입
private String email;
private String name;
private Integer score;
@Override public boolean isNew() { return isNew; } // true → INSERT, false → UPDATE
} 6 · 관계 매핑의 차이 — 임베딩 vs FK
가장 까다로운 지점. Mongo 는 연관 데이터를 한 문서에 임베딩 하면 그만이지만, R2DBC 는 관계형이라 FK 로 잇고 따로 조회해 조립해야 한다 — 게다가 R2DBC 엔 JPA 의 @OneToMany 같은 연관관계 매핑이 없다. 그래서 JPA 의 @JoinColumn 을 흉내 낸 커스텀 애너테이션 @RxJoinColumn / @RxTransient 를 만들고, AOP 와 save 콜백으로 연관 데이터를 채워 넣게 했다.
@Data @Accessors(chain = true)
@Table("Team") @Document
public class Team extends AuditEntity implements Persistable {
@Transient private boolean isNew;
@Id private String id;
private String teamName;
@RxTransient // 컬럼/필드로 저장하지 않음
@RxJoinColumn(name = "team_id") // 직접 만든 "조인 컬럼" — FK 로 연결
private List teamMembers = new ArrayList<>();
@Override public boolean isNew() { return isNew; }
}
// 커스텀 애너테이션 정의
@Retention(RetentionPolicy.RUNTIME)
@Target({ FIELD, METHOD, ANNOTATION_TYPE })
public @interface RxJoinColumn { String name(); } 7 · Repository 와 반환 타입
리액티브 레포지토리는 JPA 의 PagingAndSortingRepository 대신 ReactiveSortingRepository(또는 ReactiveCrudRepository)를 상속한다. 핵심 차이는 반환값이 Mono/Flux 로 감싸진다 는 것 — 블로킹 없이 결과 스트림을 흘려보낸다. 같은 인터페이스로 R2DBC·Mongo 양쪽에서 동작한다.
@Repository
public interface PersonRepository extends ReactiveSortingRepository {
Mono deleteById(String id);
Mono findByEmail(String email); // 메서드 이름 → 쿼리 자동 생성
}
// 컨트롤러도 Mono/Flux 를 그대로 반환 (논블로킹)
@GetMapping("/persons")
public Flux findAll() { return personRepository.findAll(); }
@PostMapping("/person")
public Mono save(@RequestBody Person person) {
return personRepository.save(person.setNew(true)); // isNew=true → INSERT
} 배운 점 — "리액티브"는 단순히 WebFlux 를 쓰는 게 아니라 요청 → 비즈니스 → DB 전 구간이 논블로킹이어야 의미가 있다. R2DBC 는 JPA 의 편의(영속성 컨텍스트·연관관계·@Id 자동 판별)가 없어 직접 메워야 할 부분이 많았고, 그 과정에서 Persistable·커스텀 애너테이션·AOP·save 콜백까지 만들어 보며 Spring Data 의 추상화 동작을 깊이 이해할 수 있었다.
8 · 게이트웨이 필터에서 리액티브 DB 조회
DB 설정만 따로 떼어 정리한 companion 레포 → github.com/taehyuklee/ReactiveDataBase (MultiDB · ReactiveMongoDB · R2dbc 모듈). 여기서 한 발 더 나아가, Spring Cloud Gateway 필터 안에서 리액티브 DB 를 조회 하는 것도 실험했다 — AbstractGatewayFilterFactory 를 만들어, 라우팅 체인이 끝난 뒤 Mono.defer 로 DB 를 논블로킹 조회 해 응답 헤더에 실어 보낸다.
@Component @Slf4j
public class DbFetchGatewayFilterFactory
extends AbstractGatewayFilterFactory<DbFetchGatewayFilterFactory.Config> {
private final PersonRepository personRepository; // ReactiveCrudRepository
public DbFetchGatewayFilterFactory(PersonRepository personRepository) {
super(Config.class);
this.personRepository = personRepository;
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) ->
chain.filter(exchange).then(Mono.defer(() -> // 라우팅이 끝난 뒤
personRepository.findAll() // 논블로킹 DB 조회 (Flux)
.doOnNext(p -> exchange.getResponse().getHeaders()
.add(p.getFirstName(), p.getLastName()))
.doOnComplete(() -> log.info("end thread"))
.then() // Mono<Void> 로 마무리
));
}
public static class Config {}
}인터페이스 관점 — 결국 핵심은 "JPA(블로킹) 인터페이스 ↔ Reactive(논블로킹) 인터페이스"의 대응이다. JpaRepository(블로킹)와 ReactiveCrudRepository/ReactiveSortingRepository(논블로킹)는 거의 같은 메서드 모양을 가지되, 후자는 결과를 Mono/Flux 로 돌려준다 — 그래서 게이트웨이 필터·서비스·DB 까지 하나의 리액티브 체인으로 이어 붙일 수 있다.
📓 2023년 제가 리액티브 프로그래밍을 공부하며 직접 구현한 노트입니다 · 구현 GitHub · RxProgramming ↗ · DB 설정 ReactiveDataBase ↗ · 정리 Notion ↗