(KO) PostgreSQL 논리 복제 — 구독자 Apply, Launcher, Table Sync
목차:
- 이론적 배경
- DBMS 공통 설계 패턴
- PostgreSQL의 접근 방식
- 소스 워크스루
- 소스 검증 (2026-06-05 기준)
- PostgreSQL 너머 — 비교 설계와 연구 전선
- 출처
이론적 배경
섹션 제목: “이론적 배경”논리 복제는 물리 스트리밍 복제와 다른 문제를 푼다. 물리 복제(postgres-wal-sender-receiver.md 참고)는 날(raw) WAL 바이트를 그대로 스탠바이에 전달한다. 스탠바이는 프라이머리 스토리지의 바이트 단위 복제본으로, 같은 페이지 이미지를 재생하므로 스키마를 달리할 수 없고 일부 테이블만 복제하거나 쓰기를 받을 수 없다. 논리 복제는 대신 행 수준 변경 이벤트를 전달한다. “public.orders 테이블에 튜플 T가 삽입되었다”는 식으로 WAL을 디코딩하고 구독자의 executor로 재적용한다. 구독자는 퍼블리셔의 테이블 내용을 최신 상태로 유지하는 완전히 독립적인 데이터베이스다.
이 방식은 물리 복제로 불가능한 기능을 열어 준다. 메이저 버전 간 복제, 특정 테이블만 묶은 publication의 복제, 여러 퍼블리셔를 하나의 구독자로 통합, 로컬 컬럼이나 인덱스가 추가된 테이블로의 복제, 구독자를 쓰기 가능 상태로 유지하는 것 등이 모두 가능하다. 대신 구독자는 변경을 해석해야 한다. 원격 관계를 로컬 관계로 매핑하고, 원격 컬럼 값을 로컬 타입으로 변환하고, 레플리카 아이덴티티(replica identity)로 갱신·삭제 대상 행을 찾고, 로컬 행의 상태가 변경이 전제하는 상태와 다를 때(충돌) 처리 방법을 결정해야 한다.
Petrov의 Database Internals(11장 “Replication and Consistency”)는 설계 공간을 논리 변경 레코드가 리더에서 팔로워로 흐르는 파이프라인으로 규정한다. 이 파이프라인은 네 단계이며, PostgreSQL은 각 단계를 양쪽 프로세스에 나눠 배치한다.
- 캡처 / 디코딩 — 내구성 로그를 논리 변경 레코드로 변환한다. 퍼블리셔에서는 논리 디코딩과 pgoutput 플러그인이 담당한다(
postgres-logical-decoding.md,postgres-pgoutput.md참고). walsender가 결과를 스트리밍한다. - 전송 — 레코드를 커밋 순서로, 정확히 한 번 영속 연결로 전달한다. PostgreSQL은 물리 복제와 동일한 CopyBoth libpq 전송을 재사용한다.
- Apply — 각 레코드를 팔로워의 로컬 테이블에 트랜잭션 단위로 재실행하며, 크래시 후 재개할 수 있도록 스트림의 내구적 적용 위치를 기록한다.
- 충돌 해결 — 팔로워의 행이 변경의 전제와 맞지 않는 경우(이미 삭제됨, 다른 오리진이 수정함, 유니크 키 중복)를 처리한다.
이 문서는 3단계와 4단계, 즉 구독자를 다룬다. 구독자는 이론이 종종 넘어가는 부트스트랩 문제도 풀어야 한다. 신규 구독은 테이블이 비어 있는 상태로 시작하지만, 변경 스트림은 지정 시점 이후의 델타만 전달한다. 기존 행의 초기 스냅샷 복사를 수행하고, 그 복사본을 라이브 변경 스트림에 손실이나 중복 없이 이어 붙여야 한다. 이 연결 — 테이블 동기화 문제 — 이 구독자에서 가장 미묘한 부분이며, 테이블별 전용 워커가 처리한다.
구독자 구현자가 선택하는 설계 공간은 다음과 같다.
- 단일 apply 프로세스냐 다수냐 — 하나의 프로세스가 모든 구독을 직렬 처리하는가, 아니면 각 구독이 전용 워커를 갖는가. PostgreSQL은 구독당 리더 apply 워커 하나를 사용한다(대형 스트리밍 트랜잭션을 위한 병렬 apply 워커 옵션 별도 존재).
- 기존 데이터 부트스트랩 방법 — 전체를 잠그고 복사하는 방식, 아니면 라이브 스트림과 병렬로 테이블별 복사. PostgreSQL은 tablesync 워커로 테이블별 병렬 복사를 수행하고 각 테이블을 독립적으로 메인 스트림에 합류시킨다.
- 수정 대상 행을 찾는 방법 — 기본 키, 설정된 레플리카 아이덴티티, 또는 전체 튜플 스캔. PostgreSQL은 레플리카 아이덴티티(기본값: PK)를 사용하며 사용 가능한 인덱스가 없을 때 순차 스캔으로 폴백한다.
- 충돌 처리 방법 — 에러 후 중단, 건너뜀, last-writer-wins 적용. PostgreSQL(REL_18)은 타입화된 분류 체계로 충돌을 감지하고 기록하며, 변경의 자연스러운 결과를 그대로 진행한다(
update_missing은 단순 건너뜀).
DBMS 공통 설계 패턴
섹션 제목: “DBMS 공통 설계 패턴”논리 / 행 기반 복제는 MySQL 행 기반 binlog 복제, Oracle GoldenGate, SQL Server 트랜잭셔널 복제, Debezium 스타일 CDC 파이프라인 등 모든 성숙한 SQL 엔진에서 볼 수 있다. 이들은 공통적인 엔지니어링 관례로 수렴한다. 그 패턴을 먼저 명명해 두면 PostgreSQL의 구체적인 심볼이 공유된 플레이북 안의 특정 선택으로 읽힌다.
복제 스트림당 하나의 applier를 생성하는 슈퍼바이저
섹션 제목: “복제 스트림당 하나의 applier를 생성하는 슈퍼바이저”행 기반 복제 시스템은 누가 applier를 실행할지 결정하는 주체와 applier 자체를 분리한다. 오래 실행되는 슈퍼바이저가 설정 카탈로그를 감시하며 워커 프로세스를 기동·재시작하고, 각 워커는 하나의 업스트림 연결을 전담한다. 슈퍼바이저는 재시작 속도를 제한해 오류 메시지에서 계속 죽는 워커가 재시작 폭풍으로 머신을 소모하지 않도록 한다. PostgreSQL의 슈퍼바이저는 논리 복제 런처이며, 속도 제한은 wal_retrieve_retry_interval이다.
원격 스키마와 로컬 스키마를 연결하는 관계 맵
섹션 제목: “원격 스키마와 로컬 스키마를 연결하는 관계 맵”구독자의 스키마는 독립적이므로, 모든 applier는 관계 맵을 유지한다. 한쪽에는 원격 관계 id와 컬럼 목록, 다른 쪽에는 로컬 테이블의 OID, 튜플 디스크립터, 속성 번호 재매핑이 있다. 캐시 무효화 시 지연 재구성되므로 로컬 ALTER TABLE이 워커 재시작 없이 반영된다. PostgreSQL의 맵은 LogicalRepRelMapEntry의 LogicalRepRelMap 해시다.
레플리카 아이덴티티: 대상 행을 찾는 방법
섹션 제목: “레플리카 아이덴티티: 대상 행을 찾는 방법”UPDATE와 DELETE는 “이 키 컬럼을 가진 행이 이제 이 새 값을 가진다”는 형태로 도착한다. applier는 그 행을 로컬에서 찾아야 한다. 보편적인 관례는 레플리카 아이덴티티 — 검색에 사용할 지정된 컬럼 집합(기본값: 기본 키) — 다. 로컬 레플리카 아이덴티티가 퍼블리셔가 보내는 것보다 약하면 갱신과 삭제를 매칭할 수 없어 거부된다.
적용된 스트림에 대한 내구적 커서
섹션 제목: “적용된 스트림에 대한 내구적 커서”applier는 내구적으로 적용된 위치를 기록해, 크래시 후 정확한 LSN에서 재개할 수 있어야 한다. 커밋된 작업을 재적용하지도 않고 미커밋 작업을 건너뛰지도 않아야 한다. 관례는 로컬 커밋과 원자적으로 전진하는 스트림별 진행 레코드다. PostgreSQL은 이를 복제 오리진으로 구현한다(postgres-replication-slots.md 참고).
초기 스냅샷 후 따라잡기, 객체별 연결
섹션 제목: “초기 스냅샷 후 따라잡기, 객체별 연결”빈 테이블에서 복제를 시작하려면 기존 행의 일관된 스냅샷을 찍고 복사한 뒤, 스냅샷 위치부터 변경을 적용해야 한다. 객체별로 병렬 수행하는 것이 확장 가능한 선택이며, 이를 위해 객체별 상태 기계가 필요하다. PostgreSQL의 테이블별 워커는 pg_subscription_rel 상태 기계 INIT → DATASYNC → FINISHEDCOPY → SYNCWAIT → CATCHUP → SYNCDONE → READY를 따른다.
타입화된 분류 체계를 갖춘 충돌 감지
섹션 제목: “타입화된 분류 체계를 갖춘 충돌 감지”로컬 행이 전제된 상태가 아닐 때 시스템은 상황을 분류해야 한다. 행이 이미 존재함(INSERT), 행 없음(UPDATE/DELETE), 다른 오리진이 수정함, 유니크 키 중복. 타입화된 분류 체계는 운영자가 발산을 모니터링하고 추론할 수 있게 한다. PostgreSQL의 분류 체계는 ConflictType 열거형(CT_INSERT_EXISTS, CT_UPDATE_ORIGIN_DIFFERS, CT_UPDATE_MISSING, CT_DELETE_MISSING 등)이다.
이론 ↔ PostgreSQL 대응표
섹션 제목: “이론 ↔ PostgreSQL 대응표”| 개념 | PostgreSQL 이름 |
|---|---|
| 복제 슈퍼바이저 | 논리 복제 런처 (ApplyLauncherMain) |
| 스트림별 applier | 리더 apply 워커 (ApplyWorkerMain) |
| 워커 레지스트리(공유 메모리) | LogicalRepCtx->workers[] (LogicalRepWorker) |
| 워커 종류 | LogicalRepWorkerType (WORKERTYPE_APPLY / WORKERTYPE_TABLESYNC / WORKERTYPE_PARALLEL_APPLY) |
| 재시작 속도 제한 | wal_retrieve_retry_interval + 구독별 last-start DSA 해시 |
| 수신-디스패치 루프 | LogicalRepApplyLoop + apply_dispatch |
| 메시지별 핸들러 | apply_handle_* (begin, commit, insert, update, delete, relation, truncate, stream_*, prepare) |
| 관계 맵 | LogicalRepRelMap / LogicalRepRelMapEntry (logicalrep_rel_open) |
| 레플리카 아이덴티티 검색 | GetRelationIdentityOrPK + logicalrep_rel_mark_updatable |
| 내구적 apply 커서 | 복제 오리진 (replorigin_session_*) |
| 업스트림 피드백 | send_feedback (get_flush_position으로 게이팅) |
| 초기 복사 워커 | tablesync 워커 (TablesyncWorkerMain) |
| 테이블별 복사 | copy_table + fetch_remote_table_info (COPY ... TO STDOUT) |
| 테이블별 상태 기계 | pg_subscription_rel.srsubstate (SUBREL_STATE_*) |
| 충돌 분류 체계 | ConflictType 열거형; ReportApplyConflict |
PostgreSQL의 접근 방식
섹션 제목: “PostgreSQL의 접근 방식”세 종류의 프로세스, 하나의 공유 레지스트리
섹션 제목: “세 종류의 프로세스, 하나의 공유 레지스트리”구독자 쪽 모든 구조는 워커 슬롯 배열을 담은 단일 공유 메모리 제어 블록에서 출발한다. 런처가 슬롯을 채우고, apply 워커와 tablesync 워커가 슬롯에 연결(attach)한다. 워커 슬롯은 LogicalRepWorker 구조체다.
// LogicalRepWorker (struct) — src/include/replication/worker_internal.htypedef struct LogicalRepWorker{ LogicalRepWorkerType type; /* APPLY / TABLESYNC / PARALLEL_APPLY */ TimestampTz launch_time; bool in_use; /* is this slot used or free? */ uint16 generation; /* bumped each time slot is reused */ PGPROC *proc; Oid dbid; Oid userid; Oid subid; /* subscription this worker serves */ Oid relid; /* table, for tablesync workers only */ char relstate; /* SUBREL_STATE_* (tablesync) */ XLogRecPtr relstate_lsn; slock_t relmutex; FileSet *stream_fileset; /* spill files for streamed xacts */ pid_t leader_pid; /* set for parallel apply workers */ bool parallel_apply; XLogRecPtr last_lsn; /* stats */ XLogRecPtr reply_lsn; /* ... timing fields ... */} LogicalRepWorker;워커 종류는 작은 열거형으로 표현한다. 세 값이 백그라운드 워커가 실행하는 진입 함수와 슬롯 해석 방식을 결정한다.
// LogicalRepWorkerType — src/include/replication/worker_internal.htypedef enum LogicalRepWorkerType{ WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, WORKERTYPE_APPLY, /* the per-subscription leader */ WORKERTYPE_PARALLEL_APPLY, /* helper for large streamed xacts */} LogicalRepWorkerType;MyLogicalRepWorker가 현재 프로세스의 슬롯을 가리키며, am_leader_apply_worker(), am_tablesync_worker(), am_parallel_apply_worker() 술어가 MyLogicalRepWorker->type을 기반으로 분기한다.
런처: 폴링 슈퍼바이저
섹션 제목: “런처: 폴링 슈퍼바이저”런처는 서버 시작 시 ApplyLauncherRegister로 postmaster에 백그라운드 워커로 등록되므로, 죽으면 자동 재시작된다. 본문은 폴 루프다. 매 주기에 활성화된 구독 목록을 읽고, 실행 중인 apply 워커가 없는 구독은 구독별 재시작 스로틀을 적용한 뒤 워커를 기동한다.
// ApplyLauncherMain — src/backend/replication/logical/launcher.cfor (;;){ long wait_time = DEFAULT_NAPTIME_PER_CYCLE; /* 3 min */ sublist = get_subscription_list(); foreach(lc, sublist) { Subscription *sub = (Subscription *) lfirst(lc); if (!sub->enabled) continue;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (w != NULL) continue; /* worker is running already */
/* Throttle: at most one start per wal_retrieve_retry_interval. */ last_start = ApplyLauncherGetWorkerStartTime(sub->oid); now = GetCurrentTimestamp(); if (last_start == 0 || (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) { ApplyLauncherSetWorkerStartTime(sub->oid, now); logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID); } else wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed); } /* sleep on the latch until the next cycle or an explicit wakeup */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);}last-start 시각은 고정 워커 배열이 아닌 DSA 기반 동적 해시(logicalrep_launcher_attach_dshmem)에 저장된다. 워커가 죽어도 값이 살아 있어야 하고 다른 백엔드에서도 보여야 하기 때문이다. 핵심 설계 포인트가 있다. apply 워커가 오류로 종료하면 런처는 다음 주기에 빈 슬롯을 확인하지만, 스로틀이 재시작 주기를 wal_retrieve_retry_interval로 유지해 지속적으로 실패하는 구독이 스핀하지 않는다. 재시작을 기대하는 백엔드(ALTER SUBSCRIPTION 이후 등)는 last-start 항목을 명시적으로 삭제해 워커를 즉시 재시작할 수 있다.
logicalrep_worker_launch는 빈 슬롯을 찾아 채우고 ApplyWorkerMain(또는 TablesyncWorkerMain)을 가리키는 동적 백그라운드 워커를 등록한 뒤, 새 워커가 proc 포인터를 설정하거나 postmaster가 시작 불가를 보고할 때까지 WaitForReplicationWorkerAttach에서 블록한다. 이 연결 핸드셰이크는 런처가 슬롯 사용 중으로 잘못 판단하는 경쟁 조건을 닫는다.
flowchart TD
PM["postmaster"] -->|시작 시 등록| L["ApplyLauncherMain<br/>(background worker)"]
L -->|"get_subscription_list() 폴<br/>매 3분 또는 웨이크업 시"| L
L -->|"실행 중 워커 없는<br/>활성화된 구독마다"| TH{"last_start가<br/>wal_retrieve_retry_interval<br/>보다 오래됐는가?"}
TH -->|아니오| WAIT["wait_time 단축<br/>다음 주기에 재시도"]
TH -->|예| LAUNCH["logicalrep_worker_launch<br/>WORKERTYPE_APPLY"]
LAUNCH --> SLOT["LogicalRepWorkerLock 하에<br/>LogicalRepWorker 슬롯 확보"]
SLOT --> BGW["RegisterDynamicBackgroundWorker<br/>bgw_function = ApplyWorkerMain"]
BGW --> ATT["WaitForReplicationWorkerAttach<br/>자식이 proc 설정까지 대기"]
ATT --> AW["리더 apply 워커 실행 중"]
AW -.->|"초기 복사 필요"| TS["tablesync 워커들<br/>TablesyncWorkerMain"]
리더 apply 워커: 접속, 오리진 설정, 루프
섹션 제목: “리더 apply 워커: 접속, 오리진 설정, 루프”ApplyWorkerMain이 진입점이다. 공유 설정(SetupApplyOrSyncWorker → InitializeLogRepWorker)을 실행한 뒤 run_apply_worker를 호출한다. 공유 설정이 apply 워커를 안전하게 만드는 핵심이다. session_replication_role = replica로 설정해 applier가 REPLICA 활성화된 것을 제외한 일반 트리거와 규칙을 실행하지 않도록 하고, 구독 소유자로 워커 데이터베이스에 접속하며, search_path를 비우고, 구독 행을 영속 메모리 컨텍스트에 로드한다.
// InitializeLogRepWorker — src/backend/replication/logical/worker.c (condensed)SetConfigOption("session_replication_role", "replica", PGC_SUSET, PGC_S_OVERRIDE);BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid, MyLogicalRepWorker->userid, 0);SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);/* Lock + re-read the subscription so a concurrent DROP is caught. */LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0, AccessShareLock);MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);/* ... bail out if subscription was removed or disabled during startup ... */CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, 0);before_shmem_exit(replorigin_reset, (Datum) 0);run_apply_worker는 복제 오리진 — 내구적 커서 — 를 설정하고, 퍼블리셔에 접속해 오리진에서 재개 LSN을 가져온다.
// run_apply_worker — src/backend/replication/logical/worker.c (condensed)ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, originname, sizeof(originname));StartTransactionCommand();originid = replorigin_by_name(originname, true);if (!OidIsValid(originid)) originid = replorigin_create(originname);replorigin_session_setup(originid, 0);replorigin_session_origin = originid;origin_startpos = replorigin_session_get_progress(false); /* resume LSN */CommitTransactionCommand();
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, true, must_use_password, MySubscription->name, &err);set_stream_options(&options, slotname, &origin_startpos);/* two_phase stays PENDING until all tablesyncs are READY */if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && AllTablesyncsReady()) options.proto.logical.twophase = true;walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);start_apply(origin_startpos); /* -> LogicalRepApplyLoop */오리진이 정확성의 핵심이다. 모든 apply 커밋 트랜잭션은 데이터 변경과 같은 트랜잭션 안에서 오리진을 원격 커밋 LSN으로 전진시킨다. 크래시가 발생하면 오리진과 데이터가 일관된 상태로 남는다. apply가 트랜잭션 중간에 오류를 내면 start_apply의 PG_CATCH가 replorigin_reset(0, 0)을 호출해 오리진이 실패한 트랜잭션 너머로 전진하지 않는다. 그렇지 않으면 퍼블리셔는 해당 변경을 다시 보내지 않아 묵묵히 손실된다.
Apply 루프와 디스패치 테이블
섹션 제목: “Apply 루프와 디스패치 테이블”LogicalRepApplyLoop는 워커의 정상 상태 핵심이다. libpq 연결을 블록하며 버퍼를 수신하고, 각 메시지의 첫 바이트를 들여다본다. 'w'는 디코딩된 논리 복제 메시지를 담은 WAL 데이터(CopyData) 래퍼이고, 'k'는 keepalive다. 논리 메시지 자체는 apply_dispatch로 전달된다.
// LogicalRepApplyLoop — src/backend/replication/logical/worker.c (condensed)for (;;){ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); /* ... process all buffered messages without blocking ... */ c = pq_getmsgbyte(&s); if (c == 'w') { start_lsn = pq_getmsgint64(&s); end_lsn = pq_getmsgint64(&s); send_time = pq_getmsgint64(&s); if (last_received < end_lsn) last_received = end_lsn; UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); /* the decoded logical message */ } else if (c == 'k') { /* keepalive: maybe reply, maybe ping */ } /* ... compute flush position, send feedback, wait on latch+socket ... */}apply_dispatch는 메시지 타입 바이트를 읽어 apply_handle_* 함수 중 하나로 라우팅한다. 전체 집합이 apply 쪽 논리 복제 와이어 프로토콜을 정의한다.
// apply_dispatch — src/backend/replication/logical/worker.c (condensed)LogicalRepMsgType action = pq_getmsgbyte(s);switch (action){ case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; case LOGICAL_REP_MSG_STREAM_STOP: apply_handle_stream_stop(s); break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); break; case LOGICAL_REP_MSG_BEGIN_PREPARE: apply_handle_begin_prepare(s); break; case LOGICAL_REP_MSG_PREPARE: apply_handle_prepare(s); break; case LOGICAL_REP_MSG_COMMIT_PREPARED: apply_handle_commit_prepared(s); break; case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); break; case LOGICAL_REP_MSG_STREAM_PREPARE: apply_handle_stream_prepare(s); break; /* default: ERROR protocol violation */}논리 트랜잭션은 BEGIN, RELATION / TYPE 메타데이터 메시지와 INSERT/UPDATE/DELETE/TRUNCATE 행 변경의 연속, 그리고 COMMIT으로 마무리된다. RELATION 메시지는 해당 관계를 참조하는 변경보다 먼저 도착해야 한다. 행을 매핑하기 전에 원격 관계의 스키마를 알아야 하기 때문이다.
행 변경 Apply
섹션 제목: “행 변경 Apply”apply_handle_begin은 원격 최종 LSN을 기록하고 in_remote_transaction을 설정한다. apply_handle_commit은 커밋 LSN을 검증하고, 로컬 트랜잭션을 커밋하며(오리진 전진), process_syncing_tables를 호출해 tablesync 핸드오프를 진행한다. 흥미로운 작업은 행 핸들러에 있다. apply_handle_insert가 대표적이다.
// apply_handle_insert — src/backend/replication/logical/worker.c (condensed)if (is_skipping_changes() || handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return;begin_replication_step();relid = logicalrep_read_insert(s, &newtup);rel = logicalrep_rel_open(relid, RowExclusiveLock); /* map remote->local */if (!should_apply_changes_for_rel(rel)) /* tablesync filtering */{ logicalrep_rel_close(rel, RowExclusiveLock); end_replication_step(); return;}/* Run user code (defaults, indexes) as the table owner unless opted out. */if (!MySubscription->runasowner) SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);edata = create_edata_for_relation(rel);remoteslot = ExecInitExtraTupleSlot(edata->estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual);slot_store_data(remoteslot, rel, &newtup); /* coerce remote values */slot_fill_defaults(rel, edata->estate, remoteslot);if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) apply_handle_tuple_routing(edata, remoteslot, NULL, CMD_INSERT);else{ ExecOpenIndices(edata->targetRelInfo, false); apply_handle_insert_internal(edata, edata->targetRelInfo, remoteslot); ExecCloseIndices(edata->targetRelInfo);}finish_edata(edata);logicalrep_rel_close(rel, NoLock);end_replication_step();두 가지 설계 선택이 눈에 띈다. 첫째, should_apply_changes_for_rel이 테이블별 필터링을 구현한다. 테이블이 아직 tablesync 워커에서 동기화 중이면 리더 apply 워커는 해당 변경을 건너뛴다. tablesync 워커가 핸드오프 때까지 소유권을 갖는다. 둘째, 인덱스 유지, 기본 표현식 등 사용자 가시적 부작용은 구독에 runasowner가 설정되지 않은 한 SwitchToUntrustedUser로 테이블 소유자 권한에서 실행된다. 악의적인 테이블 소유자가 (종종 슈퍼유저에 가까운) 구독 소유자로 코드를 실행하는 권한 상승 취약점을 막기 위해서다.
행 찾기: 레플리카 아이덴티티와 충돌 감지
섹션 제목: “행 찾기: 레플리카 아이덴티티와 충돌 감지”UPDATE와 DELETE에서 구독자는 로컬 행을 찾아야 한다. 레플리카 아이덴티티가 그 역할을 한다. apply_handle_update_internal은 변경에서 remoteslot을 만들고, FindReplTupleInLocalRel로 매칭되는 로컬 튜플을 찾는다. 사용 가능한 인덱스가 있으면 인덱스 스캔, 없으면 순차 스캔이다.
// FindReplTupleInLocalRel — src/backend/replication/logical/worker.c (condensed)*localslot = table_slot_create(localrel, &estate->es_tupleTable);Assert(OidIsValid(localidxoid) || (remoterel->replident == REPLICA_IDENTITY_FULL));if (OidIsValid(localidxoid)) found = RelationFindReplTupleByIndex(localrel, localidxoid, LockTupleExclusive, remoteslot, *localslot);else found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, remoteslot, *localslot);return found;localidxoid는 GetRelationIdentityOrPK에서 온다. 명시적으로 설정된 REPLICA IDENTITY 인덱스를 우선하고 기본 키로 폴백한다. 둘 다 없으면 게시된 테이블이 REPLICA IDENTITY FULL이어야 하며, 구독자는 컬럼 호환 비고유 인덱스(FindUsableIndexForReplicaIdentityFull)나 그것도 없으면 전체 튜플을 비교하는 순차 스캔을 쓴다.
// GetRelationIdentityOrPK — src/backend/replication/logical/relation.cidxoid = RelationGetReplicaIndex(rel);if (!OidIsValid(idxoid)) idxoid = RelationGetPrimaryKeyIndex(rel, false);return idxoid;관계의 갱신 가능 여부는 relmap 항목을 (재)구성할 때 logicalrep_rel_mark_updatable로 한 번 결정한다. 로컬 레플리카 아이덴티티(또는 PK)의 키 컬럼을 순회하며, 아이덴티티 컬럼이 원격 게시 키 집합에 없으면 entry->updatable을 해제한다. 이렇게 하면 로컬 아이덴티티가 퍼블리셔가 보내는 것보다 강한 관계에 대한 UPDATE/DELETE가 잘못된 행을 묵묵히 매칭하는 대신 명확한 오류로 조기 거부된다.
// logicalrep_rel_mark_updatable — src/backend/replication/logical/relation.c (condensed)entry->updatable = true;idkey = RelationGetIndexAttrBitmap(entry->localrel, INDEX_ATTR_BITMAP_IDENTITY_KEY);if (idkey == NULL) /* no replica identity -> try PK */{ idkey = RelationGetIndexAttrBitmap(entry->localrel, INDEX_ATTR_BITMAP_PRIMARY_KEY); if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) entry->updatable = false;}while ((i = bms_next_member(idkey, i)) >= 0){ int attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber); if (entry->attrmap->attnums[attnum] < 0 || !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) { entry->updatable = false; /* identity col not in remote key */ break; }}로컬 행을 찾은 뒤 applier는 무조건 덮어쓰지 않는다. apply_handle_update_internal은 먼저 GetTupleTransactionInfo로 로컬 튜플의 커밋 메타데이터를 검사한다. 행이 이 apply 워커의 세션 오리진과 다른 오리진에서 마지막으로 쓰였다면 발산이다. 두 업스트림(또는 로컬 쓰기와 업스트림)이 같은 행을 건드린 것이다. PostgreSQL 18은 갱신을 진행하기 전에 이를 CT_UPDATE_ORIGIN_DIFFERS로 보고한다.
// apply_handle_update_internal — src/backend/replication/logical/worker.c (condensed)found = FindReplTupleInLocalRel(edata, localrel, &relmapentry->remoterel, localindexoid, remoteslot, &localslot);if (found){ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && conflicttuple.origin != replorigin_session_origin) { conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot, newslot, list_make1(&conflicttuple)); } slot_modify_data(remoteslot, localslot, relmapentry, newtup); InitConflictIndexes(relinfo); ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, remoteslot);}else /* The tuple to be updated could not be found — log and skip. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, remoteslot, newslot, list_make1(&conflicttuple));충돌 분류 체계는 작은 타입화된 열거형으로, 운영자가 모니터링할 수 있도록 모든 발산에 이름을 부여한다(PgStat_StatSubEntry::conflict_count로 집계된다).
// ConflictType — src/include/replication/conflict.htypedef enum{ CT_INSERT_EXISTS, /* INSERT violates a unique constraint */ CT_UPDATE_ORIGIN_DIFFERS, /* row to update was written by another origin */ CT_UPDATE_EXISTS, /* updated row violates a unique constraint */ CT_UPDATE_MISSING, /* row to update is gone */ CT_DELETE_ORIGIN_DIFFERS, /* row to delete was written by another origin */ CT_DELETE_MISSING, /* row to delete is gone */ CT_MULTIPLE_UNIQUE_CONFLICTS,/* violates several unique constraints */} ConflictType;ReportApplyConflict는 튜플별 errdetail(충돌 로컬 튜플의 오리진, xmin, 커밋 타임스탬프)를 포맷하고, pgstat_report_subscription_conflict로 구독의 충돌 카운터를 증가시키며 요청 레벨로 기록한다. 중요한 점은 REL_18이 감지하고 기록한다는 것이다. *_MISSING이나 *_ORIGIN_DIFFERS 충돌에서 스트림을 중단하지 않는다. 변경은 자연스러운 결과로 진행된다(update_missing은 단순 건너뜀, update_origin_differs는 새 원격 값으로 적용). CT_INSERT_EXISTS처럼 유니크 위반을 일으키는 INSERT는 안전한 자동 해결 방법이 없으므로 오류를 내고 워커를 중단한다.
flowchart TD
M["apply_dispatch가<br/>UPDATE/DELETE 메시지 수신"] --> OPEN["logicalrep_rel_open<br/>원격 rel → 로컬 매핑"]
OPEN --> IDX["GetRelationIdentityOrPK<br/>아이덴티티/PK 인덱스 선택"]
IDX --> FIND{"FindReplTupleInLocalRel<br/>인덱스 스캔 또는 순차 스캔"}
FIND -->|찾음| ORIG{"로컬 튜플 오리진이<br/>세션 오리진과 다른가?"}
FIND -->|찾지 못함| MISS["ReportApplyConflict<br/>CT_UPDATE_MISSING /<br/>CT_DELETE_MISSING<br/>(로그, 건너뜀)"]
ORIG -->|예| RPT["ReportApplyConflict<br/>CT_*_ORIGIN_DIFFERS (로그)"]
ORIG -->|아니오| DO["ExecSimpleRelationUpdate /<br/>ExecSimpleRelationDelete"]
RPT --> DO
피드백: 내구적 데이터 이상 확인하지 않기
섹션 제목: “피드백: 내구적 데이터 이상 확인하지 않기”apply 루프는 주기적으로 write/flush/apply LSN을 업스트림에 보내 퍼블리셔가 슬롯을 전진시키고 WAL을 해제할 수 있게 한다. 구독자는 내구적으로 커밋한 것 이상의 flush 위치를 절대 보고해선 안 된다. 그렇지 않으면 크래시 시 퍼블리셔가 안전하다고 믿는 트랜잭션을 잃는다. get_flush_position은 lsn_mapping 리스트(원격 커밋 LSN과 이를 생성한 로컬 WAL LSN 쌍)를 순회하며, 대응하는 로컬 LSN이 <= GetFlushRecPtr()인 경우에만 원격 위치를 flushed로 보고한다.
// get_flush_position — src/backend/replication/logical/worker.c (condensed)XLogRecPtr local_flush = GetFlushRecPtr(NULL);dlist_foreach_modify(iter, &lsn_mapping){ FlushPosition *pos = dlist_container(FlushPosition, node, iter.cur); *write = pos->remote_end; if (pos->local_end <= local_flush) { *flush = pos->remote_end; /* durable: safe to ack */ dlist_delete(iter.cur); pfree(pos); } else { *have_pending_txes = true; /* not yet durable: hold back */ return; }}send_feedback는 이 게이팅된 값을 (write, flush, apply) 형태로 libpq 연결로 업스트림에 전송한다. flush 필드가 바로 이 값이다. 이는 postgres-wal-sender-receiver.md의 스탠바이 피드백에 대응하는 구독자 쪽 아날로그지만, 단위는 날 WAL 바이트 위치가 아닌 논리 트랜잭션 커밋이다.
테이블 동기화: 연결 문제
섹션 제목: “테이블 동기화: 연결 문제”신규 구독의 테이블은 비어 있고, 변경 스트림은 슬롯 생성 시점 이후의 델타만 전달한다. tablesync 워커가 그 간격을 메운다. TablesyncWorkerMain은 apply 워커와 동일한 SetupApplyOrSyncWorker 스캐폴딩을 실행하지만 이후 LogicalRepSyncTableStart를 호출한다. 이 함수는 행의 srsubstate를 확인하고, 퍼블리셔에 테이블별 복제 슬롯을 생성하고(ReplicationSlotNameForTablesync로 명명), 스냅샷을 찍고, 테이블 내용을 COPY한다.
// copy_table — src/backend/replication/logical/tablesync.c (condensed)fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel), &lrel, &qual, &gencol_published);logicalrep_relmap_update(&lrel);relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);/* Build "COPY <schema.table> (cols) TO STDOUT" and stream rows in. */appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname));/* ... append column list ... */appendStringInfoString(&cmd, " TO STDOUT");res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);/* CopyFrom() drains the COPY stream into the local table. */복사에서 라이브 스트리밍으로의 핸드오프가 미묘한 부분이며, pg_subscription_rel.srsubstate 컬럼을 통한 두 프로세스 핸드셰이크로 구동된다. 복사가 완료되면(FINISHEDCOPY) 동기화 워커는 SYNCWAIT으로 전진하고 대기한다. 리더 apply 워커는 process_syncing_tables_for_apply에서 SYNCWAIT 상태를 감지해 동기화 워커를 CATCHUP으로 전환하고, apply 워커의 현재 LSN을 동기 포인트로 기록한다.
// process_syncing_tables_for_apply — src/backend/replication/logical/tablesync.c (condensed)if (rstate->state == SUBREL_STATE_SYNCWAIT){ /* Sync worker is waiting for apply. Tell it it can catch up now. */ syncworker->relstate = SUBREL_STATE_CATCHUP; syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn);}if (rstate->state == SUBREL_STATE_SYNCWAIT){ logicalrep_worker_wakeup_ptr(syncworker); /* commit to release locks, then busy-wait for the sync worker */ wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE);}이제 CATCHUP 상태인 동기화 워커는 복사 스냅샷 위치부터 apply 워커의 동기 LSN까지 변경 스트림 자체를 적용한다. 그 LSN에 도달하면 process_syncing_tables_for_sync가 SYNCDONE으로 이동하고, 퍼블리셔의 테이블별 슬롯을 삭제한 뒤 종료한다.
// process_syncing_tables_for_sync — src/backend/replication/logical/tablesync.c (condensed)if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn){ MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn, false); walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);}마지막으로 리더 apply 워커의 스트림 위치가 동기화 워커의 SYNCDONE LSN을 통과하면(current_lsn >= rstate->lsn) apply 워커가 테이블을 READY로 승격한다. 그 이후 should_apply_changes_for_rel이 해당 테이블에서 true를 반환하므로 apply 워커가 정상적으로 스트리밍한다. 상태 기계는 테이블별로 독립 동작하므로, 100개 테이블 구독이 테이블을 병렬로 동기화하고 각각 라이브 스트림에 독립적으로 합류한다.
flowchart TD
INIT["INIT<br/>(srsubstate 'i')"] --> DSYNC["DATASYNC 'd'<br/>슬롯 생성 + 스냅샷"]
DSYNC --> COPY["COPY table TO STDOUT<br/>(copy_table / CopyFrom)"]
COPY --> FC["FINISHEDCOPY 'f'"]
FC --> SW["SYNCWAIT 'w'<br/>동기화 워커 대기"]
SW -->|"apply 워커가 상태 설정"| CU["CATCHUP 'c'<br/>동기화 워커가 스트림 적용<br/>apply LSN까지"]
CU -->|"current_lsn >= relstate_lsn"| SD["SYNCDONE 's'<br/>tablesync 슬롯 삭제, 종료"]
SD -->|"apply 워커가 SYNCDONE LSN 통과"| RDY["READY 'r'<br/>리더 apply가 정상 스트리밍"]
소스 워크스루
섹션 제목: “소스 워크스루”구독자 코드는 src/backend/replication/logical/ 아래에 있다. 네 파일이 apply 경로의 대부분을 담으며, 이 워크스루는 호출 흐름 순으로 안정적인 심볼을 나열하고 위치 힌트 테이블로 마무리한다.
런처 (launcher.c)
섹션 제목: “런처 (launcher.c)”ApplyLauncherRegister— 서버 시작 시 postmaster에서 호출한다.BgWorkerStart_RecoveryFinished로 런처를BgWorker로 등록해 서버가 연결을 받을 수 있게 된 후 시작된다.ApplyLauncherMain— 폴 루프.get_subscription_list를 호출하고, 실행 중인 워커가 없는 활성화된 구독마다wal_retrieve_retry_interval스로틀을 적용한 뒤logicalrep_worker_launch를 호출한다.ApplyLauncherGetWorkerStartTime/ApplyLauncherSetWorkerStartTime—logicalrep_launcher_attach_dshmem이 설치한 DSA 기반 해시에서 구독별 last-start 시각을 읽고 쓴다.logicalrep_worker_launch—LogicalRepWorkerLock하에 빈LogicalRepWorker슬롯을 확보하고,ApplyWorkerMain또는TablesyncWorkerMain을 가리키는 동적 bgworker를 등록한 뒤WaitForReplicationWorkerAttach에서 블록한다.logicalrep_worker_find—(subid, relid)에 매칭되는 슬롯을 찾아 공유 워커 배열을 스캔한다. 런처가 아직 실행 중인 워커를 감지하는 데 쓴다.
Apply 워커 (worker.c)
섹션 제목: “Apply 워커 (worker.c)”ApplyWorkerMain— bgworker 진입점.SetupApplyOrSyncWorker후run_apply_worker를 호출한다.SetupApplyOrSyncWorker/InitializeLogRepWorker—session_replication_role = replica설정, 구독 소유자로 워커 데이터베이스 접속,search_path비움,MySubscription로드, syscache 무효화 콜백 등록.run_apply_worker— 복제 오리진(내구적 커서) 설정, 퍼블리셔로walrcv_connect,walrcv_startstreaming,start_apply→LogicalRepApplyLoop.LogicalRepApplyLoop— CopyData 수신, 선두 바이트 확인('w'데이터 /'k'keepalive),apply_dispatch로 각 디코딩 메시지 디스패치, 주기적send_feedback.apply_dispatch—LogicalRepMsgType바이트로 스위치해 매칭되는apply_handle_*루틴 호출.apply_handle_begin/apply_handle_commit— 원격 트랜잭션을 감싼다. 커밋이 오리진을 전진시키고process_syncing_tables를 호출한다.apply_handle_commit_internal이 실제 로컬 커밋을 수행한다.apply_handle_insert/apply_handle_update/apply_handle_delete— 행 디코딩,logicalrep_rel_open,should_apply_changes_for_rel로 필터링, 슬롯 구성, 대응하는*_internal호출.apply_handle_insert_internal/apply_handle_update_internal/apply_handle_delete_internal— executor(ExecSimpleRelationInsert/Update/Delete)로 변경 실행. update/delete는 먼저FindReplTupleInLocalRel을 호출하고ReportApplyConflict를 실행할 수 있다.FindReplTupleInLocalRel—RelationFindReplTupleByIndex(아이덴티티/PK 인덱스 존재 시) 또는RelationFindReplTupleSeq(REPLICA IDENTITY FULL)로 로컬 행을 찾는다.get_flush_position/send_feedback/store_flush_position— 원격↔로컬 LSN 매핑을 유지하고 로컬 내구성에 기반해 업스트림 피드백을 게이팅한다.
관계 맵 (relation.c)
섹션 제목: “관계 맵 (relation.c)”logicalrep_relmap_update—RELATION메시지에서 온LogicalRepRelation을LogicalRepRelMap해시에 설치/갱신한다.logicalrep_rel_open/logicalrep_rel_close— 원격 id로 로컬 관계를 열고, 로컬 DDL에 의해 항목이 무효화되면 속성 맵과 파생 상태를 재구성한다.logicalrep_rel_mark_updatable— 로컬 아이덴티티/PK 키 대 원격 게시 키 집합에서entry->updatable을 결정한다.GetRelationIdentityOrPK/FindUsableIndexForReplicaIdentityFull/FindLogicalRepLocalIndex— 행 찾기에 쓸 인덱스를 선택한다.logicalrep_partition_open— 튜플 라우팅 중 리프 파티션의 relmap 항목.
Tablesync (tablesync.c)
섹션 제목: “Tablesync (tablesync.c)”TablesyncWorkerMain— 테이블별 동기화 워커의 bgworker 진입점. apply 워커와SetupApplyOrSyncWorker를 공유한다.LogicalRepSyncTableStart— 복사 드라이버.srsubstate읽기, 테이블별 슬롯 생성, 스냅샷,copy_table실행 후SYNCWAIT으로 진행.copy_table/fetch_remote_table_info— 퍼블리셔에서COPY ... TO STDOUT을 실행하고CopyFrom으로 로컬 테이블에 드레인.process_syncing_tables→process_syncing_tables_for_apply/process_syncing_tables_for_sync— 각 쪽에서SYNCWAIT → CATCHUP → SYNCDONE → READY핸드셰이크를 구동한다.wait_for_relation_state_change— 동기화 워커가 따라잡는 동안 apply 워커가 쓰는 바쁜 대기 헬퍼.UpdateSubscriptionRelState/GetSubscriptionRelState—pg_subscription_rel.srsubstate카탈로그 컬럼 읽기/쓰기.
위치 힌트 (2026-06-05 기준, REL_18 273fe94)
섹션 제목: “위치 힌트 (2026-06-05 기준, REL_18 273fe94)”| 심볼 | 파일 | 줄 |
|---|---|---|
ApplyLauncherRegister | src/backend/replication/logical/launcher.c | 928 |
ApplyLauncherMain | src/backend/replication/logical/launcher.c | 1132 |
logicalrep_worker_launch | src/backend/replication/logical/launcher.c | 310 |
logicalrep_worker_find | src/backend/replication/logical/launcher.c | 247 |
ApplyWorkerMain | src/backend/replication/logical/worker.c | 4833 |
SetupApplyOrSyncWorker | src/backend/replication/logical/worker.c | 4792 |
InitializeLogRepWorker | src/backend/replication/logical/worker.c | 4674 |
run_apply_worker | src/backend/replication/logical/worker.c | 4561 |
LogicalRepApplyLoop | src/backend/replication/logical/worker.c | 3589 |
apply_dispatch | src/backend/replication/logical/worker.c | 3383 |
apply_handle_commit | src/backend/replication/logical/worker.c | 1020 |
apply_handle_commit_internal | src/backend/replication/logical/worker.c | 2268 |
apply_handle_insert | src/backend/replication/logical/worker.c | 2398 |
apply_handle_insert_internal | src/backend/replication/logical/worker.c | 2489 |
apply_handle_update_internal | src/backend/replication/logical/worker.c | 2677 |
apply_handle_delete_internal | src/backend/replication/logical/worker.c | 2862 |
FindReplTupleInLocalRel | src/backend/replication/logical/worker.c | 2930 |
should_apply_changes_for_rel | src/backend/replication/logical/worker.c | 461 |
get_flush_position | src/backend/replication/logical/worker.c | 3503 |
send_feedback | src/backend/replication/logical/worker.c | 3853 |
logicalrep_relmap_update | src/backend/replication/logical/relation.c | 164 |
logicalrep_rel_mark_updatable | src/backend/replication/logical/relation.c | 296 |
logicalrep_rel_open | src/backend/replication/logical/relation.c | 349 |
logicalrep_rel_close | src/backend/replication/logical/relation.c | 504 |
logicalrep_partition_open | src/backend/replication/logical/relation.c | 633 |
FindUsableIndexForReplicaIdentityFull | src/backend/replication/logical/relation.c | 776 |
GetRelationIdentityOrPK | src/backend/replication/logical/relation.c | 891 |
process_syncing_tables_for_sync | src/backend/replication/logical/tablesync.c | 294 |
process_syncing_tables_for_apply | src/backend/replication/logical/tablesync.c | 418 |
process_syncing_tables | src/backend/replication/logical/tablesync.c | 695 |
wait_for_relation_state_change | src/backend/replication/logical/tablesync.c | 183 |
fetch_remote_table_info | src/backend/replication/logical/tablesync.c | 825 |
copy_table | src/backend/replication/logical/tablesync.c | 1143 |
LogicalRepSyncTableStart | src/backend/replication/logical/tablesync.c | 1319 |
ReportApplyConflict | src/backend/replication/logical/conflict.c | 103 |
InitConflictIndexes | src/backend/replication/logical/conflict.c | 138 |
ConflictType (enum) | src/include/replication/conflict.h | 31 |
SUBREL_STATE_* (매크로) | src/include/catalog/pg_subscription_rel.h | 62 |
LogicalRepWorker (struct) | src/include/replication/worker_internal.h | — |
LogicalRepWorkerType (enum) | src/include/replication/worker_internal.h | — |
소스 검증 (2026-06-05 기준)
섹션 제목: “소스 검증 (2026-06-05 기준)”이 문서의 주장은 /data/hgryoo/references/postgres의 REL_18 트리 커밋 273fe94를 기준으로 검증했다. 주요 확인 항목:
- 세 종류의 워커.
src/include/replication/worker_internal.h의LogicalRepWorkerType이 정확히WORKERTYPE_UNKNOWN,WORKERTYPE_TABLESYNC,WORKERTYPE_APPLY,WORKERTYPE_PARALLEL_APPLY를 정의한다. 확인됨. - 충돌 분류 체계.
src/include/replication/conflict.h의ConflictType이CT_INSERT_EXISTS,CT_UPDATE_ORIGIN_DIFFERS,CT_UPDATE_EXISTS,CT_UPDATE_MISSING,CT_DELETE_ORIGIN_DIFFERS,CT_DELETE_MISSING,CT_MULTIPLE_UNIQUE_CONFLICTS를 포함하며,CONFLICT_NUM_TYPES = CT_MULTIPLE_UNIQUE_CONFLICTS + 1.CT_*_ORIGIN_DIFFERS와CT_MULTIPLE_UNIQUE_CONFLICTS멤버가 이것이 REL_18 트리임을 확인한다. 확인됨. - 상태 기계 상수.
SUBREL_STATE_INIT 'i',SUBREL_STATE_DATASYNC 'd',SUBREL_STATE_FINISHEDCOPY 'f',SUBREL_STATE_SYNCWAIT 'w',SUBREL_STATE_CATCHUP 'c',SUBREL_STATE_SYNCDONE 's',SUBREL_STATE_READY 'r'가src/include/catalog/pg_subscription_rel.h에 모두 있다. 확인됨. - 레플리카 아이덴티티 폴백.
relation.c의GetRelationIdentityOrPK가RelationGetReplicaIndex후RelationGetPrimaryKeyIndex를 호출한다.FindReplTupleInLocalRel이OidIsValid(localidxoid) || replident == REPLICA_IDENTITY_FULL을 단언하고RelationFindReplTupleByIndex와RelationFindReplTupleSeq로 분기한다. 확인됨. - 피드백 게이팅.
get_flush_position이pos->local_end <= GetFlushRecPtr(NULL)인 경우에만 원격 LSN을*flush로 보고한다. 확인됨. - 권한 처리.
apply_handle_insert가!MySubscription->runasowner일 때SwitchToUntrustedUser(...relowner...)를 호출하고,InitializeLogRepWorker가session_replication_role = replica를 설정한다. 확인됨. - 위치 힌트 표의 줄 번호는 2026-06-05 트리에서 직접 읽었다. 심볼이 내구적 앵커이며 줄 번호는 리포맷에 따라 달라지는 힌트다.
범위 경계: 퍼블리셔의 논리 디코딩, pgoutput 플러그인, walsender/walreceiver 전송은 이 문서의 범위 밖이며 postgres-logical-decoding.md, postgres-pgoutput.md, postgres-wal-sender-receiver.md에서 다룬다. 내구적 객체로서의 복제 슬롯과 오리진은 postgres-replication-slots.md에 있다. contrib/ 모듈은 전체 범위 밖이다.
PostgreSQL 너머 — 비교 설계와 연구 전선
섹션 제목: “PostgreSQL 너머 — 비교 설계와 연구 전선”MySQL 행 기반 복제. MySQL의 ROW 포맷 binlog가 가장 가까운 유사체다. 각 Write_rows/Update_rows/Delete_rows 이벤트가 이전/이후 이미지를 담고, SQL/applier 스레드가 재실행한다. MySQL도 기본 키로 행을 매칭하고(없으면 전체 이미지 스캔), PostgreSQL의 레플리카 아이덴티티-or-순차스캔 선택과 정확히 대응한다. 큰 아키텍처 차이는 병렬성이다. MySQL의 멀티스레드 applier(MTS)는 데이터베이스별 또는 logical_clock 커밋 그룹으로 분할하는 반면, PostgreSQL은 구독당 리더 apply 워커 하나를 유지하며 스트리밍 진행 중 트랜잭션에 한해 병렬 apply 워커로 팬아웃한다.
Oracle GoldenGate / SQL Server 트랜잭셔널 복제. 둘 다 캡처(리두/트랜잭션 로그 리더), 전달(큐), apply(리플리캣/배포 에이전트)를 분리하는 3단계 파이프라인을 쓴다. Petrov(Database Internals, 11장)가 설명한 구조와 같다. GoldenGate의 충돌 감지와 해결(CDR)은 REL_18보다 풍부하다. 타임스탬프/LWW, 폐기, 커스텀 프로시저 등 설정 가능한 해결 방법을 제공하는 반면, PostgreSQL 18은 감지-기록하고 자연스러운 결과를 취한다. PostgreSQL 커뮤니티는 설정 가능한 해결을 향해 점진적으로 구축해 왔으며, 최근 릴리즈에서 추가된 타입화된 ConflictType 분류 체계와 튜플별 오리진/xmin/커밋-ts 보고가 미래의 LWW 또는 커스텀 리졸버가 구축할 기반이다.
Debezium과 CDC 생태계. Debezium은 PostgreSQL의 논리 디코딩 출력(pgoutput 또는 wal2json)을 읽어 Kafka에 변경 이벤트를 전달하며, 토폴로지를 역전시킨다. “구독자”가 다른 PostgreSQL이 아닌 외부 스트림 프로세서가 된다. 이는 디코딩/전송 경계가 깔끔한 이음새임을 보여 준다. 같은 퍼블리셔가 네이티브 구독자와 CDC 파이프라인 어느 쪽에도 피딩한다. PostgreSQL의 네이티브 apply 경로는 일반 CDC 패턴의 “배터리 포함” 사례다.
충돌 없는 복제 데이터 타입(CRDT)과 멀티마스터. 논리 복제의 연구 전선은 모든 노드가 퍼블리싱과 구독을 모두 하는 액티브-액티브(멀티마스터) 운영이다. 핵심 난제는 동시 충돌 쓰기 아래의 수렴이다. Kleppmann의 Designing Data-Intensive Applications(5장, 7장)는 옵션을 제시한다. last-write-wins(손실 있음), 버전 벡터, CRDT(구조적으로 충돌 없음). PostgreSQL의 오리진 추적 — 모든 커밋이 어떤 노드에서 최초 생성됐는지를 기록하며 CT_*_ORIGIN_DIFFERS로 노출됨 — 은 버전 벡터 또는 LWW 리졸버가 필요한 정확히 그 메타데이터다. 그래서 BDR/pglogical 같은 확장이 코어 apply 기계 위에 멀티마스터 의미론을 레이어링할 수 있다. REL_18 인코어 기능 집합은 감지에 머문다. 자동 해결 정책은 진행 중인 설계 대화로 남아 있다.
복제 이론 프레이밍. Petrov(11장)와 Silberschatz(Database System Concepts, 7e, 23장)는 논리 applier가 제공해야 하는 핵심 보장을 커밋 순서로 정확히 한 번 적용, 내구적 커서 유지로 규정한다. PostgreSQL은 “내구적 커서”를 데이터 변경과 같은 트랜잭션에서 전진하는 복제 오리진으로, “정확히 한 번”을 그 오리진과 구독자가 내구적으로 적용하지 않은 WAL을 퍼블리셔가 폐기하지 않도록 막는 플러시 게이팅 피드백의 조합으로 구현한다. tablesync 스플라이스는 이론이 보통 건너가는 부트스트랩 간격에 대한 엔지니어링 해답이다.
- 코드 (REL_18, 커밋 273fe94, 2026-06-05 기준):
src/backend/replication/logical/worker.c— apply 워커, apply 루프, 디스패치, 행 핸들러, 충돌 보고, 피드백.src/backend/replication/logical/launcher.c— 런처 폴 루프, 워커 기동/레지스트리, 재시작 스로틀.src/backend/replication/logical/tablesync.c— tablesync 워커, 복사 드라이버,SYNCWAIT/CATCHUP/SYNCDONE/READY핸드셰이크.src/backend/replication/logical/relation.c— 관계 맵, 레플리카 아이덴티티 인덱스 선택, 갱신 가능 여부.src/backend/replication/logical/conflict.c—ReportApplyConflict,InitConflictIndexes, 충돌 errdetail 포맷.src/include/replication/worker_internal.h—LogicalRepWorker,LogicalRepWorkerType.src/include/replication/conflict.h—ConflictType.src/include/catalog/pg_subscription_rel.h—SUBREL_STATE_*.src/include/catalog/pg_subscription.h— 구독 카탈로그.
- 교과서 / 이론:
- Petrov, Database Internals (2019), 11장 “Replication and Consistency” — 논리 변경 레코드, 리더/팔로워 파이프라인.
- Kleppmann, Designing Data-Intensive Applications (2017), 5장 “Replication”과 7장 — 충돌 처리, LWW, 버전 벡터, CRDT; 11장 논리 변경 캡처.
- Silberschatz 외, Database System Concepts (7e), 23장 — 분산 데이터베이스와 복제.
- 관련 KB 문서:
postgres-logical-decoding.md,postgres-pgoutput.md,postgres-wal-sender-receiver.md,postgres-replication-slots.md,postgres-synchronous-replication.md.