
PostgreSQL pgoutput — The Built-in Logical Replication Output Plugin

Logical replication answers a question physical replication cannot: how does a consumer that does not share the producer’s on-disk page layout stay current with the producer’s committed changes? Physical (streaming) replication ships raw WAL bytes — same block layout, same byte offsets — so the standby must be byte-for-byte compatible (postgres-wal-sender-receiver.md). Logical replication ships row-level change events described in the vocabulary of the schema — “an INSERT of (1,'a') into table t” — so the consumer can be a different major version, a different schema, or not even PostgreSQL at all.

That row-level event stream is produced by logical decoding (postgres-logical-decoding.md). A reorder buffer reassembles the physically interleaved WAL into transaction-atomic, commit-ordered change streams and decodes each redo record against a historic (time-consistent) catalog snapshot. It then invokes a set of output-plugin callbacks (begin, change, commit, and their streaming and two-phase siblings) once per logical event. The output plugin is the last mile. Its sole job is serialization: turning the in-memory ReorderBufferChange / ReorderBufferTXN objects handed to each callback into a byte stream the consumer can parse. Logical decoding decides what and when; the output plugin decides how it looks on the wire.

Kleppmann’s Designing Data-Intensive Applications (ch. 11, “Stream Processing”, §“Change Data Capture”) frames the whole pipeline as turning a database into “a source of an event stream” by parsing the replication log. The chapter is explicit that a usable change stream is delivered “in the same order as they were written” and that the consumer is “a log consumer” kept asynchronous so it does not slow the source. But Kleppmann deliberately stops short of specifying the encoding of the events — that is an implementation choice, and it is exactly the choice an output plugin makes. Two systems can run the identical decoder and differ entirely in their plugin: PostgreSQL ships test_decoding (a human-readable debug format) and pgoutput (the compact binary format the built-in subscriber speaks) over the same LogicalDecodingContext.

The design space an output-plugin author works within:

  1. Encoding: text vs. binary. A text format (one line per change, SQL-ish) is debuggable but verbose and lossy for binary types. A binary format is compact and type-faithful but opaque. pgoutput always declares binary output (OUTPUT_PLUGIN_BINARY_OUTPUT) for its message framing. Column values inside the frames are sent as text unless the subscriber enables the optional binary mode, which sends each datum in its type’s binary send/recv format.

  2. Schema delivery: eager vs. lazy, in-band vs. out-of-band. The consumer needs the table’s column names and types to interpret a row. The plugin can ship the full catalog up front, or send each relation’s descriptor in-band the first time that relation appears, and not again. pgoutput does the latter via RELATION/TYPE messages gated by a schema_sent flag.

  3. Filtering: where and what. A subscriber rarely wants every change to every table. The plugin can filter by table (publication membership), by operation (pubinsert/pubupdate/pubdelete/pubtruncate), by column (column lists: send only a subset of columns), and by row (WHERE row filters: send only matching rows). Pushing filtering into the plugin saves bandwidth and avoids decoding work the consumer would discard.

  4. In-progress (streamed) transactions. A large transaction need not be buffered in full before any byte leaves the producer. The plugin can stream sub-batches of an uncommitted transaction, framed by STREAM_START/STREAM_STOP, tagging each change with its xid so the consumer can spool — and discard on abort — without ever seeing a COMMIT. This caps memory on both ends at the cost of consumer-side spooling.

  5. Two-phase commit. For distributed atomicity the plugin can surface PREPARE/COMMIT PREPARED/ROLLBACK PREPARED as first-class messages, so the subscriber prepares locally and the global coordinator decides.

pgoutput makes one concrete choice in each axis, versioned by a proto_version number negotiated at startup so that an old subscriber and a new publisher agree on which message shapes are legal.

Log-based change-data-capture systems — Debezium, MySQL’s binlog replication, Oracle GoldenGate, SQL Server replication — converge on a recognizable set of conventions. Naming them makes pgoutput read as one set of choices within a shared playbook rather than as PostgreSQL trivia.

The decoder and the serializer are almost always separated by a callback interface: the decoder reassembles transactions and calls on_begin, on_row_change, on_commit. This keeps the expensive, correctness-critical decoding logic in one place and lets the format be swapped (JSON for Debezium, Avro, protobuf, a native binary frame) without touching the decoder. PostgreSQL’s OutputPluginCallbacks struct is exactly this boundary.

The wire stream is a sequence of length-or-tag-delimited messages, each led by a one-byte (or one-field) type tag: B for begin, I for insert, and so on. The consumer is a simple dispatch loop over the tag. Self-description matters because the same channel carries control frames (begin/commit), schema frames (relation/type), and data frames (insert/update/delete), intermixed.
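The tag-dispatch loop can be sketched in C. To stay self-contained, the sketch assumes a hypothetical framing: a one-byte tag, a 4-byte big-endian payload length, then the payload. pgoutput itself has no such length field, because each of its messages already arrives as the payload of a separate replication-protocol data message. MsgCounts and dispatch_all are illustrative names, not PostgreSQL symbols.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical tally of what a consumer saw; not a PostgreSQL type. */
typedef struct MsgCounts {
    int control;   /* 'B' begin, 'C' commit */
    int schema;    /* 'R' relation, 'Y' type */
    int data;      /* 'I' insert, 'U' update, 'D' delete */
    int unknown;
} MsgCounts;

static uint32_t read_be32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/*
 * Walk a buffer of [tag:1][len:4 BE][payload:len] frames (hypothetical
 * framing) and dispatch on the tag alone. Returns 0 on success, -1 if
 * a frame is truncated.
 */
static int dispatch_all(const unsigned char *buf, size_t n, MsgCounts *out)
{
    size_t off = 0;
    while (off < n) {
        if (n - off < 5)
            return -1;                       /* header cut short */
        unsigned char tag = buf[off];
        uint32_t len = read_be32(buf + off + 1);
        if (n - off - 5 < len)
            return -1;                       /* payload cut short */
        switch (tag) {
        case 'B': case 'C':           out->control++; break;
        case 'R': case 'Y':           out->schema++;  break;
        case 'I': case 'U': case 'D': out->data++;    break;
        default:                      out->unknown++; break;
        }
        off += 5 + (size_t) len;             /* advance to the next frame */
    }
    return 0;
}
```

Because the tag alone selects the handler, control, schema and data frames can share one channel in any interleaving.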

Sending the full catalog up front is wasteful when a session touches a handful of tables. The universal optimization is to send a relation’s descriptor the first time a change for it appears and remember that it was sent — keyed by relation OID in a per-session cache — so subsequent changes reference the relation by a compact id. Schema invalidation (DDL on the table) clears the “sent” bit so the next change re-sends.
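Here is a minimal sketch of the send-once memo. It assumes a small fixed-size open-addressing table keyed by a 32-bit relation OID; pgoutput itself uses a dynahash HTAB, RelationSyncCache. SchemaEntry, lookup, maybe_send_descriptor and invalidate are hypothetical names. OID 0 (InvalidOid in PostgreSQL) marks an empty slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define CACHE_SLOTS 64              /* hypothetical fixed capacity */

typedef struct SchemaEntry {
    uint32_t relid;                 /* key; 0 marks an empty slot */
    bool     schema_sent;           /* descriptor already shipped? */
} SchemaEntry;

static SchemaEntry cache[CACHE_SLOTS];
static int descriptors_sent;        /* counts simulated RELATION frames */

/* Find or create the entry for relid (linear probing, no deletion). */
static SchemaEntry *lookup(uint32_t relid)
{
    for (uint32_t i = 0; i < CACHE_SLOTS; i++) {
        SchemaEntry *e = &cache[(relid + i) % CACHE_SLOTS];
        if (e->relid == relid)
            return e;
        if (e->relid == 0) {        /* first use of this relation */
            e->relid = relid;
            e->schema_sent = false;
            return e;
        }
    }
    return NULL;                    /* table full */
}

/* Called before every data frame that references relid. */
static void maybe_send_descriptor(uint32_t relid)
{
    SchemaEntry *e = lookup(relid);
    assert(e != NULL);
    if (e->schema_sent)
        return;                     /* consumer already has it */
    descriptors_sent++;             /* stand-in for writing the frame */
    e->schema_sent = true;
}

/* DDL on relid: clear the memo so the next change re-sends it. */
static void invalidate(uint32_t relid)
{
    SchemaEntry *e = lookup(relid);
    if (e != NULL)
        e->schema_sent = false;
}
```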

Bandwidth between producer and consumer is the scarce resource, so the producer drops uninteresting changes before serialization: by table, by operation, by column projection, and by row predicate. The predicate evaluator typically reuses the engine’s own expression machinery (so a row filter is the same WHERE evaluator a query would use), which is why pgoutput builds an ExprState and an EState per relation.

A transaction that touches only tables the consumer doesn’t subscribe to produces no data frames. Emitting a bare BEGIN/COMMIT pair for it still costs bytes on the wire and work on the consumer, and on a busy system most transactions may be irrelevant to a given subscriber. The common trick is to defer the BEGIN until the first change that will actually be sent, and to skip the COMMIT if no BEGIN was sent.
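The deferred-BEGIN trick fits in a few lines. This sketch models the output as a string of frame tags. Out, TxnState, out_init and the on_* callbacks are hypothetical stand-ins that only loosely mirror an output plugin's begin/change/commit callbacks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical output sink: one char per emitted frame tag. */
typedef struct Out {
    char   tags[64];
    size_t len;
} Out;

typedef struct TxnState {
    bool sent_begin;            /* has a BEGIN gone out for this txn? */
} TxnState;

static void out_init(Out *o) { memset(o, 0, sizeof *o); }

static void emit(Out *o, char tag)
{
    if (o->len + 1 < sizeof o->tags) {
        o->tags[o->len++] = tag;
        o->tags[o->len] = '\0';
    }
}

static void on_begin(TxnState *t) { t->sent_begin = false; }  /* writes nothing */

static void on_change(Out *o, TxnState *t, bool survives_filters)
{
    if (!survives_filters)
        return;                 /* filtered change: no bytes at all */
    if (!t->sent_begin) {
        emit(o, 'B');           /* lazy BEGIN on the first surviving change */
        t->sent_begin = true;
    }
    emit(o, 'I');
}

static void on_commit(Out *o, TxnState *t)
{
    if (t->sent_begin)
        emit(o, 'C');           /* COMMIT only pairs with a BEGIN we sent */
}
```

An irrelevant transaction leaves the output empty. A relevant one produces an ordinary BEGIN ... COMMIT bracket.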

To bound memory, large transactions are emitted incrementally before commit, with each fragment tagged by transaction id and bracketed by start/stop markers, and an explicit abort signal so the consumer can discard a spooled fragment that never commits. Every CDC system that supports “large transaction” handling reinvents this framing.

| Concept | PostgreSQL name |
| --- | --- |
| Output-plugin callback table | OutputPluginCallbacks (filled by _PG_output_plugin_init) |
| Plugin entry point | _PG_output_plugin_init in pgoutput.c |
| Per-session plugin state | PGOutputData (ctx->output_plugin_private) |
| Per-transaction plugin state | PGOutputTxnData (txn->output_plugin_private) |
| Begin / commit control frames | 'B' LOGICAL_REP_MSG_BEGIN, 'C' LOGICAL_REP_MSG_COMMIT |
| Schema frames | 'R' LOGICAL_REP_MSG_RELATION, 'Y' LOGICAL_REP_MSG_TYPE |
| Row-change frames | 'I'/'U'/'D' INSERT/UPDATE/DELETE, 'T' TRUNCATE |
| Logical-message frame | 'M' LOGICAL_REP_MSG_MESSAGE |
| Streaming frames | 'S'/'E'/'A'/'c' STREAM_START/STOP/ABORT/COMMIT |
| Two-phase frames | 'b'/'P'/'K'/'r'/'p' BEGIN_PREPARE/PREPARE/COMMIT_PREPARED/ROLLBACK_PREPARED/STREAM_PREPARE |
| Per-relation schema cache | RelationSyncCache (HTAB of RelationSyncEntry) |
| “Schema already sent” memo | RelationSyncEntry.schema_sent (+ streamed_txns) |
| Operation filter | RelationSyncEntry.pubactions (PublicationActions) |
| Column list | RelationSyncEntry.columns (Bitmapset) |
| Row filter | RelationSyncEntry.exprstate[NUM_ROWFILTER_PUBACTIONS] |
| Partition-root target | RelationSyncEntry.publish_as_relid |
| Protocol version negotiation | proto_version option, LOGICALREP_PROTO_*_VERSION_NUM |

pgoutput is built as a loadable module (it lives in its own directory and declares PG_MODULE_MAGIC_EXT), but it is shipped in-core and selected by naming pgoutput as the slot’s plugin. Logical decoding finds the symbol _PG_output_plugin_init and calls it once to populate the callback table. Every entry is a static function in pgoutput.c:

```c
// _PG_output_plugin_init — src/backend/replication/pgoutput/pgoutput.c
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    cb->startup_cb = pgoutput_startup;
    cb->begin_cb = pgoutput_begin_txn;
    cb->change_cb = pgoutput_change;
    cb->truncate_cb = pgoutput_truncate;
    cb->message_cb = pgoutput_message;
    cb->commit_cb = pgoutput_commit_txn;
    cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
    cb->prepare_cb = pgoutput_prepare_txn;
    cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
    cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
    cb->filter_by_origin_cb = pgoutput_origin_filter;
    cb->shutdown_cb = pgoutput_shutdown;
    /* transaction streaming */
    cb->stream_start_cb = pgoutput_stream_start;
    cb->stream_stop_cb = pgoutput_stream_stop;
    cb->stream_abort_cb = pgoutput_stream_abort;
    cb->stream_commit_cb = pgoutput_stream_commit;
    cb->stream_change_cb = pgoutput_change;      /* reused */
    cb->stream_message_cb = pgoutput_message;    /* reused */
    cb->stream_truncate_cb = pgoutput_truncate;  /* reused */
    /* transaction streaming - two-phase commit */
    cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
}
```

Note the reuse: stream_change_cb, stream_message_cb and stream_truncate_cb point at the same functions as the non-streaming variants. The difference between “streamed” and “non-streamed” handling is not a different function — it is the PGOutputData.in_streaming flag, set by pgoutput_stream_start, which the shared functions consult to decide whether to tag each message with an xid.
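The following sketch shows how one shared writer can serve both modes. It follows the INSERT layout in PostgreSQL's logical replication message-format documentation: tag 'I', an Int32 xid present only for streamed transactions, the Int32 relation OID, then the byte 'N' introducing the new tuple. It stops before the tuple data. Buf, put8, put32 and write_insert_header are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct Buf {            /* hypothetical buffer, sized for this sketch */
    unsigned char data[64];
    size_t len;
} Buf;

/* No bounds checking: the buffer is large enough for one header. */
static void put8(Buf *b, uint8_t v) { b->data[b->len++] = v; }

static void put32(Buf *b, uint32_t v)   /* network byte order */
{
    put8(b, (uint8_t) (v >> 24));
    put8(b, (uint8_t) (v >> 16));
    put8(b, (uint8_t) (v >> 8));
    put8(b, (uint8_t) v);
}

/*
 * One function serves both modes; in_streaming alone decides whether
 * the frame carries the xid. Tuple data after 'N' is not written here.
 */
static void write_insert_header(Buf *b, bool in_streaming,
                                uint32_t xid, uint32_t relid)
{
    put8(b, 'I');
    if (in_streaming)
        put32(b, xid);          /* lets the consumer spool per transaction */
    put32(b, relid);
    put8(b, 'N');               /* a new tuple follows */
}
```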

Startup: parse options, negotiate the protocol version

The first callback for a session is pgoutput_startup. It allocates the per-session PGOutputData (anchored at ctx->output_plugin_private), declares the output binary, and — on a real replication start, not slot creation — parses the options the subscriber passed and validates them against the protocol version:

```c
// pgoutput_startup — src/backend/replication/pgoutput/pgoutput.c
PGOutputData *data = palloc0(sizeof(PGOutputData));
// ... create data->context / cachectx / pubctx memory contexts ...
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;    /* pgoutput is binary */
if (!is_init)
{
    parse_output_parameters(ctx->output_plugin_options, data);
    if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
        /* ... ERROR */ ;
    if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
        /* ... ERROR */ ;
    if (data->streaming == LOGICALREP_STREAM_OFF)
        ctx->streaming = false;
    else if (data->streaming == LOGICALREP_STREAM_ON &&
             data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
        /* ... ERROR: streaming needs proto v2 ... */ ;
    else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
             data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
        /* ... ERROR: parallel streaming needs proto v4 ... */ ;
    // ... two_phase gated on LOGICALREP_PROTO_TWOPHASE_VERSION_NUM (v3) ...
}
```

The protocol version is the compatibility hinge. Each capability has a floor: streaming arrived in protocol 2, two-phase in 3, parallel streaming in 4. parse_output_parameters recognizes exactly proto_version, publication_names, binary, messages, streaming, two_phase, and origin, erroring on anything else — the subscriber and publisher must agree on the option vocabulary.
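The version-floor check can be sketched as a pure function. The floor values are the ones listed in this section. StreamMode, OptResult and validate_options are hypothetical names, and returning an error code stands in for pgoutput's ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>

/* Floors as listed for logicalproto.h (values copied from the text). */
enum {
    PROTO_MIN             = 1,
    PROTO_STREAM          = 2,
    PROTO_TWOPHASE        = 3,
    PROTO_STREAM_PARALLEL = 4,
    PROTO_MAX             = PROTO_STREAM_PARALLEL
};

typedef enum { STREAM_OFF, STREAM_ON, STREAM_PARALLEL } StreamMode;

/* Hypothetical result codes standing in for ereport(ERROR, ...). */
typedef enum {
    OPT_OK,
    OPT_BAD_VERSION,
    OPT_STREAM_NEEDS_V2,
    OPT_PARALLEL_NEEDS_V4,
    OPT_TWOPHASE_NEEDS_V3
} OptResult;

static OptResult validate_options(int proto_version, StreamMode streaming,
                                  bool two_phase)
{
    if (proto_version < PROTO_MIN || proto_version > PROTO_MAX)
        return OPT_BAD_VERSION;
    if (streaming == STREAM_ON && proto_version < PROTO_STREAM)
        return OPT_STREAM_NEEDS_V2;
    if (streaming == STREAM_PARALLEL && proto_version < PROTO_STREAM_PARALLEL)
        return OPT_PARALLEL_NEEDS_V4;
    if (two_phase && proto_version < PROTO_TWOPHASE)
        return OPT_TWOPHASE_NEEDS_V3;
    return OPT_OK;              /* no silent downgrade: mismatches are errors */
}
```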

Per-relation state: RelationSyncCache and RelationSyncEntry

The hot path is pgoutput_change, once per row change. It must answer, for the change’s relation: is this table published? for this operation? which columns? does the row pass the filter? does the consumer already have the schema? Recomputing that from the catalog per change would be ruinous, so pgoutput memoizes it in a hash table keyed by relation OID, RelationSyncCache, whose values are RelationSyncEntry:

```c
// RelationSyncEntry (struct, abridged) — src/backend/replication/pgoutput/pgoutput.c
typedef struct RelationSyncEntry
{
    Oid relid;                   /* hash key: relation oid */
    bool replicate_valid;        /* false ⇒ rebuild the entry */
    bool schema_sent;            /* did we already send RELATION/TYPE? */
    PublishGencolsType include_gencols_type;
    List *streamed_txns;         /* top-xids we sent the schema in */
    PublicationActions pubactions;  /* insert/update/delete/truncate flags */
    ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; /* one row filter per action */
    EState *estate;
    TupleTableSlot *new_slot;
    TupleTableSlot *old_slot;
    Oid publish_as_relid;        /* partition-root target, if pubviaroot */
    AttrMap *attrmap;            /* partition→ancestor column remap */
    Bitmapset *columns;          /* column list, or NULL = all columns */
    MemoryContext entry_cxt;
} RelationSyncEntry;
```

get_rel_sync_entry is the lazy builder. On a cache miss (or when replicate_valid was cleared by an invalidation), it loads the subscriber’s publications, walks them to compute pubactions, resolves publish_as_relid (the partition root when pubviaroot is set), and initializes the row filter, column list, and tuple slots:

```c
// get_rel_sync_entry — src/backend/replication/pgoutput/pgoutput.c (abridged)
entry = hash_search(RelationSyncCache, &relid, HASH_ENTER, &found);
if (!found) { /* zero-initialize the new entry */ }
if (!entry->replicate_valid)
{
    List *pubids = GetRelationPublications(relid);
    List *schemaPubids = GetSchemaPublications(get_rel_namespace(relid));
    // ... reset schema_sent, pubactions, columns, slots, exprstate ...
    foreach(lc, data->publications)
    {
        Publication *pub = lfirst(lc);
        // ... decide `publish` (FOR ALL TABLES / table / schema / ancestor) ...
        if (publish &&
            (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
        {
            entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
            entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
            entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
            entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
            // ... track the top-most ancestor as publish_as_relid ...
        }
    }
    entry->publish_as_relid = publish_as_relid;
    if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
        entry->pubactions.pubdelete)
    {
        init_tuple_slot(data, relation, entry);
        pgoutput_row_filter_init(data, rel_publications, entry);
        check_and_init_gencol(data, rel_publications, entry);
        pgoutput_column_list_init(data, rel_publications, entry);
    }
    entry->replicate_valid = true;
}
return entry;
```
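The pubactions merge is a plain OR over every publication that covers the table. This simplified sketch leaves out how coverage is decided (FOR ALL TABLES, schema membership, partition ancestors, pubviaroot) and assumes a precomputed includes_table flag. Actions, Pub and union_actions are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct Actions {
    bool ins, upd, del, trunc;
} Actions;

/* Hypothetical publication: its action flags and whether it covers the table. */
typedef struct Pub {
    Actions actions;
    bool    includes_table;
} Pub;

/* OR together the actions of every publication that publishes the table. */
static Actions union_actions(const Pub *pubs, size_t npubs)
{
    Actions a = { false, false, false, false };
    for (size_t i = 0; i < npubs; i++) {
        if (!pubs[i].includes_table)
            continue;               /* this publication does not cover it */
        a.ins   |= pubs[i].actions.ins;
        a.upd   |= pubs[i].actions.upd;
        a.del   |= pubs[i].actions.del;
        a.trunc |= pubs[i].actions.trunc;
    }
    return a;
}
```

Because the merge is an OR, adding another covering publication can only widen the set of operations published for a table, never narrow it.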

Invalidation keeps the cache honest: rel_sync_cache_relation_cb (a relcache callback) and rel_sync_cache_publication_cb (a syscache callback on pg_publication) clear replicate_valid / publications_valid so the next get_rel_sync_entry rebuilds against the current catalog. Because DDL on a published table flows through the same WAL the decoder is reading, these invalidations fire in commit order relative to the data changes.

Every callback that emits output ends by handing a typed message to a logicalrep_write_* function (in proto.c), which appends the one-byte tag and the payload to ctx->out. The tags are an enum:

```c
// LogicalRepMsgType — src/include/replication/logicalproto.h
typedef enum LogicalRepMsgType
{
    LOGICAL_REP_MSG_BEGIN = 'B',
    LOGICAL_REP_MSG_COMMIT = 'C',
    LOGICAL_REP_MSG_ORIGIN = 'O',
    LOGICAL_REP_MSG_INSERT = 'I',
    LOGICAL_REP_MSG_UPDATE = 'U',
    LOGICAL_REP_MSG_DELETE = 'D',
    LOGICAL_REP_MSG_TRUNCATE = 'T',
    LOGICAL_REP_MSG_RELATION = 'R',
    LOGICAL_REP_MSG_TYPE = 'Y',
    LOGICAL_REP_MSG_MESSAGE = 'M',
    LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
    LOGICAL_REP_MSG_PREPARE = 'P',
    LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
    LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r',
    LOGICAL_REP_MSG_STREAM_START = 'S',
    LOGICAL_REP_MSG_STREAM_STOP = 'E',
    LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
    LOGICAL_REP_MSG_STREAM_ABORT = 'A',
    LOGICAL_REP_MSG_STREAM_PREPARE = 'p',
} LogicalRepMsgType;
```
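On the consumer side, each tag selects a fixed body layout. For BEGIN, PostgreSQL's message-format documentation lists three fields after the 'B' tag, all in network byte order:

- the final LSN of the transaction (Int64)
- the commit timestamp (Int64, microseconds since 2000-01-01)
- the xid (Int32)

Below is a minimal decoder for that layout. BeginMsg and parse_begin are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct BeginMsg {
    uint64_t final_lsn;
    int64_t  commit_time;       /* microseconds since 2000-01-01 */
    uint32_t xid;
} BeginMsg;

static uint64_t be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

static uint32_t be32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Returns 0 and fills *out if buf holds a complete 'B' frame, else -1. */
static int parse_begin(const unsigned char *buf, size_t len, BeginMsg *out)
{
    if (len < 21 || buf[0] != 'B')      /* 1 tag + 8 + 8 + 4 bytes */
        return -1;
    out->final_lsn   = be64(buf + 1);
    out->commit_time = (int64_t) be64(buf + 9);
    out->xid         = be32(buf + 17);
    return 0;
}
```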

A tuple’s columns are themselves tagged per value, so the consumer knows whether a column is null, unchanged (TOAST not re-logged), text, or binary:

```c
// per-column value kinds — src/include/replication/logicalproto.h
#define LOGICALREP_COLUMN_NULL      'n'
#define LOGICALREP_COLUMN_UNCHANGED 'u'
#define LOGICALREP_COLUMN_TEXT      't'
#define LOGICALREP_COLUMN_BINARY    'b'   /* added in PG14 */
```

The 'u' kind is the wire-level expression of the “unchanged toasted column” optimization: an UPDATE that did not touch a large TOASTed column sends 'u' instead of re-shipping the (unchanged) datum.

The figure traces a small transaction that inserts one row into a published table and updates a row in an unpublished table. It shows where the deferred BEGIN and the lazy schema send fire, and where empty-transaction elision would apply if no change survived filtering.

```mermaid
flowchart TD
    A["reorderbuffer commits txn<br/>decoder replays buffered changes"] --> B["pgoutput_begin_txn<br/>alloc PGOutputTxnData<br/>sent_begin_txn = false"]
    B --> C["pgoutput_change: INSERT into published t"]
    C --> D{"get_rel_sync_entry(t)<br/>pubactions.pubinsert?"}
    D -->|no| Z1["return, no bytes"]
    D -->|yes| E{"pgoutput_row_filter<br/>passes?"}
    E -->|no| Z2["return, no bytes"]
    E -->|yes| F{"txndata->sent_begin_txn?"}
    F -->|false| G["pgoutput_send_begin<br/>write 'B' BEGIN<br/>sent_begin_txn = true"]
    F -->|true| H["maybe_send_schema(t)"]
    G --> H
    H --> I{"schema_sent?"}
    I -->|no| J["send 'Y' TYPE per user type<br/>send 'R' RELATION<br/>schema_sent = true"]
    I -->|yes| K["write 'I' INSERT (new tuple)"]
    J --> K
    C2["pgoutput_change: UPDATE on unpublished u"] --> D2{"get_rel_sync_entry(u)<br/>pubactions.pubupdate?"}
    D2 -->|no| Z3["return, no bytes"]
    K --> L["pgoutput_commit_txn"]
    Z3 --> L
    L --> M{"sent_begin_txn?"}
    M -->|true| N["write 'C' COMMIT"]
    M -->|false| O["skip: empty transaction elided"]
```

Figure 1 — One transaction through pgoutput. BEGIN is emitted only on the first change that survives table+operation+row filtering; COMMIT is emitted only if a BEGIN was sent. The unpublished UPDATE produces no bytes. Schema (TYPE+RELATION) precedes the first data frame for a relation. It is sent at most once per relation per session, until an invalidation clears schema_sent.

All symbols below are in src/backend/replication/pgoutput/pgoutput.c unless the heading or comment says otherwise. The wire-format writers (logicalrep_write_*) live in src/backend/replication/logical/proto.c; this doc cites them only at the boundary where pgoutput hands off a message. Decoding upstream of the callbacks (the reorder buffer, the historic snapshot, change reassembly) is postgres-logical-decoding.md, and the subscriber side that reads these messages is postgres-logical-replication-apply.md.

Plugin surface and lifecycle (pgoutput_startup, pgoutput_shutdown)

_PG_output_plugin_init is the one exported symbol; logical decoding dlsyms it and calls it once to fill OutputPluginCallbacks (Section 3). pgoutput_startup is then the first per-session callback. It pins the output to binary framing and parses+validates the subscriber’s options against the negotiated protocol version. The version floors are constants in logicalproto.h:

```c
// version floors — src/include/replication/logicalproto.h
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2            /* streaming */
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3          /* two-phase */
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4   /* parallel apply */
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
```

parse_output_parameters reads exactly the recognized keys (proto_version, publication_names, binary, messages, streaming, two_phase, origin) and the streaming/two_phase capabilities are then gated against those floors in pgoutput_startup — asking for a capability the negotiated version can’t carry is an ERROR, not a silent downgrade. pgoutput_shutdown is trivial: the per-session contexts are children of ctx->context, so the decoding machinery frees them; the callback just resets the cache-context pointer.

The transaction frame (pgoutput_begin_txn, pgoutput_send_begin, pgoutput_commit_txn)

pgoutput_begin_txn does not write a BEGIN. It only allocates the per-transaction PGOutputTxnData with sent_begin_txn = false. The real BEGIN is deferred to pgoutput_send_begin, which the data callbacks call lazily on the first surviving change:

```c
// pgoutput_send_begin — src/backend/replication/pgoutput/pgoutput.c
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
Assert(!txndata->sent_begin_txn);
OutputPluginPrepareWrite(ctx, !send_replication_origin);  /* not last if ORIGIN follows */
logicalrep_write_begin(ctx->out, txn);
txndata->sent_begin_txn = true;
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin);
OutputPluginWrite(ctx, true);
```

pgoutput_commit_txn reads the same flag and elides the whole transaction when no BEGIN was sent — this is the empty-transaction optimization made concrete:

```c
// pgoutput_commit_txn — src/backend/replication/pgoutput/pgoutput.c
sent_begin_txn = txndata->sent_begin_txn;
OutputPluginUpdateProgress(ctx, !sent_begin_txn);
pfree(txndata);
txn->output_plugin_private = NULL;
if (!sent_begin_txn)
{
    elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
    return;                     /* no 'C' COMMIT written */
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
```

Note OutputPluginUpdateProgress(ctx, !sent_begin_txn): when the transaction is elided, pgoutput still reports progress. The slot’s confirmed_flush can therefore advance past a stretch of irrelevant transactions even though the consumer never receives a data frame for them. The two-phase path (pgoutput_begin_prepare_txn, pgoutput_prepare_txn, pgoutput_commit_prepared_txn, pgoutput_rollback_prepared_txn) has the same begin/end structure and writes the 'b' / 'P' / 'K' / 'r' frames. However, it writes BEGIN_PREPARE immediately rather than deferring it, so empty prepared transactions are not elided.

pgoutput_change is the busiest callback: it runs once per row change, in both streaming and non-streaming modes. Its skeleton is:

1. reject non-publishable relations;
2. fetch the memoized RelationSyncEntry;
3. apply the table+operation filter;
4. redirect to the partition root if pubviaroot;
5. apply the row filter;
6. (lazily) send BEGIN and schema;
7. write the typed data frame.

```c
// pgoutput_change — src/backend/replication/pgoutput/pgoutput.c (abridged)
if (!is_publishable_relation(relation))
    return;
if (data->in_streaming)
    xid = change->txn->xid;                 /* tag every streamed change */
relentry = get_rel_sync_entry(data, relation);
/* First check the table filter */
switch (action)
{
    case REORDER_BUFFER_CHANGE_INSERT:
        if (!relentry->pubactions.pubinsert)
            return;
        break;
    case REORDER_BUFFER_CHANGE_UPDATE:
        if (!relentry->pubactions.pubupdate)
            return;
        break;
    case REORDER_BUFFER_CHANGE_DELETE:
        if (!relentry->pubactions.pubdelete)
            return;
        if (!change->data.tp.oldtuple)
            return;                         /* no RI ⇒ can't ship */
        break;
}
```

After redirecting to publish_as_relid and remapping tuples through attrmap (partition → ancestor column order), it evaluates the row filter, then emits — and only here is BEGIN/schema forced:

```c
// pgoutput_change — src/backend/replication/pgoutput/pgoutput.c (continued)
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
    goto cleanup;
if (txndata && !txndata->sent_begin_txn)                /* lazy BEGIN */
    pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry);     /* lazy schema */
OutputPluginPrepareWrite(ctx, true);
switch (action)
{
    case REORDER_BUFFER_CHANGE_INSERT:
        logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
                                data->binary, relentry->columns,
                                relentry->include_gencols_type);
        break;
    case REORDER_BUFFER_CHANGE_UPDATE:
        logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
                                new_slot, data->binary, relentry->columns,
                                relentry->include_gencols_type);
        break;
    case REORDER_BUFFER_CHANGE_DELETE:
        logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
                                data->binary, relentry->columns,
                                relentry->include_gencols_type);
        break;
}
OutputPluginWrite(ctx, true);
cleanup:
// ... close the ancestor relation, drop converted-tuple slots ...
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);                      /* per-change arena */
```

The action is a by-reference parameter to pgoutput_row_filter: an UPDATE can leave that function as an INSERT or DELETE (next subsection), and the switch above honors the rewritten action. The whole body runs in data->context, which is MemoryContextReset-ed at the end of every change — a per-change arena that caps the row filter’s transient allocations.

Row filters as WHERE over ExprState (pgoutput_row_filter_init, pgoutput_row_filter)

A publication’s WHERE (...) clause is compiled, per pubaction, into an ExprState cached on the entry. There are exactly three filterable actions, indexed by map_changetype_pubaction[]:

```c
// pgoutput_row_filter — src/backend/replication/pgoutput/pgoutput.c
static const int map_changetype_pubaction[] = {
    [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
};
filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
if (!filter_exprstate)
    return true;                /* no filter ⇒ always replicate */
```

The interesting case is UPDATE, where the old and new images are both checked and the change can be transformed to keep the subscriber’s copy consistent with the predicate (the four cases are documented above pgoutput_row_filter):

```c
// pgoutput_row_filter — src/backend/replication/pgoutput/pgoutput.c (UPDATE transform)
old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt /* old */);
new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt /* new */);
if (!old_matched && !new_matched)       /* Case 1 */
    return false;                       /* drop */
if (!old_matched && new_matched)        /* Case 2: row enters the filter */
{
    *action = REORDER_BUFFER_CHANGE_INSERT;
    if (tmp_new_slot)
        *new_slot_ptr = tmp_new_slot;
}
else if (old_matched && !new_matched)   /* Case 3: row leaves the filter */
    *action = REORDER_BUFFER_CHANGE_DELETE;
/* Case 4: both match ⇒ stays an UPDATE */
return true;
```

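A NULL filter result is treated as false (pgoutput_row_filter_exec_expr). The evaluator is the engine’s own expression machinery: an EState + ExprContext built by create_estate_for_relation, exactly what a query would use. A separate restriction comes from the old row image. For an UPDATE or DELETE, the old tuple carries at most the replica-identity columns, so a publication that publishes updates or deletes may use only replica-identity columns in its filter. PostgreSQL enforces this when an UPDATE or DELETE runs against the published table (the statement fails), not at CREATE PUBLICATION time.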
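The four UPDATE cases and the NULL-as-false rule reduce to a small decision function. In this sketch, the row filter's result for each image is passed in as a tri-state value rather than evaluated. ChangeKind, PredResult and filter_update are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { CH_INSERT, CH_UPDATE, CH_DELETE } ChangeKind;

/* Tri-state predicate result, since SQL evaluation can yield NULL. */
typedef enum { PRED_FALSE, PRED_TRUE, PRED_NULL } PredResult;

/* NULL counts as "does not match". */
static bool matches(PredResult r) { return r == PRED_TRUE; }

/*
 * Decide whether an UPDATE is sent and as what. Returns false to drop
 * it; otherwise *kind holds the action to put on the wire.
 */
static bool filter_update(PredResult old_res, PredResult new_res,
                          ChangeKind *kind)
{
    bool old_m = matches(old_res), new_m = matches(new_res);

    *kind = CH_UPDATE;
    if (!old_m && !new_m)
        return false;              /* case 1: invisible before and after */
    if (!old_m && new_m)
        *kind = CH_INSERT;         /* case 2: row enters the filtered set */
    else if (old_m && !new_m)
        *kind = CH_DELETE;         /* case 3: row leaves the filtered set */
    return true;                   /* case 4 stays an UPDATE */
}
```

The rewrite keeps the subscriber's copy equal to exactly the set of rows that satisfy the predicate.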

Lazy schema dispatch (maybe_send_schema, send_relation_and_attrs)

maybe_send_schema is the gate that ensures a relation’s descriptor is sent at most once. For non-streamed transactions it consults the entry’s schema_sent bool; for streamed transactions it consults a per-top-xid set (streamed_txns), because a streamed txn may be applied later and out of order relative to regular txns, so its schema must be re-established within its own stream:

```c
// maybe_send_schema — src/backend/replication/pgoutput/pgoutput.c
if (data->in_streaming)
    schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
    schema_sent = relentry->schema_sent;
if (schema_sent)
    return;
/* If publishing via an ancestor, send the ancestor's schema first. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
    Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    send_relation_and_attrs(ancestor, xid, ctx, relentry);
    RelationClose(ancestor);
}
send_relation_and_attrs(relation, xid, ctx, relentry);
if (data->in_streaming)
    set_schema_sent_in_streamed_txn(relentry, topxid);
else
    relentry->schema_sent = true;
```

send_relation_and_attrs emits a 'Y' TYPE message for each user-created type referenced by a published column (the att->atttypid < FirstGenbkiObjectId cutoff means built-in types like int4 are never described — the subscriber already knows them), then one 'R' RELATION message carrying the namespace, name, replica-identity char, and the per-attribute info honoring the column list:

```c
// send_relation_and_attrs — src/backend/replication/pgoutput/pgoutput.c
for (i = 0; i < desc->natts; i++)
{
    Form_pg_attribute att = TupleDescAttr(desc, i);
    if (!logicalrep_should_publish_column(att, columns, include_gencols_type))
        continue;
    if (att->atttypid < FirstGenbkiObjectId)    /* skip built-in types */
        continue;
    OutputPluginPrepareWrite(ctx, false);
    logicalrep_write_typ(ctx->out, xid, att->atttypid);
    OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols_type);
OutputPluginWrite(ctx, false);
```

Column lists and the per-column wire kinds (pgoutput_column_list_init, logicalrep_write_tuple)

pgoutput_column_list_init resolves the relation’s column-list bitmap by walking every subscribed publication. Crucially, PostgreSQL forbids a table having different column lists across the publications a subscription combines — that contradiction is an ERROR, detected by bms_equal:

```c
// pgoutput_column_list_init — src/backend/replication/pgoutput/pgoutput.c
if (first)
{
    entry->columns = cols;
    first = false;
}
else if (!bms_equal(entry->columns, cols))
    ereport(ERROR,
            errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
            errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                   get_namespace_name(RelationGetNamespace(relation)),
                   RelationGetRelationName(relation)));
```
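The consistency rule can be sketched with a uint32_t standing in for a Bitmapset. The sketch assumes at most 32 columns. It represents a publication without a column list as a mask of all the table's columns, which simplifies how pgoutput treats "no list". resolve_columns is a hypothetical name, and returning -1 stands in for the ERROR.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Resolve one column mask from several publications' masks (bit i =
 * column i). Returns 0 and sets *out if every mask agrees, or -1 on a
 * conflict, which pgoutput reports as an ERROR.
 */
static int resolve_columns(const uint32_t *masks, size_t n, uint32_t *out)
{
    if (n == 0)
        return -1;                  /* nothing publishes the table */
    for (size_t i = 1; i < n; i++)
        if (masks[i] != masks[0])
            return -1;              /* different lists: not supported */
    *out = masks[0];
    return 0;
}
```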

entry->columns == NULL means “all columns.” The actual projection happens down in logicalrep_write_tuple, which also realizes the per-column value kinds ('n' null, 'u' unchanged-toast, 't' text, 'b' binary):

```c
// logicalrep_write_tuple — src/backend/replication/logical/proto.c (abridged)
if (isnull[i])
{
    pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
    continue;
}
if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
{
    pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);  /* 'u' — don't reship TOAST */
    continue;
}
if (binary && OidIsValid(typclass->typsend))
{
    pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);     /* 'b' */
    /* ... typsend output ... */
}
else
{
    pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);       /* 't' */
    /* ... typoutput text ... */
}
```

The 'u' branch is the wire expression of “an unchanged out-of-line TOAST value is not re-logged” — the subscriber keeps whatever it already has for that column.
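Here is a sketch of the projection and the per-column kinds together. It follows the TupleData layout described in PostgreSQL's message-format documentation: an Int16 count of columns sent, then a kind byte per column, and for 't' an Int32 length followed by the text bytes. It omits the 'b' binary kind. Col, Buf and write_tuple are hypothetical names, and bit i of mask selects column i.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical column value as the sketch sees it. */
typedef struct Col {
    enum { VAL_NULL, VAL_UNCHANGED_TOAST, VAL_TEXT } state;
    const char *text;           /* used only when state == VAL_TEXT */
} Col;

typedef struct Buf {            /* no bounds checking: sized for the sketch */
    unsigned char data[128];
    size_t len;
} Buf;

static void put8(Buf *b, uint8_t v) { b->data[b->len++] = v; }
static void put16(Buf *b, uint16_t v) { put8(b, (uint8_t) (v >> 8)); put8(b, (uint8_t) v); }
static void put32(Buf *b, uint32_t v) { put16(b, (uint16_t) (v >> 16)); put16(b, (uint16_t) v); }

/* Write TupleData for the columns selected by mask. */
static void write_tuple(Buf *b, const Col *cols, int ncols, uint32_t mask)
{
    uint16_t nsent = 0;
    for (int i = 0; i < ncols; i++)
        if (mask & (1u << i))
            nsent++;
    put16(b, nsent);                         /* only projected columns count */
    for (int i = 0; i < ncols; i++) {
        if (!(mask & (1u << i)))
            continue;                        /* outside the column list */
        switch (cols[i].state) {
        case VAL_NULL:            put8(b, 'n'); break;
        case VAL_UNCHANGED_TOAST: put8(b, 'u'); break;  /* value not re-shipped */
        case VAL_TEXT: {
            size_t n = strlen(cols[i].text);
            put8(b, 't');
            put32(b, (uint32_t) n);
            memcpy(b->data + b->len, cols[i].text, n);
            b->len += n;
            break;
        }
        }
    }
}
```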

Streaming in-progress transactions (pgoutput_stream_start/stop/abort/commit)

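When streaming is on and the reorder buffer’s memory use exceeds logical_decoding_work_mem, logical decoding sends a large in-progress transaction downstream in chunks instead of spilling it to disk. pgoutput_stream_start writes an 'S' STREAM_START tagged with the top xid and then sets data->in_streaming. The message also marks whether this is the first segment, so the origin is sent only once: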

```c
// pgoutput_stream_start — src/backend/replication/pgoutput/pgoutput.c
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
Assert(!data->in_streaming);                    /* no nesting */
if (rbtxn_is_streamed(txn))
    send_replication_origin = false;            /* origin went with chunk 1 */
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, send_replication_origin);
OutputPluginWrite(ctx, true);
data->in_streaming = true;                      /* shared funcs now tag xids */
```

in_streaming is precisely the bit the shared pgoutput_change / pgoutput_message / pgoutput_truncate functions consult to decide whether to prefix each frame with an xid. pgoutput_stream_stop clears it and writes 'E'. The abort/commit pair are what make a spooled transaction safe:

// pgoutput_stream_abort — src/backend/replication/pgoutput/pgoutput.c
bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
Assert(!data->in_streaming);
toptxn = rbtxn_get_toptxn(txn);
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
txn->xact_time.abort_time, write_abort_info);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false); /* forget streamed-schema state */

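'A' STREAM_ABORT names both the top-level xid and the aborted (sub)xid, so the subscriber can discard exactly the aborted subtree of a spooled transaction; the abort LSN and abort time travel with it only when streaming = parallel (write_abort_info). 'c' STREAM_COMMIT, written by pgoutput_stream_commit, tells the subscriber to apply. Both paths end with cleanup_rel_sync_cache, which removes that top xid from each entry’s streamed_txns list: on commit it also marks the schema as sent, because the subscriber now has it, while on abort (the false argument above) it only forgets the xid. Either way, a later streamed transaction re-sends the schema in its own stream.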
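
The subscriber side of 'A' can be pictured with a toy in-memory spool: remember where each subtransaction’s first change landed, truncate back to that point on a subtransaction abort, drop everything on a top-level abort, and apply what remains on 'c'. This is a hypothetical model (Spool, spool_change, spool_abort and spool_commit are illustrative names); the real apply worker spools to files rather than arrays. The truncation assumes the aborted subtransaction’s changes, plus those of subtransactions nested in it, form a contiguous tail of the spool, which holds because one backend executes them sequentially.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_CHANGES  64
#define MAX_SUBXACTS 16

/* Hypothetical in-memory spool for one streamed top-level transaction. */
typedef struct {
    uint32_t topxid;
    uint32_t change_xid[MAX_CHANGES];  /* xid tag carried by each change */
    size_t   nchanges;
    uint32_t subxid[MAX_SUBXACTS];     /* subtransactions seen so far... */
    size_t   sub_start[MAX_SUBXACTS];  /* ...and where each one's changes begin */
    size_t   nsubxacts;
} Spool;

/* A data frame arrived inside a STREAM_START/STREAM_STOP chunk. */
static void spool_change(Spool *s, uint32_t xid)
{
    assert(s->nchanges < MAX_CHANGES);
    if (xid != s->topxid) {
        size_t i = 0;

        while (i < s->nsubxacts && s->subxid[i] != xid)
            i++;
        if (i == s->nsubxacts) {            /* first change of this subxact */
            assert(s->nsubxacts < MAX_SUBXACTS);
            s->subxid[i] = xid;
            s->sub_start[i] = s->nchanges;
            s->nsubxacts++;
        }
    }
    s->change_xid[s->nchanges++] = xid;
}

/* 'A' STREAM_ABORT(topxid, subxid): discard exactly the aborted subtree. */
static void spool_abort(Spool *s, uint32_t subxid)
{
    if (subxid == s->topxid) {              /* top-level abort: drop it all */
        s->nchanges = 0;
        s->nsubxacts = 0;
        return;
    }
    for (size_t i = 0; i < s->nsubxacts; i++) {
        if (s->subxid[i] == subxid) {
            /* Assumed: everything from here on belongs to this subxact
             * or to subxacts nested inside it. */
            s->nchanges = s->sub_start[i];
            s->nsubxacts = i;
            return;
        }
    }
    /* A subxact with no spooled changes: nothing to discard. */
}

/* 'c' STREAM_COMMIT: returns how many spooled changes would be applied. */
static size_t spool_commit(const Spool *s)
{
    return s->nchanges;
}
```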

flowchart TD
    S0["pgoutput_stream_start<br/>write 'S' + topxid<br/>in_streaming = true"] --> S1["pgoutput_change (xid-tagged)<br/>'I'/'U'/'D' with xid prefix"]
    S1 --> S2["pgoutput_stream_stop<br/>write 'E'<br/>in_streaming = false"]
    S2 --> S3{"more chunks<br/>buffered?"}
    S3 -->|yes| S0
    S3 -->|txn aborts| A["pgoutput_stream_abort<br/>write 'A' topxid+subxid<br/>cleanup_rel_sync_cache"]
    S3 -->|txn commits| C["pgoutput_stream_commit<br/>write 'c'<br/>cleanup_rel_sync_cache"]
    A --> END["subscriber discards<br/>spooled subtree"]
    C --> END2["subscriber applies<br/>spooled txn atomically"]

Figure 2 — The streaming state machine. A large transaction is emitted as repeated STREAM_START/STREAM_STOP chunks, each data frame xid-tagged so the subscriber spools per xid. The top-level transaction is resolved exactly once, by a single STREAM_ABORT (discard) or STREAM_COMMIT (apply) that arrives after the final STREAM_STOP; data->in_streaming is false during abort/commit. Not drawn: a STREAM_ABORT naming a subtransaction’s xid can also arrive between chunks and discards only that subtree.
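
The figure’s rules can be checked mechanically. The sketch below is a hypothetical validator over a string of message tags for one streamed transaction; it mirrors the Assert(!data->in_streaming) checks in pgoutput_stream_start and pgoutput_stream_abort, treats 'I', 'U', 'D', 'T', 'R' and 'Y' as frames that may only appear inside a chunk, and, for brevity, does not model subtransaction aborts or two-phase messages.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Check one streamed transaction's tag sequence against Figure 2:
 * 'S'...'E' chunks that never nest, data/schema frames only inside a chunk,
 * and exactly one final 'c' or top-level 'A' outside any chunk.
 */
static bool valid_stream(const char *tags)
{
    bool in_streaming = false;
    bool saw_chunk = false;
    bool resolved = false;

    for (const char *p = tags; *p != '\0'; p++) {
        if (resolved)
            return false;                  /* nothing may follow resolution */
        switch (*p) {
        case 'S':
            if (in_streaming)
                return false;              /* no nesting */
            in_streaming = true;
            saw_chunk = true;
            break;
        case 'E':
            if (!in_streaming)
                return false;
            in_streaming = false;
            break;
        case 'I': case 'U': case 'D': case 'T': case 'R': case 'Y':
            if (!in_streaming)
                return false;              /* xid-tagged frames live in chunks */
            break;
        case 'A': case 'c':
            if (in_streaming || !saw_chunk)
                return false;              /* resolved outside any chunk */
            resolved = true;
            break;
        default:
            return false;                  /* tag outside this model */
        }
    }
    return resolved;
}
```

For example, "SIESDEA" (two chunks, then a top-level abort) is accepted, while "SIc" is rejected because the commit arrives inside a chunk.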

The relation-sync cache and invalidation (init_rel_sync_cache, get_rel_sync_entry)


init_rel_sync_cache builds the RelationSyncCache HTAB, which invalidation callbacks keep current: rel_sync_cache_relation_cb (relcache) clears one entry’s replicate_valid, rel_sync_cache_publication_cb clears replicate_valid on every entry, and the syscache publication_invalidation_cb sets the global publications_valid = false; between them, every entry is rebuilt against a freshly loaded publication list. get_rel_sync_entry is the lazy (re)builder already shown in Section 3. The key property is that DDL on a published table travels through the very WAL the decoder is replaying, so these invalidations fire in commit order relative to the data changes, and the cache never serves a pre-DDL schema for a change that logically follows the DDL.

Position hints (as of 2026-06-05, REL_18 273fe94)

| Symbol | File | Line |
| --- | --- | --- |
| _PG_output_plugin_init | src/backend/replication/pgoutput/pgoutput.c | 261 |
| parse_output_parameters | src/backend/replication/pgoutput/pgoutput.c | 290 |
| pgoutput_startup | src/backend/replication/pgoutput/pgoutput.c | 449 |
| pgoutput_begin_txn | src/backend/replication/pgoutput/pgoutput.c | 594 |
| pgoutput_send_begin | src/backend/replication/pgoutput/pgoutput.c | 608 |
| pgoutput_commit_txn | src/backend/replication/pgoutput/pgoutput.c | 630 |
| pgoutput_prepare_txn | src/backend/replication/pgoutput/pgoutput.c | 679 |
| maybe_send_schema | src/backend/replication/pgoutput/pgoutput.c | 725 |
| send_relation_and_attrs | src/backend/replication/pgoutput/pgoutput.c | 796 |
| pgoutput_row_filter_init | src/backend/replication/pgoutput/pgoutput.c | 916 |
| check_and_init_gencol | src/backend/replication/pgoutput/pgoutput.c | 1063 |
| pgoutput_column_list_init | src/backend/replication/pgoutput/pgoutput.c | 1122 |
| pgoutput_row_filter | src/backend/replication/pgoutput/pgoutput.c | 1301 |
| pgoutput_change | src/backend/replication/pgoutput/pgoutput.c | 1482 |
| pgoutput_truncate | src/backend/replication/pgoutput/pgoutput.c | 1654 |
| pgoutput_message | src/backend/replication/pgoutput/pgoutput.c | 1722 |
| pgoutput_stream_start | src/backend/replication/pgoutput/pgoutput.c | 1838 |
| pgoutput_stream_stop | src/backend/replication/pgoutput/pgoutput.c | 1870 |
| pgoutput_stream_abort | src/backend/replication/pgoutput/pgoutput.c | 1891 |
| pgoutput_stream_commit | src/backend/replication/pgoutput/pgoutput.c | 1924 |
| init_rel_sync_cache | src/backend/replication/pgoutput/pgoutput.c | 1972 |
| get_rel_sync_entry | src/backend/replication/pgoutput/pgoutput.c | 2052 |
| RelationSyncEntry (struct) | src/backend/replication/pgoutput/pgoutput.c | 126 |
| PGOutputTxnData (struct) | src/backend/replication/pgoutput/pgoutput.c | 214 |
| logicalrep_write_begin | src/backend/replication/logical/proto.c | 49 |
| logicalrep_write_commit | src/backend/replication/logical/proto.c | 78 |
| logicalrep_write_insert | src/backend/replication/logical/proto.c | 403 |
| logicalrep_write_update | src/backend/replication/logical/proto.c | 450 |
| logicalrep_write_delete | src/backend/replication/logical/proto.c | 528 |
| logicalrep_write_rel | src/backend/replication/logical/proto.c | 667 |
| logicalrep_write_tuple | src/backend/replication/logical/proto.c | 767 |
| logicalrep_write_stream_start | src/backend/replication/logical/proto.c | 1061 |
| LOGICAL_REP_MSG_BEGIN (enum) | src/include/replication/logicalproto.h | 59 |
| LOGICALREP_COLUMN_NULL | src/include/replication/logicalproto.h | 96 |
| LOGICALREP_PROTO_*_VERSION_NUM | src/include/replication/logicalproto.h | 40-45 |

  • _PG_output_plugin_init fills the full OutputPluginCallbacks table, and the streaming change/message/truncate slots reuse the non-streaming functions. Verified directly: cb->stream_change_cb = pgoutput_change, cb->stream_message_cb = pgoutput_message, cb->stream_truncate_cb = pgoutput_truncate. The streamed-vs-not behavior is the data->in_streaming flag, not a separate function.

  • pgoutput emits binary framing unconditionally; per-column binary is a separate, optional mode. pgoutput_startup sets opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT. The binary option (data->binary) only governs whether individual datums use typsend ('b') versus typoutput text ('t') inside logicalrep_write_tuple.

  • Capability floors: streaming needs proto v2, two-phase v3, parallel streaming v4; LOGICALREP_PROTO_MAX_VERSION_NUM is 4. Verified against the #defines in logicalproto.h and the gating if ladder in pgoutput_startup. Requesting a capability whose floor is above the client’s proto_version is an ERROR.

  • BEGIN is deferred to the first surviving change and COMMIT is elided for empty transactions. Verified in pgoutput_begin_txn (allocates state only), pgoutput_send_begin (called from pgoutput_change / pgoutput_truncate / pgoutput_message under !txndata->sent_begin_txn), and pgoutput_commit_txn (early return with a DEBUG1 “skipped …empty transaction” when !sent_begin_txn).

  • The table+operation filter runs before the row filter, which runs before BEGIN/schema. Verified by the statement order in pgoutput_change: the pubactions switch (early return), then pgoutput_row_filter (goto cleanup), then pgoutput_send_begin, then maybe_send_schema, then the data write.

  • An UPDATE can be rewritten to INSERT or DELETE by the row filter to preserve subscriber consistency. Verified in pgoutput_row_filter’s four-case logic (*action written by reference) and the documented Case 1–4 table in the comment above the function. pgoutput_change’s final switch dispatches on the possibly-rewritten action.

  • Schema is sent at most once per relation until an invalidation rebuilds its cache entry, tracked by schema_sent for regular txns and by the per-top-xid streamed_txns list for streamed ones. Verified in maybe_send_schema (get_schema_sent_in_streamed_txn / relentry->schema_sent). The ancestor’s schema precedes the relation’s own when publishing via root.

  • Only user-created types get a 'Y' TYPE message. Verified in send_relation_and_attrs: the att->atttypid < FirstGenbkiObjectId test skips built-in types, whose OIDs are stable across major versions.

  • The unchanged-TOAST 'u' column kind avoids re-shipping out-of-line values. Verified in logicalrep_write_tuple (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])LOGICALREP_COLUMN_UNCHANGED), and the row filter copies such a column from the old tuple when needed for predicate evaluation.

  • Streamed changes are xid-tagged; STREAM_ABORT carries both top and sub xid; abort/commit happen outside the streaming block. Verified: data->in_streaming set/cleared in pgoutput_stream_start/_stop, Assert(!data->in_streaming) in pgoutput_stream_abort/_commit, logicalrep_write_stream_abort(..., toptxn->xid, txn->xid, ...), and the cleanup_rel_sync_cache calls on both resolution paths.

  • Different column lists for one table across combined publications are rejected. Verified in pgoutput_column_list_init: the bms_equal mismatch raises ERRCODE_FEATURE_NOT_SUPPORTED.

  • DELETE with no old tuple (no replica identity) is silently dropped. Verified in pgoutput_change’s REORDER_BUFFER_CHANGE_DELETE arm: if (!change->data.tp.oldtuple) return; with a DEBUG1 log.
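
The ordering verified above for pgoutput_change (operation filter, then row filter, then deferred BEGIN, then schema, then data), the UPDATE rewrite, the dropped old-tuple-less DELETE and the empty-transaction elision in pgoutput_commit_txn fit in one small sketch. Everything here is hypothetical: Rel, Txn and Out are toy stand-ins, the row filter is a single value >= filter_min predicate on an int, and frames are recorded as their one-byte tags instead of being serialized.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum { ACT_INSERT, ACT_UPDATE, ACT_DELETE, ACT_SKIP } Action;

/* Hypothetical per-relation state: pubactions, a toy row filter, schema_sent. */
typedef struct {
    bool pub_insert, pub_update, pub_delete;
    bool has_filter;
    int  filter_min;          /* row filter: WHERE value >= filter_min */
    bool schema_sent;
} Rel;

typedef struct { bool sent_begin; } Txn;          /* cf. PGOutputTxnData */
typedef struct { char tags[32]; size_t n; } Out;  /* one tag byte per frame */

static void emit(Out *o, char tag)
{
    assert(o->n + 1 < sizeof o->tags);
    o->tags[o->n++] = tag;
    o->tags[o->n] = '\0';
}

static bool matches(const Rel *r, int value)
{
    return !r->has_filter || value >= r->filter_min;
}

/* Row filter; an UPDATE may be rewritten to INSERT or DELETE. */
static Action row_filter(const Rel *r, Action a, const int *oldv, const int *newv)
{
    if (!r->has_filter)
        return a;
    if (a == ACT_INSERT)
        return matches(r, *newv) ? a : ACT_SKIP;
    if (a == ACT_DELETE)
        return matches(r, *oldv) ? a : ACT_SKIP;
    if (oldv == NULL)                        /* UPDATE without an old tuple */
        return matches(r, *newv) ? a : ACT_SKIP;

    bool old_ok = matches(r, *oldv), new_ok = matches(r, *newv);
    if (old_ok && new_ok) return ACT_UPDATE;
    if (old_ok)           return ACT_DELETE;  /* row leaves the filtered set */
    if (new_ok)           return ACT_INSERT;  /* row enters the filtered set */
    return ACT_SKIP;
}

static void change(Out *o, Txn *t, Rel *r, Action a,
                   const int *oldv, const int *newv)
{
    /* 1. operation filter (pubactions) on the original action */
    if ((a == ACT_INSERT && !r->pub_insert) ||
        (a == ACT_UPDATE && !r->pub_update) ||
        (a == ACT_DELETE && !r->pub_delete))
        return;
    if (a == ACT_DELETE && oldv == NULL)
        return;                              /* no replica identity: dropped */
    /* 2. row filter, which may rewrite the action */
    a = row_filter(r, a, oldv, newv);
    if (a == ACT_SKIP)
        return;
    /* 3. BEGIN deferred to the first surviving change */
    if (!t->sent_begin) {
        emit(o, 'B');
        t->sent_begin = true;
    }
    /* 4. schema at most once until the entry is invalidated */
    if (!r->schema_sent) {
        emit(o, 'R');
        r->schema_sent = true;
    }
    /* 5. the data frame for the (possibly rewritten) action */
    emit(o, a == ACT_INSERT ? 'I' : a == ACT_UPDATE ? 'U' : 'D');
}

static void commit(Out *o, const Txn *t)
{
    if (!t->sent_begin)
        return;                  /* empty transaction: no BEGIN, so no COMMIT */
    emit(o, 'C');
}
```

With a filter of value >= 10, a transaction that only inserts 5 emits nothing at all, while one that updates a row from 5 to 20 and back to 5 emits BEGIN, RELATION, INSERT, DELETE, COMMIT (tags "BRIDC").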

  1. Whether always re-sending schema at the start of every streamed transaction is wasted work. The maybe_send_schema comment explicitly flags this (“XXX There is a scope of optimization here … we always send the schema first time in a streaming transaction”) and ties the hesitation to mixed streaming/non-streaming workloads. Investigation path: measure the 'R'/'Y' byte overhead under a stream-heavy, schema-stable workload.

  2. The cost of building an EState/ExprState per relation for row filters on a churn-heavy publication. pgoutput_row_filter_init + create_estate_for_relation run on cache (re)build; how often publication or relcache invalidations force a rebuild under DDL-light but invalidation-noisy workloads is not characterized here. Investigation path: instrument get_rel_sync_entry rebuild frequency vs. invalidation source.

  3. Interaction of publish_as_relid redirection with deep partition hierarchies. Each change on a leaf partition published via root pays an attrmap remap and an ancestor RelationIdGetRelation; the per-change cost on a many-level hierarchy under pubviaroot is unmeasured. Investigation path: compare pgoutput_change cost on a flat table vs. an N-level partitioned root.

Beyond PostgreSQL — Comparative Designs & Research Frontiers

  • Debezium / Kafka Connect over the same decoder. Debezium can consume PostgreSQL logical decoding via pgoutput or wal2json/decoderbufs, then re-encode into JSON/Avro on a Kafka topic with a schema registry. The contrast is instructive: pgoutput pushes filtering (table/op/column/row) to the producer to save bandwidth on a point-to-point link, whereas a Kafka-centric CDC pipeline often ships everything and filters in the stream-processing layer. A side-by-side of where each system places the filter would sharpen the bandwidth-vs-flexibility trade. (Kleppmann, DDIA ch. 11, “Change Data Capture”.)

  • MySQL binlog row format vs. the pgoutput RELATION/TYPE scheme. MySQL’s ROW-format binlog ships a TABLE_MAP_EVENT before row events to bind a numeric table id to a schema — structurally the same lazy, memoized, in-band schema dispatch as pgoutput’s 'R' RELATION frame keyed by relation OID. Comparing how each handles a DDL change mid-stream (binlog’s table-map re-emission vs. pgoutput’s invalidation-driven schema_sent clear) is a focused companion study.

  • Oracle GoldenGate / LogMiner and “logical” log mining. GoldenGate mines redo into a vendor trail format with its own filtering and transformation DSL. PostgreSQL deliberately keeps the plugin thin and pushes transformation to the subscriber; the architectural question — how much logic belongs in the output plugin vs. the apply side — is exactly the pgoutput vs. postgres-logical-replication-apply.md boundary.

  • Protocol versioning as a forward-compat mechanism. pgoutput’s proto_version floor-per-capability scheme (streaming@2, two-phase@3, parallel@4) is a minimal, monotonic capability negotiation. Contrast with schema-registry-based evolution (Avro/Confluent) where the message schema, not a single integer, carries compatibility. A note on the trade between a single negotiated version integer and per-message self-describing schemas would frame why an in-core, version-paired publisher/subscriber can get away with the simpler scheme.

  • Row filters as pushed-down predicates. pgoutput compiles a publication WHERE clause into the engine’s own ExprState and evaluates it per change — a producer-side predicate pushdown analogous to an FDW pushing a WHERE to a remote (postgres-fdw.md). The research-grade question is whether more of the apply-side work (e.g., conflict detection) could likewise be pushed into the plugin without coupling it to subscriber state.
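
The protocol-versioning item above describes floor-per-capability negotiation; reduced to code, it is a handful of integer comparisons. A minimal sketch, assuming the floors listed among the verified notes (streaming 2, two-phase 3, parallel streaming 4, maximum 4) and a minimum of 1; the enum names and check_startup are hypothetical shortenings of the LOGICALREP_PROTO_*_VERSION_NUM constants and the if ladder in pgoutput_startup, and the reason strings are placeholders, not PostgreSQL’s error messages.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical names; values follow the capability floors described above. */
enum {
    PROTO_MIN = 1,
    PROTO_STREAM = 2,
    PROTO_TWOPHASE = 3,
    PROTO_STREAM_PARALLEL = 4,
    PROTO_MAX = 4
};

typedef enum { STREAM_OFF, STREAM_ON, STREAM_PARALLEL } StreamMode;

/* Returns NULL if the request is acceptable, else a placeholder reason. */
static const char *check_startup(int proto, StreamMode streaming, bool two_phase)
{
    if (proto < PROTO_MIN || proto > PROTO_MAX)
        return "unsupported proto_version";
    if (streaming == STREAM_PARALLEL && proto < PROTO_STREAM_PARALLEL)
        return "parallel streaming needs a higher proto_version";
    if (streaming == STREAM_ON && proto < PROTO_STREAM)
        return "streaming needs a higher proto_version";
    if (two_phase && proto < PROTO_TWOPHASE)
        return "two_phase needs a higher proto_version";
    return NULL;
}
```

Each new capability costs one constant and one comparison, which works because publisher and subscriber ship together in core; a self-describing per-message schema buys independent evolution at the price of carrying that schema with every message.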

In-tree source files (REL_18_STABLE, commit 273fe94)

  • src/backend/replication/pgoutput/pgoutput.c: the plugin itself (_PG_output_plugin_init, pgoutput_startup/_shutdown, pgoutput_begin_txn/_send_begin/_commit_txn, the two-phase callbacks, pgoutput_change, pgoutput_truncate, pgoutput_message, pgoutput_row_filter(_init), pgoutput_column_list_init, maybe_send_schema, send_relation_and_attrs, the streaming callbacks, RelationSyncCache/RelationSyncEntry and get_rel_sync_entry).
  • src/backend/replication/logical/proto.c: the wire writers/readers (logicalrep_write_begin/_commit, logicalrep_write_insert/_update/_delete, logicalrep_write_rel/_typ, logicalrep_write_tuple, the stream framing writers).
  • src/include/replication/logicalproto.h: the LogicalRepMsgType tag enum, the LOGICALREP_COLUMN_* value kinds, and the LOGICALREP_PROTO_*_VERSION_NUM capability floors.
  • src/include/replication/pgoutput.h: PGOutputData, the option fields, PublishGencolsType.
  • src/include/replication/output_plugin.h: OutputPluginCallbacks, the callback boundary pgoutput implements.
  • src/include/replication/reorderbuffer.h: ReorderBufferTXN / ReorderBufferChange, the objects each callback receives.
  • postgres-logical-decoding.md — the reorder buffer, historic snapshots, and change reassembly that produce the callbacks pgoutput implements.
  • postgres-logical-replication-apply.md — the subscriber apply worker that consumes the messages pgoutput writes.
  • postgres-replication-slots.md — the slot that anchors restart_lsn / confirmed_flush for the decoding session.
  • postgres-wal-sender-receiver.md — the walsender that ships pgoutput’s output over the replication protocol; contrast with physical replication.
  • postgres-toast.md — the out-of-line storage whose “unchanged” optimization surfaces as the 'u' column kind.
  • postgres-partitioning.md — partition roots and pubviaroot, the source of publish_as_relid and the attrmap remap.
  • Kleppmann, M. (2017). Designing Data-Intensive Applications, ch. 11 “Stream Processing”, §“Change Data Capture” — framing the replication log as an event-stream source; the encoding is left to the output plugin. Captured under raw/system/textbooks/.
  • Silberschatz, Korth & Sudarshan (2020). Database System Concepts, 7e — replication and recovery foundations; knowledge/research/dbms-general/.
  • Petrov, A. (2019). Database Internals, Part II — WAL and log-shipping background for how the change stream is produced; dbms-papers/ dbms-general captures.