(KO) PostgreSQL pgoutput — 내장 논리 복제 출력 플러그인
목차
- 이론적 배경
- DBMS 공통 설계 패턴
- PostgreSQL의 접근 방식
- 소스 워크스루
- 소스 검증 (2026-06-05 기준)
- PostgreSQL 너머 — 비교 설계와 연구 전선
- 출처
이론적 배경
섹션 제목: “이론적 배경”논리 복제(logical replication)는 물리 복제(physical replication)가 답하지 못하는 질문에 답한다. 생산자의 온디스크 페이지 레이아웃을 공유하지 않는 소비자가 생산자의 커밋된 변경을 어떻게 따라잡는가의 문제다. 물리(스트리밍) 복제는 원시 WAL 바이트를 그대로 전송한다. 블록 레이아웃과 바이트 오프셋이 동일해야 하므로 스탠바이는 바이트 단위로 호환 가능해야 한다(postgres-wal-sender-receiver.md). 논리 복제는 행 수준 변경 이벤트를 스키마 어휘로 전달한다. “테이블 t에 (1,'a')가 INSERT되었다”는 방식이므로, 소비자는 다른 메이저 버전이거나 다른 스키마이거나 PostgreSQL 이외의 시스템이어도 무방하다.
행 수준 이벤트 스트림은 논리 디코딩(postgres-logical-decoding.md)이 생산한다. 리오더 버퍼(reorder buffer)가 물리적으로 인터리빙된 WAL을 트랜잭션 단위, 커밋 순서로 재조합하고, 각 리두 레코드를 히스토릭(시간 일관성) 카탈로그 스냅샷으로 해석한 뒤, 논리 이벤트마다 출력 플러그인 콜백(begin·change·commit 및 스트리밍·2단계 커밋 변형)을 호출한다. 출력 플러그인은 마지막 구간이다. 역할은 오직 직렬화다. 각 콜백에 전달된 ReorderBufferChange / ReorderBufferTXN 객체를 소비자가 파싱할 수 있는 바이트 스트림으로 바꾸는 일이다. 논리 디코딩이 무엇을, 언제 결정하면, 출력 플러그인은 와이어에서 어떻게 보이는지를 결정한다.
Kleppmann의 Designing Data-Intensive Applications(11장 “스트림 처리”, §“Change Data Capture”)는 전체 파이프라인을 데이터베이스를 “복제 로그 파싱”으로 이벤트 스트림 원천으로 전환하는 문제로 규정한다. 유용한 변경 스트림은 “기록된 순서와 같아야” 하고, 소비자는 소스를 느리게 하지 않도록 비동기로 유지한다. 그러나 Kleppmann은 이벤트 인코딩을 명시적으로 구현 선택으로 남긴다. 이 선택이 바로 출력 플러그인의 역할이다. 동일한 디코더 위에서 플러그인만 다른 두 시스템이 공존할 수 있다. PostgreSQL은 같은 LogicalDecodingContext 위에 test_decoding(사람이 읽기 쉬운 디버그 형식)과 pgoutput(내장 구독자가 사용하는 콤팩트 바이너리 형식)을 모두 제공한다.
출력 플러그인 작성자가 다루는 설계 공간은 다섯 축으로 나뉜다.
-
인코딩: 텍스트 대 바이너리. 텍스트 형식은 디버깅하기 쉽지만 장황하고 바이너리 타입에서 손실이 생긴다. 바이너리 형식은 콤팩트하고 타입 충실하지만 불투명하다. pgoutput은 기본적으로 바이너리(
OUTPUT_PLUGIN_BINARY_OUTPUT)이며, 선택적binary모드에서는 각 열 값을 타입의 바이너리 send/recv 형식으로 전송한다. -
스키마 전달: 즉시 대 지연, 인밴드 대 아웃오브밴드. 소비자는 행을 해석하려면 테이블의 열 이름과 타입이 필요하다. 플러그인은 전체 카탈로그를 앞서 전송하거나, 각 릴레이션의 디스크립터를 인밴드로 해당 릴레이션이 처음 등장할 때 한 번만 전송할 수 있다. pgoutput은
schema_sent플래그로 후자를 구현한다. -
필터링: 어디서, 무엇을. 구독자가 모든 테이블의 모든 변경을 원하는 경우는 드물다. 플러그인은 테이블(발행 멤버십), 연산(
pubinsert/pubupdate/pubdelete/pubtruncate), 컬럼(컬럼 목록), 행(WHERE행 필터) 단위로 필터링할 수 있다. 필터링을 플러그인으로 밀어넣으면 대역폭을 절감하고 구독자가 버릴 디코딩 작업을 피할 수 있다. -
진행 중인 트랜잭션 스트리밍. 대형 트랜잭션을 커밋 전에 바이트로 내보낼 수 있다. 플러그인은 미커밋 트랜잭션의 하위 배치를
STREAM_START/STREAM_STOP으로 프레이밍해 각 변경에 xid를 붙임으로써, 구독자가 스풀하다 어보트 시 폐기할 수 있게 한다. 양쪽 메모리 사용량을 줄이는 대신 구독자 측 스풀링이 필요하다. -
2단계 커밋. 분산 원자성을 위해 플러그인이
PREPARE/COMMIT PREPARED/ROLLBACK PREPARED를 일급 메시지로 노출하면, 구독자는 로컬에서 prepare하고 전역 코디네이터가 결정한다.
pgoutput은 각 축에서 구체적인 선택을 하나씩 내리고, 시작 시 협상하는 proto_version 번호로 버전을 매겨 구 구독자와 새 발행자가 합의된 메시지 형태만 사용한다.
DBMS 공통 설계 패턴
섹션 제목: “DBMS 공통 설계 패턴”로그 기반 변경 데이터 캡처 시스템들 — Debezium, MySQL binlog 복제, Oracle GoldenGate, SQL Server 복제 — 은 인식 가능한 공통 관례로 수렴했다. 이를 명명해두면 pgoutput이 공유된 설계 패턴 안에서 내린 선택의 집합으로 읽힌다.
이벤트당 콜백 플러그인 경계
섹션 제목: “이벤트당 콜백 플러그인 경계”디코더와 직렬화기는 거의 항상 콜백 인터페이스로 분리된다. 디코더는 트랜잭션을 재조합하고 on_begin, on_row_change, on_commit을 호출한다. 이렇게 하면 비용이 크고 정확성이 중요한 디코딩 로직은 한 곳에 두고, 형식만 교체할 수 있다(Debezium의 JSON/Avro/protobuf, 네이티브 바이너리 프레임 등). PostgreSQL의 OutputPluginCallbacks 구조체가 바로 이 경계다.
타입화된 자기 기술적 메시지 스트림
섹션 제목: “타입화된 자기 기술적 메시지 스트림”와이어 스트림은 길이·태그로 구분된 메시지의 시퀀스이며, 각 메시지는 1바이트 타입 태그 — begin은 B, insert는 I 등 — 로 시작한다. 소비자는 태그에 대한 단순 디스패치 루프다. 동일 채널에 제어 프레임(begin/commit), 스키마 프레임(relation/type), 데이터 프레임(insert/update/delete)이 섞여 흐르기 때문에 자기 기술성이 중요하다.
지연·메모이즈된 스키마 디스패치
섹션 제목: “지연·메모이즈된 스키마 디스패치”세션이 몇 개 테이블만 건드릴 때 전체 카탈로그를 앞서 전송하는 것은 낭비다. 보편적 최적화는 릴레이션 디스크립터를 해당 릴레이션의 첫 번째 변경이 나타날 때 전송하고 이를 기억하는 것이다. 릴레이션 OID를 키로 하는 세션 단위 캐시에 “전송 완료” 여부를 저장하고, DDL로 테이블이 변경되면 “전송 완료” 비트를 지워 다음 변경에서 재전송한다.
생산자 측 필터링
섹션 제목: “생산자 측 필터링”생산자와 소비자 사이의 대역폭이 희소 자원이므로, 생산자가 직렬화 전에 불필요한 변경을 제거한다. 테이블·연산·컬럼 프로젝션·행 조건 순으로 필터링한다. 조건 평가기는 보통 엔진 자체 표현식 기계를 재활용한다. pgoutput이 릴레이션별로 ExprState와 EState를 구성하는 이유가 그것이다.
빈 트랜잭션 제거
섹션 제목: “빈 트랜잭션 제거”구독자가 구독하지 않는 테이블만 건드린 트랜잭션은 데이터 프레임이 없다. 그런 트랜잭션에 빈 BEGIN/COMMIT 쌍을 보내면 왕복 바이트가 낭비된다. 일반적인 방법은 실제로 전송될 첫 번째 변경이 나타날 때까지 BEGIN을 지연하고, BEGIN이 전송되지 않았다면 COMMIT도 생략하는 것이다.
진행 중인 트랜잭션 스트리밍
섹션 제목: “진행 중인 트랜잭션 스트리밍”메모리를 제한하기 위해 대형 트랜잭션을 커밋 전에 점진적으로 방출한다. 각 조각에 트랜잭션 id를 붙이고 start/stop 마커로 묶은 뒤, 스풀된 조각이 커밋되지 않을 경우 소비자가 폐기할 수 있도록 명시적 어보트 신호를 제공한다. 대형 트랜잭션을 지원하는 모든 CDC 시스템이 이 프레이밍을 재발명한다.
이론 ↔ PostgreSQL 매핑
섹션 제목: “이론 ↔ PostgreSQL 매핑”| 개념 | PostgreSQL 이름 |
|---|---|
| 출력 플러그인 콜백 테이블 | OutputPluginCallbacks (_PG_output_plugin_init에서 채움) |
| 플러그인 진입점 | pgoutput.c의 _PG_output_plugin_init |
| 세션 단위 플러그인 상태 | PGOutputData (ctx->output_plugin_private) |
| 트랜잭션 단위 플러그인 상태 | PGOutputTxnData (txn->output_plugin_private) |
| Begin/commit 제어 프레임 | 'B' LOGICAL_REP_MSG_BEGIN, 'C' LOGICAL_REP_MSG_COMMIT |
| 스키마 프레임 | 'R' LOGICAL_REP_MSG_RELATION, 'Y' LOGICAL_REP_MSG_TYPE |
| 행 변경 프레임 | 'I'/'U'/'D' INSERT/UPDATE/DELETE, 'T' TRUNCATE |
| 논리 메시지 프레임 | 'M' LOGICAL_REP_MSG_MESSAGE |
| 스트리밍 프레임 | 'S'/'E'/'A'/'c' STREAM_START/STOP/ABORT/COMMIT |
| 2단계 커밋 프레임 | 'b'/'P'/'K'/'r'/'p' BEGIN_PREPARE/PREPARE/COMMIT_PREPARED/ROLLBACK_PREPARED/STREAM_PREPARE |
| 릴레이션 단위 스키마 캐시 | RelationSyncCache (RelationSyncEntry의 HTAB) |
| “스키마 전송 완료” 메모 | RelationSyncEntry.schema_sent (+ streamed_txns) |
| 연산 필터 | RelationSyncEntry.pubactions (PublicationActions) |
| 컬럼 목록 | RelationSyncEntry.columns (Bitmapset) |
| 행 필터 | RelationSyncEntry.exprstate[NUM_ROWFILTER_PUBACTIONS] |
| 파티션 루트 대상 | RelationSyncEntry.publish_as_relid |
| 프로토콜 버전 협상 | proto_version 옵션, LOGICALREP_PROTO_*_VERSION_NUM |
PostgreSQL의 접근 방식
섹션 제목: “PostgreSQL의 접근 방식”플러그인은 로더블 콜백 테이블이다
섹션 제목: “플러그인은 로더블 콜백 테이블이다”pgoutput은 로더블 모듈로 빌드되지만(PG_MODULE_MAGIC_EXT 선언) 코어에 포함되어 슬롯의 플러그인 이름으로 pgoutput을 지정하면 선택된다. 논리 디코딩은 _PG_output_plugin_init 심볼을 찾아 한 번 호출해 콜백 테이블을 채운다. 모든 항목은 pgoutput.c의 static 함수다.
// _PG_output_plugin_init — src/backend/replication/pgoutput/pgoutput.cvoid_PG_output_plugin_init(OutputPluginCallbacks *cb){ cb->startup_cb = pgoutput_startup; cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn; cb->prepare_cb = pgoutput_prepare_txn; cb->commit_prepared_cb = pgoutput_commit_prepared_txn; cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown;
/* transaction streaming */ cb->stream_start_cb = pgoutput_stream_start; cb->stream_stop_cb = pgoutput_stream_stop; cb->stream_abort_cb = pgoutput_stream_abort; cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; /* reused */ cb->stream_message_cb = pgoutput_message; /* reused */ cb->stream_truncate_cb = pgoutput_truncate; /* reused */ /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn;}stream_change_cb, stream_message_cb, stream_truncate_cb는 비스트리밍 변형과 같은 함수를 가리킨다. “스트리밍”과 “비스트리밍” 처리의 차이는 별도 함수가 아니라 PGOutputData.in_streaming 플래그다. pgoutput_stream_start가 이 플래그를 세트하면 공유 함수들이 각 메시지에 xid를 붙일지 여부를 결정하는 데 사용한다.
시작: 옵션 파싱과 프로토콜 버전 협상
섹션 제목: “시작: 옵션 파싱과 프로토콜 버전 협상”세션의 첫 번째 콜백은 pgoutput_startup이다. 세션 단위 PGOutputData를 할당하고(ctx->output_plugin_private에 고정), 출력을 바이너리로 선언한 뒤, 실제 복제 시작 시(슬롯 생성이 아닌 경우) 구독자가 전달한 옵션을 파싱하고 프로토콜 버전 범위를 검증한다.
// pgoutput_startup — src/backend/replication/pgoutput/pgoutput.cPGOutputData *data = palloc0(sizeof(PGOutputData));// ... data->context / cachectx / pubctx 메모리 컨텍스트 생성 ...ctx->output_plugin_private = data;opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; /* pgoutput은 바이너리 */
if (!is_init){ parse_output_parameters(ctx->output_plugin_options, data);
if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) /* ... ERROR */ ; if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM) /* ... ERROR */ ;
if (data->streaming == LOGICALREP_STREAM_OFF) ctx->streaming = false; else if (data->streaming == LOGICALREP_STREAM_ON && data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) /* ... ERROR: 스트리밍은 proto v2 필요 ... */ ; else if (data->streaming == LOGICALREP_STREAM_PARALLEL && data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM) /* ... ERROR: 병렬 스트리밍은 proto v4 필요 ... */ ; // ... two_phase는 LOGICALREP_PROTO_TWOPHASE_VERSION_NUM (v3)에서 게이팅 ...}프로토콜 버전은 호환성 경첩이다. 각 기능에는 최소 버전이 있다. 스트리밍은 프로토콜 2에서, 2단계 커밋은 3에서, 병렬 스트리밍은 4에서 도입되었다. 협상된 버전이 지원하지 않는 기능을 요청하면 조용히 다운그레이드하지 않고 ERROR를 반환한다.
릴레이션 단위 상태: RelationSyncCache와 RelationSyncEntry
섹션 제목: “릴레이션 단위 상태: RelationSyncCache와 RelationSyncEntry”핫 패스는 행 변경마다 한 번 호출되는 pgoutput_change다. 이 함수는 변경의 릴레이션을 두고 다음 질문에 답해야 한다. 이 테이블이 발행되는가? 이 연산이 활성화되어 있는가? 어느 컬럼인가? 행이 필터를 통과하는가? 구독자가 이미 스키마를 받았는가? 이를 변경마다 카탈로그에서 재계산하면 치명적이다. pgoutput은 이를 릴레이션 OID를 키로 하는 해시 테이블 RelationSyncCache에 메모이즈한다. 값은 RelationSyncEntry다.
// RelationSyncEntry (struct, abridged) — src/backend/replication/pgoutput/pgoutput.ctypedef struct RelationSyncEntry{ Oid relid; /* 해시 키: 릴레이션 OID */ bool replicate_valid; /* false이면 항목 재구성 */ bool schema_sent; /* RELATION/TYPE를 이미 전송했는가? */ PublishGencolsType include_gencols_type; List *streamed_txns; /* 스키마를 전송한 최상위 xid 목록 */ PublicationActions pubactions; /* insert/update/delete/truncate 플래그 */ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; /* 연산별 행 필터 */ EState *estate; TupleTableSlot *new_slot; TupleTableSlot *old_slot; Oid publish_as_relid; /* pubviaroot일 때 파티션 루트 대상 */ AttrMap *attrmap; /* 파티션→조상 컬럼 재매핑 */ Bitmapset *columns; /* 컬럼 목록, NULL이면 전체 컬럼 */ MemoryContext entry_cxt;} RelationSyncEntry;get_rel_sync_entry는 지연 구성자다. 캐시 미스(또는 replicate_valid가 무효화로 클리어된 경우) 시 구독자의 발행 목록을 로드하고, 이를 순회해 pubactions를 계산하고, publish_as_relid(pubviaroot 설정 시 파티션 루트)를 확인하고, 행 필터·컬럼 목록·튜플 슬롯을 초기화한다.
// get_rel_sync_entry — src/backend/replication/pgoutput/pgoutput.c (abridged)entry = hash_search(RelationSyncCache, &relid, HASH_ENTER, &found);if (!found) { /* 새 항목 zero-초기화 */ }
if (!entry->replicate_valid){ List *pubids = GetRelationPublications(relid); List *schemaPubids = GetSchemaPublications(get_rel_namespace(relid)); // ... schema_sent, pubactions, columns, slots, exprstate 리셋 ...
foreach(lc, data->publications) { Publication *pub = lfirst(lc); // ... publish 판정 (FOR ALL TABLES / 테이블 / 스키마 / 조상) ... if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate|= pub->pubactions.pubtruncate; // ... 최상위 조상을 publish_as_relid로 추적 ... } } entry->publish_as_relid = publish_as_relid;
if (entry->pubactions.pubinsert || entry->pubactions.pubupdate || entry->pubactions.pubdelete) { init_tuple_slot(data, relation, entry); pgoutput_row_filter_init(data, rel_publications, entry); check_and_init_gencol(data, rel_publications, entry); pgoutput_column_list_init(data, rel_publications, entry); } entry->replicate_valid = true;}return entry;캐시 일관성은 무효화로 유지한다. rel_sync_cache_relation_cb(릴캐시 콜백)은 replicate_valid를, rel_sync_cache_publication_cb(syscache 콜백)은 publications_valid를 클리어해 다음 get_rel_sync_entry 호출에서 현재 카탈로그 기준으로 재구성하게 한다. DDL은 디코더가 읽는 동일한 WAL로 흐르므로, 이 무효화는 데이터 변경과 커밋 순서가 같다.
프로토콜 메시지 카탈로그
섹션 제목: “프로토콜 메시지 카탈로그”모든 콜백은 logicalrep_write_* 함수(proto.c에 있음)에 타입화된 메시지를 건네며, 이 함수가 1바이트 태그와 페이로드를 ctx->out에 추가한다. 태그는 열거형이다.
// LogicalRepMsgType — src/include/replication/logicalproto.htypedef enum LogicalRepMsgType{ LOGICAL_REP_MSG_BEGIN = 'B', LOGICAL_REP_MSG_COMMIT = 'C', LOGICAL_REP_MSG_ORIGIN = 'O', LOGICAL_REP_MSG_INSERT = 'I', LOGICAL_REP_MSG_UPDATE = 'U', LOGICAL_REP_MSG_DELETE = 'D', LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_STOP = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', LOGICAL_REP_MSG_STREAM_ABORT = 'A', LOGICAL_REP_MSG_STREAM_PREPARE = 'p',} LogicalRepMsgType;튜플의 각 컬럼은 값별로 태그가 붙어 소비자가 null인지, 변경되지 않았는지(TOAST 재로그 없음), 텍스트인지, 바이너리인지 알 수 있다.
// per-column value kinds — src/include/replication/logicalproto.h#define LOGICALREP_COLUMN_NULL 'n'#define LOGICALREP_COLUMN_UNCHANGED 'u'#define LOGICALREP_COLUMN_TEXT 't'#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */'u' 종류는 “변경되지 않은 out-of-line TOAST 열” 최적화의 와이어 표현이다. 큰 TOAST 열을 건드리지 않은 UPDATE는 값을 재전송하는 대신 'u'를 보낸다. 구독자는 해당 컬럼에 기존에 가진 값을 유지한다.
동작 흐름 예시
섹션 제목: “동작 흐름 예시”아래 그림은 발행된 테이블에 행을 삽입하고, 미발행 테이블에서 행을 갱신하는 작은 트랜잭션을 추적한다. 빈 트랜잭션 제거와 지연 스키마가 어디서 발동하는지 보여준다.
flowchart TD
A["reorderbuffer가 txn 커밋<br/>디코더가 버퍼된 변경 재생"] --> B["pgoutput_begin_txn<br/>PGOutputTxnData 할당<br/>sent_begin_txn = false"]
B --> C["pgoutput_change: 발행된 t에 INSERT"]
C --> D{"get_rel_sync_entry(t)<br/>pubactions.pubinsert?"}
D -->|no| Z1["return, 바이트 없음"]
D -->|yes| E{"pgoutput_row_filter<br/>통과?"}
E -->|no| Z2["return, 바이트 없음"]
E -->|yes| F{"txndata->sent_begin_txn?"}
F -->|false| G["pgoutput_send_begin<br/>'B' BEGIN 전송<br/>sent_begin_txn = true"]
F -->|true| H["maybe_send_schema(t)"]
G --> H
H --> I{"schema_sent?"}
I -->|no| J["사용자 타입별 'Y' TYPE 전송<br/>'R' RELATION 전송<br/>schema_sent = true"]
I -->|yes| K["'I' INSERT 전송 (new tuple)"]
J --> K
C2["pgoutput_change: 미발행 u에 UPDATE"] --> D2{"get_rel_sync_entry(u)<br/>pubactions.pubupdate?"}
D2 -->|no| Z3["return, 바이트 없음"]
K --> L["pgoutput_commit_txn"]
Z3 --> L
L --> M{"sent_begin_txn?"}
M -->|true| N["'C' COMMIT 전송"]
M -->|false| O["건너뜀: 빈 트랜잭션 제거"]
그림 1 — pgoutput을 통과하는 하나의 트랜잭션. BEGIN은 테이블·연산·행 필터를 통과한 첫 번째 변경에서만 방출되고, COMMIT은 BEGIN이 전송된 경우에만 방출된다. 미발행 UPDATE는 바이트를 생산하지 않는다. 스키마(TYPE+RELATION)는 릴레이션의 첫 번째 데이터 프레임 앞에 오고, 세션당 릴레이션 단위로 한 번만 전송된다.
소스 워크스루
섹션 제목: “소스 워크스루”아래 모든 심볼은 특별히 명시하지 않는 한 src/backend/replication/pgoutput/pgoutput.c에 있다. 와이어 형식 작성기(logicalrep_write_*)는 src/backend/replication/logical/proto.c에 있으며, pgoutput이 메시지를 넘기는 경계에서만 인용한다. 콜백 상위의 디코딩(리오더 버퍼, 히스토릭 스냅샷, 변경 재조합)은 postgres-logical-decoding.md이고, 이 메시지를 읽는 구독자 측은 postgres-logical-replication-apply.md다.
플러그인 표면과 생명주기 (pgoutput_startup, pgoutput_shutdown)
섹션 제목: “플러그인 표면과 생명주기 (pgoutput_startup, pgoutput_shutdown)”_PG_output_plugin_init은 유일하게 내보내는 심볼이다. 논리 디코딩이 dlsym으로 찾아 한 번 호출해 OutputPluginCallbacks를 채운다(3절). pgoutput_startup은 세션당 첫 번째 콜백이다. 출력을 바이너리 프레이밍으로 고정하고 구독자 옵션을 파싱·검증한다. 버전 최솟값 상수는 logicalproto.h에 있다.
// version floors — src/include/replication/logicalproto.h#define LOGICALREP_PROTO_MIN_VERSION_NUM 1#define LOGICALREP_PROTO_VERSION_NUM 1#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 /* 스트리밍 */#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3 /* 2단계 커밋 */#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4 /* 병렬 apply */#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUMparse_output_parameters는 인식된 키(proto_version, publication_names, binary, messages, streaming, two_phase, origin)만 읽고, streaming/two_phase 기능은 pgoutput_startup에서 버전 최솟값 기준으로 게이팅한다. 협상 버전이 지원하지 않는 기능은 조용한 다운그레이드 없이 ERROR다. pgoutput_shutdown은 단순하다. 세션 단위 컨텍스트는 ctx->context의 자식이므로 디코딩 기계가 해제하고, 콜백은 캐시 컨텍스트 포인터만 리셋한다.
트랜잭션 프레임 (pgoutput_begin_txn, pgoutput_send_begin, pgoutput_commit_txn)
섹션 제목: “트랜잭션 프레임 (pgoutput_begin_txn, pgoutput_send_begin, pgoutput_commit_txn)”pgoutput_begin_txn은 BEGIN을 쓰지 않는다. sent_begin_txn = false로 설정된 트랜잭션 단위 PGOutputTxnData만 할당한다. 실제 BEGIN은 pgoutput_send_begin으로 지연된다. 데이터 콜백이 첫 번째 살아남은 변경에서 지연 호출하는 함수다.
// pgoutput_send_begin — src/backend/replication/pgoutput/pgoutput.cPGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;Assert(!txndata->sent_begin_txn);
OutputPluginPrepareWrite(ctx, !send_replication_origin);logicalrep_write_begin(ctx->out, txn);txndata->sent_begin_txn = true;send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin);OutputPluginWrite(ctx, true);pgoutput_commit_txn은 같은 플래그를 읽어 BEGIN이 전송되지 않았을 때 트랜잭션 전체를 제거한다. 빈 트랜잭션 최적화의 구체적 구현이다.
// pgoutput_commit_txn — src/backend/replication/pgoutput/pgoutput.csent_begin_txn = txndata->sent_begin_txn;OutputPluginUpdateProgress(ctx, !sent_begin_txn);pfree(txndata);txn->output_plugin_private = NULL;
if (!sent_begin_txn){ elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid); return; /* 'C' COMMIT 미작성 */}
OutputPluginPrepareWrite(ctx, true);logicalrep_write_commit(ctx->out, txn, commit_lsn);OutputPluginWrite(ctx, true);OutputPluginUpdateProgress(ctx, !sent_begin_txn) 호출에 주목하라. 트랜잭션이 제거되었을 때도 pgoutput은 진행을 표시해 소비자가 바이트를 보지 않고도 슬롯의 confirmed_flush가 관련 없는 트랜잭션 연속을 넘어 전진할 수 있게 한다. 2단계 경로(pgoutput_begin_prepare_txn, pgoutput_prepare_txn, pgoutput_commit_prepared_txn, pgoutput_rollback_prepared_txn)는 같은 구조를 따르되 'b'/'P'/'K'/'r' 프레임을 쓴다.
DML 핫 패스 (pgoutput_change)
섹션 제목: “DML 핫 패스 (pgoutput_change)”pgoutput_change는 스트리밍·비스트리밍 모드 모두에서 행 변경마다 한 번 호출되는 가장 바쁜 콜백이다. 골격은 다음과 같다. 발행 불가 릴레이션을 거부하고, 메모이즈된 RelationSyncEntry를 가져오고, 테이블·연산 필터를 적용하고, pubviaroot이면 파티션 루트로 리다이렉트하고, 행 필터를 적용한 뒤, 지연 BEGIN과 스키마를 보내고, 타입화된 데이터 프레임을 쓴다.
// pgoutput_change — src/backend/replication/pgoutput/pgoutput.c (abridged)if (!is_publishable_relation(relation)) return;if (data->in_streaming) xid = change->txn->xid; /* 스트리밍된 변경마다 xid 태그 */
relentry = get_rel_sync_entry(data, relation);
/* 테이블 필터 먼저 확인 */switch (action){ case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) return; break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) return; break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) return; if (!change->data.tp.oldtuple) return; /* RI 없음 → 전송 불가 */ break;}publish_as_relid로 리다이렉트하고 attrmap으로 튜플을 재매핑한 뒤, 행 필터를 평가하고 방출한다. BEGIN/스키마가 강제로 보내지는 곳이 바로 여기다.
// pgoutput_change — src/backend/replication/pgoutput/pgoutput.c (continued)if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action)) goto cleanup;
if (txndata && !txndata->sent_begin_txn) /* 지연 BEGIN */ pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry); /* 지연 스키마 */
OutputPluginPrepareWrite(ctx, true);switch (action){ case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, data->binary, relentry->columns, relentry->include_gencols_type); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, new_slot, data->binary, relentry->columns, relentry->include_gencols_type); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, data->binary, relentry->columns, relentry->include_gencols_type); break;}OutputPluginWrite(ctx, true);action은 pgoutput_row_filter에 참조 전달되는 매개변수다. UPDATE가 이 함수를 거치면 INSERT 또는 DELETE로 바뀔 수 있다(다음 절). 위 switch는 재작성된 action을 반영한다. 전체 본문은 data->context에서 실행되며, 변경마다 마지막에 MemoryContextReset되는 변경 단위 아레나(arena)다.
ExprState를 통한 WHERE 행 필터 (pgoutput_row_filter_init, pgoutput_row_filter)
섹션 제목: “ExprState를 통한 WHERE 행 필터 (pgoutput_row_filter_init, pgoutput_row_filter)”발행의 WHERE (...) 절은 pubaction별로 ExprState로 컴파일되어 항목에 캐시된다. 필터링 가능한 action은 정확히 세 가지이며 map_changetype_pubaction[]으로 인덱싱된다.
// pgoutput_row_filter — src/backend/replication/pgoutput/pgoutput.cstatic const int map_changetype_pubaction[] = { [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT, [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE, [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE};filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];if (!filter_exprstate) return true; /* 필터 없음 → 항상 복제 */UPDATE의 경우 이전 이미지와 새 이미지를 모두 확인해 구독자의 복사본을 조건자와 일관되게 유지하기 위해 변경을 변환할 수 있다(4가지 케이스).
// pgoutput_row_filter — src/backend/replication/pgoutput/pgoutput.c (UPDATE 변환)old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt /* old */);new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt /* new */);
if (!old_matched && !new_matched) /* 케이스 1 */ return false; /* 제거 */if (!old_matched && new_matched) /* 케이스 2: 행이 필터 진입 */{ *action = REORDER_BUFFER_CHANGE_INSERT; if (tmp_new_slot) *new_slot_ptr = tmp_new_slot;}else if (old_matched && !new_matched) /* 케이스 3: 행이 필터 이탈 */ *action = REORDER_BUFFER_CHANGE_DELETE;/* 케이스 4: 둘 다 매치 → UPDATE 유지 */return true;NULL 필터 결과는 false로 처리한다(pgoutput_row_filter_exec_expr). 평가기는 엔진 자체 표현식 기계다. create_estate_for_relation이 구성한 EState + ExprContext로, 쿼리가 사용하는 것과 동일하다. 행 필터가 복제 아이덴티티로 덮인 컬럼만 참조할 수 있고 그 외에는 CREATE PUBLICATION 시 거부되는 이유가 여기에 있다.
지연 스키마 디스패치 (maybe_send_schema, send_relation_and_attrs)
섹션 제목: “지연 스키마 디스패치 (maybe_send_schema, send_relation_and_attrs)”maybe_send_schema는 릴레이션 디스크립터를 최대 한 번 전송하는 게이트다. 비스트리밍 트랜잭션에서는 항목의 schema_sent bool을 사용하고, 스트리밍 트랜잭션에서는 최상위 xid별 집합(streamed_txns)을 사용한다. 스트리밍된 트랜잭션은 일반 트랜잭션과 순서 없이 나중에 적용될 수 있으므로, 자신의 스트림 안에서 스키마를 재확립해야 하기 때문이다.
// maybe_send_schema — src/backend/replication/pgoutput/pgoutput.cif (data->in_streaming) schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);else schema_sent = relentry->schema_sent;if (schema_sent) return;
/* 조상으로 발행하는 경우 조상 스키마를 먼저 전송 */if (relentry->publish_as_relid != RelationGetRelid(relation)){ Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); send_relation_and_attrs(ancestor, xid, ctx, relentry); RelationClose(ancestor);}send_relation_and_attrs(relation, xid, ctx, relentry);
if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid);else relentry->schema_sent = true;send_relation_and_attrs는 발행된 컬럼이 참조하는 사용자 정의 타입마다 'Y' TYPE 메시지를 방출한다. att->atttypid < FirstGenbkiObjectId 조건이 빌트인 타입(int4 등)을 제외한다. 구독자가 이미 알고 있는 타입이기 때문이다. 그런 다음 네임스페이스·이름·복제 아이덴티티 char·컬럼 목록을 반영한 속성별 정보를 담은 'R' RELATION 메시지 하나를 방출한다.
// send_relation_and_attrs — src/backend/replication/pgoutput/pgoutput.cfor (i = 0; i < desc->natts; i++){ Form_pg_attribute att = TupleDescAttr(desc, i); if (!logicalrep_should_publish_column(att, columns, include_gencols_type)) continue; if (att->atttypid < FirstGenbkiObjectId) /* 빌트인 타입 건너뜀 */ continue; OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false);}OutputPluginPrepareWrite(ctx, false);logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols_type);OutputPluginWrite(ctx, false);컬럼 목록과 컬럼별 와이어 종류 (pgoutput_column_list_init, logicalrep_write_tuple)
섹션 제목: “컬럼 목록과 컬럼별 와이어 종류 (pgoutput_column_list_init, logicalrep_write_tuple)”pgoutput_column_list_init은 구독한 모든 발행을 순회해 릴레이션의 컬럼 목록 비트맵을 확인한다. 구독이 결합하는 발행들에서 같은 테이블에 서로 다른 컬럼 목록이 있으면 ERROR다. bms_equal로 감지한다.
// pgoutput_column_list_init — src/backend/replication/pgoutput/pgoutput.cif (first){ entry->columns = cols; first = false;}else if (!bms_equal(entry->columns, cols)) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different column lists for table \"%s.%s\" in different publications", get_namespace_name(RelationGetNamespace(relation)), RelationGetRelationName(relation)));entry->columns == NULL이면 전체 컬럼이다. 실제 프로젝션은 logicalrep_write_tuple에서 이루어지며, 컬럼별 값 종류('n' null, 'u' unchanged-toast, 't' text, 'b' binary)도 이 함수가 결정한다.
// logicalrep_write_tuple — src/backend/replication/logical/proto.c (abridged)if (isnull[i]) { pq_sendbyte(out, LOGICALREP_COLUMN_NULL); continue; }if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])){ pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED); /* 'u' — TOAST 재전송 없음 */ continue;}if (binary && OidIsValid(typclass->typsend)){ pq_sendbyte(out, LOGICALREP_COLUMN_BINARY); /* 'b' */ /* ... typsend 출력 ... */}else{ pq_sendbyte(out, LOGICALREP_COLUMN_TEXT); /* 't' */ /* ... typoutput 텍스트 ... */}'u' 분기는 “변경되지 않은 out-of-line TOAST 값은 재로그하지 않는다”는 최적화의 와이어 표현이다. 구독자는 해당 컬럼에 이미 가지고 있는 값을 유지한다.
진행 중인 트랜잭션 스트리밍 (pgoutput_stream_start/stop/abort/commit)
섹션 제목: “진행 중인 트랜잭션 스트리밍 (pgoutput_stream_start/stop/abort/commit)”streaming이 활성화되면 논리 디코딩이 긴 진행 중 트랜잭션을 청크로 스필한다. pgoutput_stream_start는 data->in_streaming을 뒤집고 최상위 xid와 함께 'S' STREAM_START를 쓰며, 이것이 첫 번째 세그먼트인지 표시해 원점(origin)이 한 번만 전송되도록 한다.
// pgoutput_stream_start — src/backend/replication/pgoutput/pgoutput.cAssert(!data->in_streaming); /* 중첩 없음 */if (rbtxn_is_streamed(txn)) send_replication_origin = false;OutputPluginPrepareWrite(ctx, !send_replication_origin);logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, send_replication_origin);OutputPluginWrite(ctx, true);data->in_streaming = true; /* 공유 함수들이 xid를 태그함 */in_streaming이 바로 공유 pgoutput_change/pgoutput_message/pgoutput_truncate 함수들이 각 프레임에 xid를 붙일지 결정하는 비트다. pgoutput_stream_stop은 이를 클리어하고 'E'를 쓴다. abort/commit 쌍이 스풀된 트랜잭션을 안전하게 만든다.
// pgoutput_stream_abort — src/backend/replication/pgoutput/pgoutput.cbool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);Assert(!data->in_streaming);toptxn = rbtxn_get_toptxn(txn);Assert(rbtxn_is_streamed(toptxn));OutputPluginPrepareWrite(ctx, true);logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn, txn->xact_time.abort_time, write_abort_info);OutputPluginWrite(ctx, true);cleanup_rel_sync_cache(toptxn->xid, false); /* 스트리밍 스키마 상태 제거 */'A' STREAM_ABORT는 최상위 xid와 서브 xid를 모두 담아 구독자가 스풀된 트랜잭션의 어보트된 서브트리만 정확히 폐기할 수 있도록 한다. 'c' STREAM_COMMIT(pgoutput_stream_commit)은 적용을 지시한다. 두 경로 모두 cleanup_rel_sync_cache를 호출해 해당 최상위 xid의 streamed_txns 부기를 제거한다. 이후 트랜잭션은 자신의 스트림에서 스키마를 재전송한다.
flowchart TD
S0["pgoutput_stream_start<br/>'S' + topxid 전송<br/>in_streaming = true"] --> S1["pgoutput_change (xid 태그)<br/>'I'/'U'/'D' + xid 접두사"]
S1 --> S2["pgoutput_stream_stop<br/>'E' 전송<br/>in_streaming = false"]
S2 --> S3{"청크 더 있음?"}
S3 -->|yes| S0
S3 -->|txn 어보트| A["pgoutput_stream_abort<br/>'A' topxid+subxid 전송<br/>cleanup_rel_sync_cache"]
S3 -->|txn 커밋| C["pgoutput_stream_commit<br/>'c' 전송<br/>cleanup_rel_sync_cache"]
A --> END["구독자가<br/>스풀된 서브트리 폐기"]
C --> END2["구독자가<br/>스풀된 트랜잭션 원자적 적용"]
그림 2 — 스트리밍 상태 기계. 대형 트랜잭션은 반복되는 STREAM_START/STREAM_STOP 청크로 방출되며, 각 데이터 프레임은 xid 태그가 붙어 구독자가 xid별로 스풀한다. 트랜잭션은 마지막 STREAM_STOP 이후에 도착하는 단 하나의 STREAM_ABORT(폐기) 또는 STREAM_COMMIT(적용)으로 정확히 한 번 해소된다. abort/commit 중에는 data->in_streaming이 false다.
릴레이션 동기화 캐시와 무효화 (init_rel_sync_cache, get_rel_sync_entry)
섹션 제목: “릴레이션 동기화 캐시와 무효화 (init_rel_sync_cache, get_rel_sync_entry)”init_rel_sync_cache는 RelationSyncCache HTAB을 구성하고 두 가지 무효화 콜백을 등록한다. rel_sync_cache_relation_cb(릴캐시)는 항목 하나의 replicate_valid를 클리어하고, rel_sync_cache_publication_cb와 publication_invalidation_cb(syscache)는 전역 publications_valid = false를 설정해 모든 항목을 재확인하게 한다. get_rel_sync_entry는 3절에서 이미 보인 지연 (재)구성자다. 핵심 속성은 발행된 테이블의 DDL이 디코더가 재생하는 바로 그 WAL로 흐른다는 점이다. 이 무효화는 데이터 변경과 커밋 순서가 같다. 캐시가 논리적으로 DDL 이전에 일어난 변경에 오래된 스키마를 제공하는 일은 없다.
위치 힌트 (2026-06-05 기준, REL_18 273fe94)
섹션 제목: “위치 힌트 (2026-06-05 기준, REL_18 273fe94)”| 심볼 | 파일 | 라인 |
|---|---|---|
_PG_output_plugin_init | src/backend/replication/pgoutput/pgoutput.c | 261 |
parse_output_parameters | src/backend/replication/pgoutput/pgoutput.c | 290 |
pgoutput_startup | src/backend/replication/pgoutput/pgoutput.c | 449 |
pgoutput_begin_txn | src/backend/replication/pgoutput/pgoutput.c | 594 |
pgoutput_send_begin | src/backend/replication/pgoutput/pgoutput.c | 608 |
pgoutput_commit_txn | src/backend/replication/pgoutput/pgoutput.c | 630 |
pgoutput_prepare_txn | src/backend/replication/pgoutput/pgoutput.c | 679 |
maybe_send_schema | src/backend/replication/pgoutput/pgoutput.c | 725 |
send_relation_and_attrs | src/backend/replication/pgoutput/pgoutput.c | 796 |
pgoutput_row_filter_init | src/backend/replication/pgoutput/pgoutput.c | 916 |
check_and_init_gencol | src/backend/replication/pgoutput/pgoutput.c | 1063 |
pgoutput_column_list_init | src/backend/replication/pgoutput/pgoutput.c | 1122 |
pgoutput_row_filter | src/backend/replication/pgoutput/pgoutput.c | 1301 |
pgoutput_change | src/backend/replication/pgoutput/pgoutput.c | 1482 |
pgoutput_truncate | src/backend/replication/pgoutput/pgoutput.c | 1654 |
pgoutput_message | src/backend/replication/pgoutput/pgoutput.c | 1722 |
pgoutput_stream_start | src/backend/replication/pgoutput/pgoutput.c | 1838 |
pgoutput_stream_stop | src/backend/replication/pgoutput/pgoutput.c | 1870 |
pgoutput_stream_abort | src/backend/replication/pgoutput/pgoutput.c | 1891 |
pgoutput_stream_commit | src/backend/replication/pgoutput/pgoutput.c | 1924 |
init_rel_sync_cache | src/backend/replication/pgoutput/pgoutput.c | 1972 |
get_rel_sync_entry | src/backend/replication/pgoutput/pgoutput.c | 2052 |
RelationSyncEntry (struct) | src/backend/replication/pgoutput/pgoutput.c | 126 |
PGOutputTxnData (struct) | src/backend/replication/pgoutput/pgoutput.c | 214 |
logicalrep_write_begin | src/backend/replication/logical/proto.c | 49 |
logicalrep_write_commit | src/backend/replication/logical/proto.c | 78 |
logicalrep_write_insert | src/backend/replication/logical/proto.c | 403 |
logicalrep_write_update | src/backend/replication/logical/proto.c | 450 |
logicalrep_write_delete | src/backend/replication/logical/proto.c | 528 |
logicalrep_write_rel | src/backend/replication/logical/proto.c | 667 |
logicalrep_write_tuple | src/backend/replication/logical/proto.c | 767 |
logicalrep_write_stream_start | src/backend/replication/logical/proto.c | 1061 |
LOGICAL_REP_MSG_BEGIN (enum) | src/include/replication/logicalproto.h | 59 |
LOGICALREP_COLUMN_NULL | src/include/replication/logicalproto.h | 96 |
LOGICALREP_PROTO_*_VERSION_NUM | src/include/replication/logicalproto.h | 40-45 |
소스 검증 (2026-06-05 기준)
섹션 제목: “소스 검증 (2026-06-05 기준)”검증된 사실
섹션 제목: “검증된 사실”-
_PG_output_plugin_init이 전체OutputPluginCallbacks테이블을 채우고, 스트리밍 change/message/truncate 슬롯은 비스트리밍 함수를 재사용한다. 직접 확인:cb->stream_change_cb = pgoutput_change,cb->stream_message_cb = pgoutput_message,cb->stream_truncate_cb = pgoutput_truncate. 스트리밍/비스트리밍 동작의 차이는 별도 함수가 아니라data->in_streaming플래그다. -
pgoutput은 바이너리 프레이밍을 무조건 사용하며, 컬럼별 바이너리는 별도의 선택적 모드다.
pgoutput_startup이opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT을 설정한다.binary옵션(data->binary)은logicalrep_write_tuple안에서 개별 datum이typsend('b')를 사용할지typoutput텍스트('t')를 사용할지만 결정한다. -
기능 최솟값: 스트리밍은 proto v2, 2단계 커밋은 v3, 병렬 스트리밍은 v4 필요;
LOGICALREP_PROTO_MAX_VERSION_NUM은 4다.logicalproto.h의#define과pgoutput_startup의 게이팅if래더로 확인했다. 협상 버전보다 높은 기능을 요청하면ERROR다. -
BEGIN은 첫 번째 살아남은 변경까지 지연되고, 빈 트랜잭션에서는COMMIT이 제거된다.pgoutput_begin_txn(상태만 할당),pgoutput_send_begin(!txndata->sent_begin_txn조건 하에pgoutput_change/pgoutput_truncate/pgoutput_message에서 호출),pgoutput_commit_txn(!sent_begin_txn일 때DEBUG1“skipped …empty transaction” 로그와 함께 조기return)으로 확인했다. -
테이블·연산 필터가 행 필터보다 먼저 실행되고, 행 필터가
BEGIN/스키마보다 먼저 실행된다.pgoutput_change의 문장 순서로 확인:pubactions스위치(조기return) →pgoutput_row_filter(goto cleanup) →pgoutput_send_begin→maybe_send_schema→ 데이터 쓰기. -
UPDATE는 행 필터에 의해INSERT또는DELETE로 재작성될 수 있다.pgoutput_row_filter의 4-케이스 로직(*action을 참조로 작성)과 함수 위 주석의 케이스 1-4 테이블로 확인했다.pgoutput_change의 최종switch는 재작성된action에 따라 디스패치한다. -
스키마는 세션당 릴레이션 단위로 최대 한 번 전송된다. 일반 트랜잭션은
schema_sent로, 스트리밍 트랜잭션은 최상위 xid별streamed_txns로 추적한다.maybe_send_schema(get_schema_sent_in_streamed_txn/relentry->schema_sent)로 확인했다. pubviaroot로 발행할 때 조상 스키마가 릴레이션 자체 스키마보다 먼저 전송된다. -
사용자 정의 타입만
'Y'TYPE 메시지를 받는다.send_relation_and_attrs의att->atttypid < FirstGenbkiObjectId테스트로 확인했다. 빌트인 타입의 OID는 메이저 버전 간에 안정적이어서 구독자가 이미 알고 있다. -
변경되지 않은 TOAST
'u'컬럼 종류는 out-of-line 값의 재전송을 피한다.logicalrep_write_tuple의att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])→LOGICALREP_COLUMN_UNCHANGED분기로 확인했다. 행 필터는 조건자 평가에 해당 컬럼이 필요하면 이전 튜플에서 복사한다. -
스트리밍된 변경은 xid 태그가 붙고,
STREAM_ABORT는 최상위 xid와 서브 xid를 모두 담는다. abort/commit은 스트리밍 블록 바깥에서 발생한다.pgoutput_stream_start/_stop에서data->in_streaming세트/클리어,pgoutput_stream_abort/_commit의Assert(!data->in_streaming),logicalrep_write_stream_abort(..., toptxn->xid, txn->xid, ...), 두 해소 경로의cleanup_rel_sync_cache호출로 확인했다. -
결합된 발행에서 같은 테이블에 서로 다른 컬럼 목록을 사용하면 거부된다.
pgoutput_column_list_init의bms_equal불일치가ERRCODE_FEATURE_NOT_SUPPORTED를 발생시킨다. -
복제 아이덴티티 없는
DELETE(이전 튜플 없음)는 조용히 제거된다.pgoutput_change의REORDER_BUFFER_CHANGE_DELETE분기:if (!change->data.tp.oldtuple) return;이DEBUG1로그와 함께 조기 반환한다.
미해결 질문
섹션 제목: “미해결 질문”-
스트리밍된 트랜잭션마다 시작 시 스키마를 항상 재전송하는 것이 낭비인지 여부.
maybe_send_schema주석이 이를 명시적으로 지적한다(“XXX There is a scope of optimization here … we always send the schema first time in a streaming transaction”). 혼합 스트리밍/비스트리밍 워크로드에 대한 우려와 연결되어 있다. 조사 경로: 스키마가 안정된 스트림 집약적 워크로드에서'R'/'Y'바이트 오버헤드를 측정한다. -
행 필터가 있는 churn이 심한 발행에서 릴레이션별
EState/ExprState구성 비용.pgoutput_row_filter_init+create_estate_for_relation은 캐시 (재)구성 시 실행된다. DDL은 적지만 무효화가 잦은 워크로드에서 발행/릴캐시 무효화가 재구성을 얼마나 자주 강제하는지는 여기서 분석하지 않았다. 조사 경로:get_rel_sync_entry재구성 빈도를 무효화 원인별로 계측한다. -
publish_as_relid리다이렉션과 깊은 파티션 계층의 상호작용. pubviaroot로 발행된 리프 파티션의 각 변경은attrmap재매핑과RelationIdGetRelation조상 조회를 치른다. 다단계 계층에서의 변경 단위 비용은 측정되지 않았다. 조사 경로: 플랫 테이블 대 N단계 파티셔닝된 루트에서pgoutput_change비용을 비교한다.
PostgreSQL 너머 — 비교 설계와 연구 전선
섹션 제목: “PostgreSQL 너머 — 비교 설계와 연구 전선”-
동일 디코더 위의 Debezium / Kafka Connect. Debezium은
pgoutput또는wal2json/decoderbufs로 PostgreSQL 논리 디코딩을 소비한 뒤 JSON/Avro로 재인코딩해 Kafka 토픽에 스키마 레지스트리와 함께 발행한다. 대조가 시사적이다. pgoutput은 필터링(테이블/연산/컬럼/행)을 생산자에 밀어넣어 점대점 링크의 대역폭을 절감한다. Kafka 중심 CDC 파이프라인은 보통 모든 것을 전송하고 스트림 처리 계층에서 필터링한다. 각 시스템이 필터를 어디에 배치하는지를 나란히 비교하면 대역폭 대 유연성 트레이드오프가 선명해진다(Kleppmann, DDIA 11장 “Change Data Capture”). -
MySQL binlog 행 형식과 pgoutput RELATION/TYPE 방식. MySQL의 ROW-format binlog는 행 이벤트 전에
TABLE_MAP_EVENT를 전송해 숫자 테이블 id를 스키마에 바인딩한다. pgoutput의 릴레이션 OID로 키된'R'RELATION 프레임과 구조적으로 동일한 지연·메모이즈된 인밴드 스키마 디스패치다. 각 시스템이 스트림 중간의 DDL 변경을 어떻게 처리하는지(binlog의 테이블맵 재방출 대 pgoutput의 무효화 주도schema_sent클리어) 비교하면 집중된 동반 연구가 된다. -
Oracle GoldenGate / LogMiner와 논리적 로그 마이닝. GoldenGate는 리두를 자체 필터링·변환 DSL을 갖춘 벤더 트레일 형식으로 마이닝한다. PostgreSQL은 의도적으로 플러그인을 얇게 유지하고 변환을 구독자 측에 위임한다. 출력 플러그인 대 apply 측 중 얼마나 많은 로직이 어디에 있어야 하는가의 아키텍처 질문이 바로
pgoutput대postgres-logical-replication-apply.md경계다. -
전진 호환 메커니즘으로서의 프로토콜 버전 관리. pgoutput의
proto_version기능별 최솟값 방식(스트리밍@2, 2단계 커밋@3, 병렬@4)은 단순하고 단조로운 기능 협상이다. 메시지 스키마가 호환성을 담당하는 Avro/Confluent 스키마 레지스트리 기반 진화와 대조된다. 단일 협상 버전 정수와 메시지별 자기 기술 스키마의 트레이드오프를 설명하면, 버전이 쌍으로 묶인 인코어 발행자/구독자가 더 단순한 방식으로 가능한 이유를 드러낼 것이다. -
밀어넣기 조건자로서의 행 필터. pgoutput은 발행의
WHERE절을 엔진 자체ExprState로 컴파일해 변경마다 평가한다. FDW가WHERE를 원격에 밀어넣는 것과 유사한 생산자 측 조건자 푸시다운이다(postgres-fdw.md). 연구 관점의 질문은, apply 측 작업(예: 충돌 감지)을 더 많이 플러그인으로 밀어넣어 구독자 상태와의 결합 없이 처리할 수 있는지다.
트리 내 소스 파일 (REL_18_STABLE, commit 273fe94)
섹션 제목: “트리 내 소스 파일 (REL_18_STABLE, commit 273fe94)”src/backend/replication/pgoutput/pgoutput.c— 플러그인 본체:_PG_output_plugin_init,pgoutput_startup/_shutdown,pgoutput_begin_txn/_send_begin/_commit_txn, 2단계 콜백,pgoutput_change,pgoutput_truncate,pgoutput_message,pgoutput_row_filter(_init),pgoutput_column_list_init,maybe_send_schema,send_relation_and_attrs, 스트리밍 콜백,RelationSyncCache/RelationSyncEntry와get_rel_sync_entry.src/backend/replication/logical/proto.c— 와이어 작성기/읽기:logicalrep_write_begin/_commit,logicalrep_write_insert/_update/_delete,logicalrep_write_rel/_typ,logicalrep_write_tuple, 스트림 프레이밍 작성기.src/include/replication/logicalproto.h—LogicalRepMsgType태그 열거형,LOGICALREP_COLUMN_*값 종류,LOGICALREP_PROTO_*_VERSION_NUM기능 최솟값.src/include/replication/pgoutput.h—PGOutputData, 옵션 필드,PublishGencolsType.src/include/replication/output_plugin.h— pgoutput이 구현하는 콜백 경계OutputPluginCallbacks.src/include/replication/reorderbuffer.h— 각 콜백이 받는ReorderBufferTXN/ReorderBufferChange.
동반 문서 (이 트리)
섹션 제목: “동반 문서 (이 트리)”postgres-logical-decoding.md— pgoutput이 구현하는 콜백을 생산하는 리오더 버퍼, 히스토릭 스냅샷, 변경 재조합.postgres-logical-replication-apply.md— pgoutput이 쓰는 메시지를 소비하는 구독자 apply 워커.postgres-replication-slots.md— 디코딩 세션의restart_lsn/confirmed_flush를 고정하는 슬롯.postgres-wal-sender-receiver.md— pgoutput의 출력을 복제 프로토콜로 전송하는 walsender; 물리 복제와의 대조.postgres-toast.md—'u'컬럼 종류로 나타나는 “변경 없음” 최적화의 원천인 out-of-line 저장소.postgres-partitioning.md—publish_as_relid와attrmap재매핑의 원천인 파티션 루트와 pubviaroot.
논문 및 교재 챕터
섹션 제목: “논문 및 교재 챕터”- Kleppmann, M. (2017). Designing Data-Intensive Applications, 11장 “스트림 처리”, §“Change Data Capture” — 복제 로그를 이벤트 스트림 원천으로 규정; 인코딩은 출력 플러그인의 선택으로 남김.
raw/system/textbooks/하에 캡처됨. - Silberschatz, Korth & Sudarshan (2020). Database System Concepts, 7e — 복제와 복구 기초;
knowledge/research/dbms-general/. - Petrov, A. (2019). Database Internals, Part II — 변경 스트림이 생산되는 방식의 배경인 WAL과 로그 전송;
dbms-papers/dbms-general캡처.