
PostgreSQL Logical Replication — Subscriber Apply, Launcher, and Table Sync


Logical replication answers a different question than physical streaming replication. Physical replication (see postgres-wal-sender-receiver.md) ships raw WAL bytes: the standby is a byte-for-byte clone of the primary’s storage, replaying the same page images, unable to diverge in schema, unable to accept writes, unable to replicate a subset of tables. Logical replication instead ships row-level change events — “tuple T was inserted into table public.orders” — decoded from WAL and re-applied on the subscriber through the ordinary executor. The subscriber is a fully independent database that happens to be kept current with a publisher’s table contents.

This shift unlocks capabilities physical replication cannot offer: replicating between major versions, replicating a curated subset of tables (a publication), consolidating many publishers into one subscriber, replicating into a table with extra local columns or extra local indexes, and keeping the subscriber writable. The cost is that the subscriber must interpret changes rather than blindly replay bytes: it must map a remote relation to a local one, coerce remote column values into local types, locate the row to update or delete by replica identity, and decide what to do when the local row is not in the state the change assumes (a conflict).

Database Internals (Petrov, ch. 11, “Replication and Consistency”) frames the design space as a pipeline of logical change records flowing from a leader to followers, where each follower replays a serialized stream of logical operations. The pipeline has four conceptual stages, and PostgreSQL splits them across a process on each side:

  1. Capture / decode — turn the durable log into logical change records. On the publisher this is logical decoding plus the pgoutput plugin (deferred to postgres-logical-decoding.md and postgres-pgoutput.md). The walsender streams the result.
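  2. Transport — deliver the records in commit order over a persistent connection, so that, together with the follower's durable progress record (stage 3), each transaction is applied exactly once even across reconnects. PostgreSQL reuses the same CopyBoth libpq transport as physical replication.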
  3. Apply — re-execute each record against the follower’s local tables, transactionally, recording how far the stream has been durably applied so the connection can resume after a crash.
  4. Conflict resolution — reconcile the case where the follower’s row does not match the change’s assumption (already deleted, modified by a different origin, duplicate key).

This document covers stages 3 and 4 — the subscriber. The subscriber must also solve a bootstrap problem the theory often glosses over: a brand-new subscription starts with empty tables, but the change stream only carries deltas from a chosen point forward. Something must perform an initial snapshot copy of existing rows and then splice that copy into the live change stream without missing or double-applying any row. That splice — the table synchronization problem — is the subtlest part of the subscriber and is handled by a dedicated worker per table.

The design space a subscriber implementer chooses within:

  1. One apply process or many — does a single process apply all subscriptions serially, or does each subscription get its own worker? PostgreSQL uses one leader apply worker per subscription (plus optional parallel apply workers for large streamed transactions).
  2. How to bootstrap existing data — lock everything and copy, or copy table-by-table concurrently with the live stream. PostgreSQL copies per-table in tablesync workers that run concurrently and rejoin the main stream individually.
  3. How to find the row to modify — by primary key, by a configured replica identity, or by full-tuple scan. PostgreSQL uses replica identity (PK by default) and falls back to a sequential scan when no usable index exists.
  4. What to do on conflict — error and stop, skip, or apply last-writer-wins. PostgreSQL (REL_18) detects and logs conflicts with a typed taxonomy, and otherwise proceeds with the operation’s natural outcome (e.g. an update_missing is simply skipped).

Logical / row-based replication appears in essentially every mature SQL engine — MySQL row-based binlog replication, Oracle GoldenGate, SQL Server transactional replication, Debezium-style CDC pipelines. They converge on a recognizable set of engineering conventions. Naming them makes PostgreSQL’s specific symbols read as one set of choices within a shared playbook.

A supervisor that spawns one applier per replication stream


Row-based replication systems separate who decides to run an applier from the applier itself. A long-lived supervisor watches the configuration catalog (which streams should be active, owned by whom) and launches/restarts worker processes, while each worker owns exactly one upstream connection. The supervisor also rate-limits restarts so a worker that crashes on a poison message cannot consume the machine in a restart storm. PostgreSQL’s supervisor is the logical replication launcher; its rate limit is wal_retrieve_retry_interval.

A relation map that bridges remote and local schemas


Because the subscriber’s schema is independent, every applier maintains a relation map: remote relation id and column list on one side, the local table’s OID, tuple descriptor, and an attribute-number remapping on the other. The map is rebuilt lazily on cache invalidation, so a local ALTER TABLE is picked up without restarting the worker. PostgreSQL’s map is the LogicalRepRelMap hash of LogicalRepRelMapEntry.
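
A minimal sketch of how such an attribute remapping can be built, assuming columns are matched by name (as PostgreSQL does) and using plain string arrays instead of tuple descriptors. build_attr_map is a hypothetical name, not a PostgreSQL function:

```c
#include <assert.h>
#include <string.h>

/* Hypothetical sketch: for each local column, store the index of the remote
 * column with the same name, or -1 for a local-only column.  Returns the
 * number of remote columns that have no local counterpart. */
static int
build_attr_map(const char **local_cols, int n_local,
               const char **remote_cols, int n_remote,
               int *attnums /* out: n_local entries */)
{
    int matched_remote = 0;

    for (int i = 0; i < n_local; i++)
    {
        attnums[i] = -1;                    /* assume local-only until found */
        for (int j = 0; j < n_remote; j++)
        {
            if (strcmp(local_cols[i], remote_cols[j]) == 0)
            {
                attnums[i] = j;
                matched_remote++;
                break;
            }
        }
    }
    /* column names are unique per table, so this counts unmatched remotes */
    return n_remote - matched_remote;
}
```

The -1 entries are the subscriber's extra local columns, which are filled from defaults. A nonzero return corresponds to a published column the subscriber lacks, which the real worker treats as an error.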

Replica identity: how to locate the affected row


An UPDATE or DELETE arrives as “the row that had these key columns now has these new values.” The applier must locate that row locally. The universal convention is a replica identity — a designated set of columns (the primary key by default) used to build the lookup. If the local replica identity includes a column that the publisher does not send as part of its key, the applier cannot build the lookup from the incoming change, so updates and deletes on that table are rejected.

The applier must record how far it has durably applied, so that after a crash it resumes from exactly the right LSN — not re-applying committed work, not skipping uncommitted work. The convention is a per-stream progress record advanced atomically with the local commit. PostgreSQL implements this as a replication origin (postgres-replication-slots.md covers the slot side; origins are the subscriber-side cursor).

Initial snapshot then catch-up, spliced per object

Section titled “Initial snapshot then catch-up, spliced per object”

To start replicating into empty tables, systems take a consistent snapshot of existing rows, copy it, then begin applying changes from the snapshot’s position. Doing this per-object concurrently (rather than one giant cluster-wide lock) is the scalable choice, and it requires a small state machine per object to track the handoff from “copying” to “live.” PostgreSQL’s per-table workers walk the state machine INIT -> DATASYNC -> FINISHEDCOPY -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY. The durable states are recorded in pg_subscription_rel.srsubstate; SYNCWAIT and CATCHUP exist only in the sync worker’s shared-memory slot.
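
A sketch of the happy-path transition order, using the single-letter SUBREL_STATE_* codes. The enum names and tablesync_step_ok are hypothetical, and worker restarts and the copy_data = false shortcut (a table added directly as READY) are deliberately not modeled:

```c
#include <assert.h>
#include <stdbool.h>

/* Single-letter codes of the SUBREL_STATE_* states ('w' and 'c' are never
 * written to pg_subscription_rel; they live in shared memory only). */
enum { S_INIT = 'i', S_DATASYNC = 'd', S_FINISHEDCOPY = 'f', S_SYNCWAIT = 'w',
       S_CATCHUP = 'c', S_SYNCDONE = 's', S_READY = 'r' };

/* Hypothetical checker: is `to` the next state after `from` on the happy path? */
static bool
tablesync_step_ok(char from, char to)
{
    static const char order[] = {S_INIT, S_DATASYNC, S_FINISHEDCOPY, S_SYNCWAIT,
                                 S_CATCHUP, S_SYNCDONE, S_READY};
    const int n = (int) sizeof(order);

    for (int i = 0; i + 1 < n; i++)
        if (order[i] == from)
            return order[i + 1] == to;
    return false;   /* READY is terminal; unknown codes are rejected */
}
```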

When the local row is not in the assumed state, the system must classify the situation: row already exists (insert), row missing (update/delete), row modified by a different origin since, duplicate unique key. A typed taxonomy lets operators monitor and reason about divergence. PostgreSQL’s taxonomy is the ConflictType enum (CT_INSERT_EXISTS, CT_UPDATE_ORIGIN_DIFFERS, CT_UPDATE_MISSING, CT_DELETE_MISSING, …).

| Concept | PostgreSQL name |
| --- | --- |
| Replication supervisor | logical replication launcher (ApplyLauncherMain) |
| Per-stream applier | leader apply worker (ApplyWorkerMain) |
| Worker registry (shared memory) | LogicalRepCtx->workers[] of LogicalRepWorker |
| Worker kind | LogicalRepWorkerType (WORKERTYPE_APPLY / WORKERTYPE_TABLESYNC / WORKERTYPE_PARALLEL_APPLY) |
| Restart rate limit | wal_retrieve_retry_interval + per-sub last-start DSA hash |
| Receive-and-dispatch loop | LogicalRepApplyLoop + apply_dispatch |
| Per-message handlers | apply_handle_* (begin, commit, insert, update, delete, relation, truncate, stream_*, prepare) |
| Relation map | LogicalRepRelMap / LogicalRepRelMapEntry (logicalrep_rel_open) |
| Replica identity lookup | GetRelationIdentityOrPK + logicalrep_rel_mark_updatable |
| Durable apply cursor | replication origin (replorigin_session_*) |
| Upstream feedback | send_feedback gated by get_flush_position |
| Initial copy worker | tablesync worker (TablesyncWorkerMain) |
| Per-table copy | copy_table + fetch_remote_table_info (COPY ... TO STDOUT) |
| Per-table state machine | pg_subscription_rel.srsubstate (SUBREL_STATE_*) |
| Conflict taxonomy | ConflictType enum; ReportApplyConflict |

Everything subscriber-side hangs off a single shared-memory control block that holds an array of worker slots. The launcher fills slots; apply and tablesync workers attach to them. A worker slot is a LogicalRepWorker:

// LogicalRepWorker (struct) — src/include/replication/worker_internal.h
typedef struct LogicalRepWorker
{
    LogicalRepWorkerType type;      /* APPLY / TABLESYNC / PARALLEL_APPLY */
    TimestampTz launch_time;
    bool        in_use;             /* is this slot used or free? */
    uint16      generation;         /* bumped each time slot is reused */
    PGPROC     *proc;
    Oid         dbid;
    Oid         userid;
    Oid         subid;              /* subscription this worker serves */
    Oid         relid;              /* table, for tablesync workers only */
    char        relstate;           /* SUBREL_STATE_* (tablesync) */
    XLogRecPtr  relstate_lsn;
    slock_t     relmutex;
    FileSet    *stream_fileset;     /* spill files for streamed xacts */
    pid_t       leader_pid;         /* set for parallel apply workers */
    bool        parallel_apply;
    XLogRecPtr  last_lsn;           /* stats */
    XLogRecPtr  reply_lsn;
    /* ... timing fields ... */
} LogicalRepWorker;

The worker kind is a small enum; the three values determine which entry function the background worker runs and how the slot is interpreted:

// LogicalRepWorkerType — src/include/replication/worker_internal.h
typedef enum LogicalRepWorkerType
{
    WORKERTYPE_UNKNOWN = 0,
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_APPLY,               /* the per-subscription leader */
    WORKERTYPE_PARALLEL_APPLY,      /* helper for large streamed xacts */
} LogicalRepWorkerType;

MyLogicalRepWorker points at this process’s slot; the predicates am_leader_apply_worker(), am_tablesync_worker(), and am_parallel_apply_worker() branch on MyLogicalRepWorker->type.
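
Because slots are recycled, the generation counter lets code that remembered a slot index detect that the slot has since been handed to a different worker. A single-threaded sketch with locking omitted; every name here (Slot, claim_slot, release_slot, slot_is_same_worker, MAX_WORKERS) is hypothetical, and the real array is LogicalRepCtx->workers[] protected by LogicalRepWorkerLock:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_WORKERS 4   /* hypothetical stand-in for max_logical_replication_workers */

typedef struct Slot {
    bool     in_use;
    uint16_t generation;    /* bumped every time the slot is claimed */
    unsigned subid;
} Slot;

static Slot slots[MAX_WORKERS];

/* Claim the first free slot; returns its index, or -1 if all are busy. */
static int
claim_slot(unsigned subid, uint16_t *gen_out)
{
    for (int i = 0; i < MAX_WORKERS; i++)
    {
        if (!slots[i].in_use)
        {
            slots[i].in_use = true;
            slots[i].generation++;
            slots[i].subid = subid;
            *gen_out = slots[i].generation;
            return i;
        }
    }
    return -1;
}

static void
release_slot(int i)
{
    slots[i].in_use = false;
}

/* A remembered (index, generation) pair still names the same worker only if
 * the slot was not released and re-claimed in between. */
static bool
slot_is_same_worker(int i, uint16_t gen)
{
    return slots[i].in_use && slots[i].generation == gen;
}
```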

The launcher is registered with the postmaster as a background worker at server start (ApplyLauncherRegister), so it is restarted automatically if it dies. Its body is a poll loop: each cycle it reads the list of enabled subscriptions and, for any subscription that has no running apply worker, launches one — subject to the per-subscription restart throttle.

// ApplyLauncherMain — src/backend/replication/logical/launcher.c
for (;;)
{
    long wait_time = DEFAULT_NAPTIME_PER_CYCLE;     /* 3 min */

    sublist = get_subscription_list();
    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        if (!sub->enabled)
            continue;

        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        w = logicalrep_worker_find(sub->oid, InvalidOid, false);
        LWLockRelease(LogicalRepWorkerLock);
        if (w != NULL)
            continue;                   /* worker is running already */

        /* Throttle: at most one start per wal_retrieve_retry_interval. */
        last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
        now = GetCurrentTimestamp();
        if (last_start == 0 ||
            (elapsed = TimestampDifferenceMilliseconds(last_start, now))
            >= wal_retrieve_retry_interval)
        {
            ApplyLauncherSetWorkerStartTime(sub->oid, now);
            logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid,
                                     sub->name, sub->owner, InvalidOid,
                                     DSM_HANDLE_INVALID);
        }
        else
            wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
    }

    /* sleep on the latch until the next cycle or an explicit wakeup */
    rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                   wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
}

The last-start times live in a DSA-backed dynamic hash (logicalrep_launcher_attach_dshmem), not in the fixed worker array, because they must survive a worker’s death and be visible to other backends. The crucial design point: an apply worker that errors out exits, the launcher notices the empty slot next cycle, but the throttle keeps the restart cadence to wal_retrieve_retry_interval so a persistently-failing subscription does not spin. A backend that expects a restart (e.g. after ALTER SUBSCRIPTION) explicitly forgets the last-start entry so the worker can restart immediately.
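
The throttle decision can be modeled as a pure function over millisecond timestamps. should_launch is a hypothetical name, and the caller is assumed to record now as the new last-start time whenever it returns true, as ApplyLauncherSetWorkerStartTime does in the loop above:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the per-subscription restart throttle.
 * last_start == 0 means "never started, or forgotten on purpose". */
static bool
should_launch(long last_start, long now, long retry_interval, long *wait_time)
{
    if (last_start == 0)
        return true;

    long elapsed = now - last_start;
    if (elapsed >= retry_interval)
        return true;

    /* Too soon: nap no longer than the rest of the interval. */
    long remaining = retry_interval - elapsed;
    if (remaining < *wait_time)
        *wait_time = remaining;
    return false;
}
```

With the default wal_retrieve_retry_interval of 5 seconds, a subscription whose worker last started 3 seconds ago is retried after about 2 more seconds, rather than after the full 3-minute nap.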

logicalrep_worker_launch finds a free slot, fills it, registers a dynamic background worker pointing at ApplyWorkerMain (or TablesyncWorkerMain), and then blocks in WaitForReplicationWorkerAttach until the new worker has set its proc pointer or the postmaster reports it could not start. This attach handshake closes the race where the launcher would otherwise think a slot is in use before the child has actually come up.

flowchart TD
    PM["postmaster"] -->|registers at startup| L["ApplyLauncherMain<br/>(background worker)"]
    L -->|"poll get_subscription_list()<br/>every 3 min or on wakeup"| L
    L -->|"per enabled sub<br/>with no running worker"| TH{"last_start older than<br/>wal_retrieve_retry_interval?"}
    TH -->|no| WAIT["shorten wait_time<br/>retry next cycle"]
    TH -->|yes| LAUNCH["logicalrep_worker_launch<br/>WORKERTYPE_APPLY"]
    LAUNCH --> SLOT["claim LogicalRepWorker slot<br/>under LogicalRepWorkerLock"]
    SLOT --> BGW["RegisterDynamicBackgroundWorker<br/>bgw_function = ApplyWorkerMain"]
    BGW --> ATT["WaitForReplicationWorkerAttach<br/>until child sets proc"]
    ATT --> AW["leader apply worker running"]
    AW -.->|"needs initial copy"| TS["tablesync worker(s)<br/>TablesyncWorkerMain"]

The leader apply worker: connect, set origin, loop


ApplyWorkerMain is the entry point. It runs the shared setup (SetupApplyOrSyncWorker, which calls InitializeLogRepWorker) and then run_apply_worker. The shared setup is what makes an apply worker safe: it sets session_replication_role = replica (so ordinary triggers and rules do not fire; only those enabled as REPLICA or ALWAYS do), connects to the worker’s database as the subscription owner, sets search_path to an empty string, and loads the subscription row into a persistent memory context.

// InitializeLogRepWorker — src/backend/replication/logical/worker.c (condensed)
SetConfigOption("session_replication_role", "replica",
PGC_SUSET, PGC_S_OVERRIDE);
BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
MyLogicalRepWorker->userid, 0);
SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
/* Lock + re-read the subscription so a concurrent DROP is caught. */
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
/* ... bail out if subscription was removed or disabled during startup ... */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, 0);
before_shmem_exit(replorigin_reset, (Datum) 0);

run_apply_worker then sets up the replication origin — the durable cursor — connects to the publisher, and asks the origin for the LSN to resume from:

// run_apply_worker — src/backend/replication/logical/worker.c (condensed)
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
                                   originname, sizeof(originname));
StartTransactionCommand();
originid = replorigin_by_name(originname, true);
if (!OidIsValid(originid))
    originid = replorigin_create(originname);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);  /* resume LSN */
CommitTransactionCommand();

LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, true,
                                        must_use_password,
                                        MySubscription->name, &err);
set_stream_options(&options, slotname, &origin_startpos);

/* two_phase stays PENDING until all tablesyncs are READY */
if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
    AllTablesyncsReady())
    options.proto.logical.twophase = true;

walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
start_apply(origin_startpos);   /* -> LogicalRepApplyLoop */

The origin is the linchpin of correctness. Every committed apply transaction advances the origin to the remote commit LSN in the same transaction as the data change, so a crash leaves the origin and the data consistent. If the apply errors out mid-transaction, start_apply’s PG_CATCH calls replorigin_reset(0, 0) so the origin does not advance past the failed transaction — otherwise the publisher would never resend it and the change would be silently lost.
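
The invariant can be modeled in a few lines: the data change and the origin advance either both happen or neither does. A sketch with hypothetical names (Subscriber, apply_remote_commit), where one struct update stands in for one local transaction and apply_ok == false models an error partway through:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;

typedef struct {
    Lsn origin_lsn;     /* durable "applied up to" cursor */
    int rows;           /* stand-in for the subscriber's table contents */
} Subscriber;

typedef enum { APPLIED, SKIPPED_DUPLICATE, FAILED } ApplyResult;

static ApplyResult
apply_remote_commit(Subscriber *sub, Lsn commit_lsn, int nrows, bool apply_ok)
{
    /* Streaming resumes from the origin's LSN, so a commit at or before it was
     * already applied.  (The publisher avoids resending such commits; this
     * check only makes the invariant explicit.) */
    if (commit_lsn <= sub->origin_lsn)
        return SKIPPED_DUPLICATE;

    if (!apply_ok)
        return FAILED;      /* neither rows nor origin move: publisher resends */

    sub->rows += nrows;             /* same local transaction ... */
    sub->origin_lsn = commit_lsn;   /* ... commits both together */
    return APPLIED;
}
```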

LogicalRepApplyLoop is the steady-state heart of the worker. It blocks on the libpq connection, receives a buffer, and for each message peeks the first byte: 'w' is an XLogData message (carried inside CopyData) whose payload is a decoded logical-replication message, and 'k' is a keepalive. The logical message itself is handed to apply_dispatch.

// LogicalRepApplyLoop — src/backend/replication/logical/worker.c (condensed)
for (;;)
{
    len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    /* ... process all buffered messages without blocking ... */
    c = pq_getmsgbyte(&s);
    if (c == 'w')
    {
        start_lsn = pq_getmsgint64(&s);
        end_lsn = pq_getmsgint64(&s);
        send_time = pq_getmsgint64(&s);
        if (last_received < end_lsn)
            last_received = end_lsn;
        UpdateWorkerStats(last_received, send_time, false);
        apply_dispatch(&s);             /* the decoded logical message */
    }
    else if (c == 'k')
    {
        /* keepalive: maybe reply, maybe ping */
    }
    /* ... compute flush position, send feedback, wait on latch+socket ... */
}
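
The fixed header in front of each 'w' payload is three big-endian 64-bit integers (start LSN, end LSN, send time), which is what the three pq_getmsgint64 calls consume. A standalone parser sketch; XLogDataMsg, read_be64, and parse_xlogdata are hypothetical names:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Read a big-endian (network order) 64-bit integer. */
static uint64_t
read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

typedef struct {
    uint64_t start_lsn, end_lsn, send_time;
    const unsigned char *payload;   /* what apply_dispatch would receive */
    size_t payload_len;
} XLogDataMsg;

/* Hypothetical parser for one 'w' message: 0 on success, -1 otherwise. */
static int
parse_xlogdata(const unsigned char *buf, size_t len, XLogDataMsg *out)
{
    const size_t header = 1 + 3 * 8;    /* type byte + three int64 fields */

    if (len < header || buf[0] != 'w')
        return -1;
    out->start_lsn   = read_be64(buf + 1);
    out->end_lsn     = read_be64(buf + 9);
    out->send_time   = read_be64(buf + 17);
    out->payload     = buf + header;
    out->payload_len = len - header;
    return 0;
}
```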

apply_dispatch reads the message-type byte and routes to one of the apply_handle_* functions. The full set is what defines the logical-replication wire protocol on the apply side:

// apply_dispatch — src/backend/replication/logical/worker.c (condensed)
LogicalRepMsgType action = pq_getmsgbyte(s);

switch (action)
{
    case LOGICAL_REP_MSG_BEGIN:             apply_handle_begin(s); break;
    case LOGICAL_REP_MSG_COMMIT:            apply_handle_commit(s); break;
    case LOGICAL_REP_MSG_INSERT:            apply_handle_insert(s); break;
    case LOGICAL_REP_MSG_UPDATE:            apply_handle_update(s); break;
    case LOGICAL_REP_MSG_DELETE:            apply_handle_delete(s); break;
    case LOGICAL_REP_MSG_TRUNCATE:          apply_handle_truncate(s); break;
    case LOGICAL_REP_MSG_RELATION:          apply_handle_relation(s); break;
    case LOGICAL_REP_MSG_TYPE:              apply_handle_type(s); break;
    case LOGICAL_REP_MSG_ORIGIN:            apply_handle_origin(s); break;
    case LOGICAL_REP_MSG_MESSAGE:           /* generic logical messages: ignored */ break;
    case LOGICAL_REP_MSG_STREAM_START:      apply_handle_stream_start(s); break;
    case LOGICAL_REP_MSG_STREAM_STOP:       apply_handle_stream_stop(s); break;
    case LOGICAL_REP_MSG_STREAM_ABORT:      apply_handle_stream_abort(s); break;
    case LOGICAL_REP_MSG_STREAM_COMMIT:     apply_handle_stream_commit(s); break;
    case LOGICAL_REP_MSG_BEGIN_PREPARE:     apply_handle_begin_prepare(s); break;
    case LOGICAL_REP_MSG_PREPARE:           apply_handle_prepare(s); break;
    case LOGICAL_REP_MSG_COMMIT_PREPARED:   apply_handle_commit_prepared(s); break;
    case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); break;
    case LOGICAL_REP_MSG_STREAM_PREPARE:    apply_handle_stream_prepare(s); break;
    /* default: ERROR protocol violation */
}

A logical transaction therefore arrives as a BEGIN, a sequence of RELATION / TYPE metadata messages and INSERT/UPDATE/DELETE/TRUNCATE row changes, and a closing COMMIT. A RELATION message must precede the first change that references that relation, so the applier knows the remote relation’s schema before it has to map a row.
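
These framing rules can be checked mechanically. A minimal sketch over a pre-parsed message array, using the protocol's one-byte type codes for BEGIN ('B'), COMMIT ('C'), RELATION ('R'), INSERT ('I'), UPDATE ('U'), and DELETE ('D'); TYPE, TRUNCATE, and the streaming and two-phase messages are left out, and Msg, is_known, stream_is_well_formed, and MAX_KNOWN_RELS are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>

/* Message-type bytes from the logical replication protocol (subset). */
enum { MSG_BEGIN = 'B', MSG_COMMIT = 'C', MSG_RELATION = 'R',
       MSG_INSERT = 'I', MSG_UPDATE = 'U', MSG_DELETE = 'D' };

typedef struct { char type; unsigned relid; } Msg;

#define MAX_KNOWN_RELS 16   /* hypothetical bound for this sketch */

static bool
is_known(const unsigned *known, int nknown, unsigned relid)
{
    for (int k = 0; k < nknown; k++)
        if (known[k] == relid)
            return true;
    return false;
}

/* Row changes must sit between BEGIN and COMMIT and reference a relation
 * already announced by RELATION; announcements persist across transactions,
 * like the subscriber's relation map. */
static bool
stream_is_well_formed(const Msg *m, int n)
{
    unsigned known[MAX_KNOWN_RELS];
    int nknown = 0;
    bool in_xact = false;

    for (int i = 0; i < n; i++)
    {
        switch (m[i].type)
        {
            case MSG_BEGIN:
                if (in_xact)
                    return false;           /* nested BEGIN */
                in_xact = true;
                break;
            case MSG_COMMIT:
                if (!in_xact)
                    return false;           /* COMMIT without BEGIN */
                in_xact = false;
                break;
            case MSG_RELATION:
                if (!is_known(known, nknown, m[i].relid))
                {
                    if (nknown == MAX_KNOWN_RELS)
                        return false;       /* sketch's table is full */
                    known[nknown++] = m[i].relid;
                }
                break;
            case MSG_INSERT:
            case MSG_UPDATE:
            case MSG_DELETE:
                if (!in_xact || !is_known(known, nknown, m[i].relid))
                    return false;
                break;
            default:
                return false;               /* unknown type: protocol violation */
        }
    }
    return !in_xact;                        /* must end on a COMMIT */
}
```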

apply_handle_begin records the remote final LSN and flips in_remote_transaction; apply_handle_commit validates the commit LSN, commits the local transaction (advancing the origin), and then calls process_syncing_tables to advance any tablesync handoffs. The interesting work is in the row handlers. apply_handle_insert is representative:

// apply_handle_insert — src/backend/replication/logical/worker.c (condensed)
if (is_skipping_changes() ||
    handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
    return;

begin_replication_step();
relid = logicalrep_read_insert(s, &newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);   /* map remote->local */
if (!should_apply_changes_for_rel(rel))                 /* tablesync filtering */
{
    logicalrep_rel_close(rel, RowExclusiveLock);
    end_replication_step();
    return;
}

/* Run user code (defaults, indexes) as the table owner unless opted out. */
if (!MySubscription->runasowner)
    SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

edata = create_edata_for_relation(rel);
remoteslot = ExecInitExtraTupleSlot(edata->estate,
                                    RelationGetDescr(rel->localrel),
                                    &TTSOpsVirtual);
slot_store_data(remoteslot, rel, &newtup);              /* coerce remote values */
slot_fill_defaults(rel, edata->estate, remoteslot);

if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    apply_handle_tuple_routing(edata, remoteslot, NULL, CMD_INSERT);
else
{
    ExecOpenIndices(edata->targetRelInfo, false);
    apply_handle_insert_internal(edata, edata->targetRelInfo, remoteslot);
    ExecCloseIndices(edata->targetRelInfo);
}

finish_edata(edata);
if (!MySubscription->runasowner)
    RestoreUserContext(&ucxt);                          /* back to subscription owner */
logicalrep_rel_close(rel, NoLock);
end_replication_step();

Two design choices stand out. First, should_apply_changes_for_rel implements per-table filtering: while a table is still being synced by a tablesync worker, the leader apply worker skips its changes (the tablesync worker owns them until handoff). Second, user-visible side effects (index maintenance, default expressions) run as the table owner via SwitchToUntrustedUser unless the subscription sets runasowner, closing a privilege-escalation hole where a malicious table owner could otherwise run code as the (often highly privileged) subscription owner.
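
The filtering rule fits in a small predicate. This sketch is modeled on should_apply_changes_for_rel but simplified: the RelState struct, WorkerKind enum, and should_apply are hypothetical, and parallel apply workers are omitted. It shows that the leader already applies a SYNCDONE table's changes for transactions whose final LSN is at or past the LSN recorded at SYNCDONE:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum { W_APPLY, W_TABLESYNC } WorkerKind;

typedef struct {
    unsigned localreloid;
    char     state;      /* 'r' READY, 's' SYNCDONE, other codes still syncing */
    uint64_t statelsn;   /* LSN recorded together with SYNCDONE */
} RelState;

static bool
should_apply(WorkerKind kind, unsigned my_relid, const RelState *rel,
             uint64_t remote_final_lsn)
{
    /* A tablesync worker applies changes only for the table it is syncing. */
    if (kind == W_TABLESYNC)
        return rel->localreloid == my_relid;

    /* Leader: READY tables, plus SYNCDONE tables for transactions ending
     * at or beyond the sync worker's stopping point. */
    return rel->state == 'r' ||
           (rel->state == 's' && rel->statelsn <= remote_final_lsn);
}
```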

Locating the row: replica identity and conflict detection


UPDATE and DELETE are where the subscriber must find a local row, and that is the job of replica identity. apply_handle_update_internal builds a remoteslot from the change, then asks FindReplTupleInLocalRel to locate the matching local tuple — by index when one is usable, otherwise by a full sequential scan:

// FindReplTupleInLocalRel — src/backend/replication/logical/worker.c (condensed)
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
Assert(OidIsValid(localidxoid) ||
       (remoterel->replident == REPLICA_IDENTITY_FULL));

if (OidIsValid(localidxoid))
    found = RelationFindReplTupleByIndex(localrel, localidxoid,
                                         LockTupleExclusive,
                                         remoteslot, *localslot);
else
    found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
                                     remoteslot, *localslot);
return found;

The localidxoid comes from GetRelationIdentityOrPK, which prefers an explicitly-configured REPLICA IDENTITY index and falls back to the primary key. When neither exists, the published table must be REPLICA IDENTITY FULL, and the subscriber either uses a column-compatible non-unique index (FindUsableIndexForReplicaIdentityFull) or, failing that, a sequential scan that compares whole tuples:

// GetRelationIdentityOrPK — src/backend/replication/logical/relation.c
idxoid = RelationGetReplicaIndex(rel);
if (!OidIsValid(idxoid))
idxoid = RelationGetPrimaryKeyIndex(rel, false);
return idxoid;
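
The overall choice, including the REPLICA IDENTITY FULL fallback described above, reduces to a short decision table. A sketch with hypothetical names (LookupStrategy, choose_lookup) that takes booleans in place of catalog lookups:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { LOOKUP_NONE, LOOKUP_IDENTITY_INDEX, LOOKUP_PK_INDEX,
               LOOKUP_FULL_INDEX, LOOKUP_SEQSCAN } LookupStrategy;

/* Prefer the local replica identity index, then the primary key; with a
 * REPLICA IDENTITY FULL publisher fall back to a usable non-unique index,
 * else a sequential scan.  LOOKUP_NONE: UPDATE/DELETE cannot be matched. */
static LookupStrategy
choose_lookup(bool has_identity_index, bool has_pk,
              bool remote_identity_full, bool has_usable_index)
{
    if (has_identity_index)
        return LOOKUP_IDENTITY_INDEX;
    if (has_pk)
        return LOOKUP_PK_INDEX;
    if (remote_identity_full)
        return has_usable_index ? LOOKUP_FULL_INDEX : LOOKUP_SEQSCAN;
    return LOOKUP_NONE;
}
```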

Whether a relation is updatable at all is decided once, when the relmap entry is (re)built, by logicalrep_rel_mark_updatable. It walks the local replica-identity (or PK) key columns and clears entry->updatable if any identity column is not covered by the remote-published key set — so an UPDATE/DELETE on a relation whose local identity is stronger than what the publisher sends is rejected early with a clear error rather than silently matching the wrong row:

// logicalrep_rel_mark_updatable — src/backend/replication/logical/relation.c (condensed)
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
                                   INDEX_ATTR_BITMAP_IDENTITY_KEY);
if (idkey == NULL)                      /* no replica identity -> try PK */
{
    idkey = RelationGetIndexAttrBitmap(entry->localrel,
                                       INDEX_ATTR_BITMAP_PRIMARY_KEY);
    if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
        entry->updatable = false;
}

i = -1;                                 /* bms_next_member starts after -1 */
while ((i = bms_next_member(idkey, i)) >= 0)
{
    int attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber);

    if (entry->attrmap->attnums[attnum] < 0 ||
        !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
    {
        entry->updatable = false;       /* identity col not in remote key */
        break;
    }
}
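
The same coverage test with column sets as bitmasks may make the direction of the check easier to see: every local identity column must map to a column the publisher marks as part of its key, while extra remote key columns are harmless. A sketch with a hypothetical name (identity_covered), assuming at most 32 columns on each side:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Bit k of a mask stands for column k.  local_identity uses local column
 * numbers, remote_keys uses remote column numbers, and attnums maps each
 * local column to its remote column (-1 = local-only). */
static bool
identity_covered(uint32_t local_identity, const int *attnums, int n_local,
                 uint32_t remote_keys)
{
    for (int i = 0; i < n_local; i++)
    {
        if (!(local_identity & (UINT32_C(1) << i)))
            continue;                       /* not an identity column */
        if (attnums[i] < 0 || !(remote_keys & (UINT32_C(1) << attnums[i])))
            return false;                   /* publisher doesn't send it as key */
    }
    return true;
}
```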

When the local row is found, the applier does not blindly overwrite it. apply_handle_update_internal first inspects the local tuple’s commit metadata via GetTupleTransactionInfo. If the row was last written by a different origin than this apply worker’s session origin, that is a divergence — two upstreams (or a local writer plus an upstream) touched the same row — and PostgreSQL 18 reports it as CT_UPDATE_ORIGIN_DIFFERS before proceeding with the update. The origin and commit timestamp of the local tuple are only available when track_commit_timestamp is enabled on the subscriber, so without it this class of conflict goes undetected:

// apply_handle_update_internal — src/backend/replication/logical/worker.c (condensed)
found = FindReplTupleInLocalRel(edata, localrel, &relmapentry->remoterel,
                                localindexoid, remoteslot, &localslot);
if (found)
{
    if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
                                &conflicttuple.origin, &conflicttuple.ts) &&
        conflicttuple.origin != replorigin_session_origin)
    {
        conflicttuple.slot = localslot;
        ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
                            remoteslot, newslot, list_make1(&conflicttuple));
    }
    slot_modify_data(remoteslot, localslot, relmapentry, newtup);
    InitConflictIndexes(relinfo);
    ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, remoteslot);
}
else
{
    /* The tuple to be updated could not be found: log and skip. */
    ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
                        remoteslot, newslot, list_make1(&conflicttuple));
}

The conflict taxonomy is a small typed enum, deliberately exhaustive so that every divergence has a name an operator can monitor (it feeds PgStat_StatSubEntry::conflict_count):

// ConflictType — src/include/replication/conflict.h
typedef enum
{
    CT_INSERT_EXISTS,               /* INSERT violates a unique constraint */
    CT_UPDATE_ORIGIN_DIFFERS,       /* row to update was written by another origin */
    CT_UPDATE_EXISTS,               /* updated row violates a unique constraint */
    CT_UPDATE_MISSING,              /* row to update is gone */
    CT_DELETE_ORIGIN_DIFFERS,       /* row to delete was written by another origin */
    CT_DELETE_MISSING,              /* row to delete is gone */
    CT_MULTIPLE_UNIQUE_CONFLICTS,   /* violates several unique constraints */
} ConflictType;

ReportApplyConflict formats a per-tuple errdetail (origin, xmin, commit timestamp of the conflicting local tuple), bumps the subscription’s conflict counter through pgstat_report_subscription_conflict, and logs at the requested level. Crucially, REL_18 detects and logs: it does not abort the stream on a *_MISSING or *_ORIGIN_DIFFERS conflict, and the change proceeds with its natural outcome (an update_missing is simply skipped, an update_origin_differs still applies the remote value). Unique-key conflicts (CT_INSERT_EXISTS, CT_UPDATE_EXISTS, CT_MULTIPLE_UNIQUE_CONFLICTS) still raise an error, because there is no safe automatic resolution. The worker exits, the launcher restarts it after wal_retrieve_retry_interval (unless the subscription sets disable_on_error), and the same remote transaction is retried until an operator resolves the conflict.

flowchart TD
    M["apply_dispatch sees<br/>UPDATE/DELETE message"] --> OPEN["logicalrep_rel_open<br/>map remote rel -> local"]
    OPEN --> IDX["GetRelationIdentityOrPK<br/>pick identity/PK index"]
    IDX --> FIND{"FindReplTupleInLocalRel<br/>index scan or seq scan"}
    FIND -->|found| ORIG{"local tuple origin<br/>!= my session origin?"}
    FIND -->|not found| MISS["ReportApplyConflict<br/>CT_UPDATE_MISSING /<br/>CT_DELETE_MISSING<br/>(log, skip)"]
    ORIG -->|yes| RPT["ReportApplyConflict<br/>CT_*_ORIGIN_DIFFERS (log)"]
    ORIG -->|no| DO["ExecSimpleRelationUpdate /<br/>ExecSimpleRelationDelete"]
    RPT --> DO
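
The classification described above can be condensed into one function. A simplified sketch with hypothetical names (Conflict, Op, classify, conflict_is_fatal); it omits CT_MULTIPLE_UNIQUE_CONFLICTS, assumes origin information is available (track_commit_timestamp on), and uses origin 0 for a locally written row, matching InvalidRepOriginId:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
    CONFLICT_NONE, INSERT_EXISTS, UPDATE_ORIGIN_DIFFERS, UPDATE_EXISTS,
    UPDATE_MISSING, DELETE_ORIGIN_DIFFERS, DELETE_MISSING
} Conflict;

typedef enum { OP_INSERT, OP_UPDATE, OP_DELETE } Op;

/* row_found / local_origin describe the local row; unique_violation says
 * whether the new tuple collides with another row's unique key.  If an
 * update hits both an origin mismatch and a unique violation, the real
 * worker logs the former and then errors on the latter; this sketch
 * reports only the fatal one. */
static Conflict
classify(Op op, bool row_found, unsigned local_origin, unsigned my_origin,
         bool unique_violation)
{
    if (op == OP_INSERT)
        return unique_violation ? INSERT_EXISTS : CONFLICT_NONE;
    if (!row_found)
        return op == OP_UPDATE ? UPDATE_MISSING : DELETE_MISSING;
    if (op == OP_UPDATE && unique_violation)
        return UPDATE_EXISTS;
    if (local_origin != my_origin)
        return op == OP_UPDATE ? UPDATE_ORIGIN_DIFFERS : DELETE_ORIGIN_DIFFERS;
    return CONFLICT_NONE;
}

/* Unique-key conflicts raise an ERROR; the others are logged and skipped
 * or applied. */
static bool
conflict_is_fatal(Conflict c)
{
    return c == INSERT_EXISTS || c == UPDATE_EXISTS;
}
```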

Feedback: never acknowledge more than is durable


The apply loop periodically sends write/flush/apply LSNs upstream so the publisher can advance its slot and release WAL. The subscriber must never report a flush position past what it has durably committed, or a crash would lose transactions the publisher believes are safe. get_flush_position walks the lsn_mapping list (remote-commit-LSN paired with the local WAL LSN it produced) and only reports a remote position as flushed once the corresponding local LSN is <= GetFlushRecPtr():

// get_flush_position — src/backend/replication/logical/worker.c (condensed)
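XLogRecPtr local_flush = GetFlushRecPtr(NULL);

*write = InvalidXLogRecPtr;
*flush = InvalidXLogRecPtr;

dlist_foreach_modify(iter, &lsn_mapping)
{
    FlushPosition *pos = dlist_container(FlushPosition, node, iter.cur);

    *write = pos->remote_end;
    if (pos->local_end <= local_flush)
    {
        *flush = pos->remote_end;           /* durable: safe to ack */
        dlist_delete(iter.cur);
        pfree(pos);
    }
    else
    {
        /* not yet durable: hold back flush, report newest write position */
        pos = dlist_tail_element(FlushPosition, node, &lsn_mapping);
        *write = pos->remote_end;
        *have_pending_txes = true;
        return;
    }
}

*have_pending_txes = !dlist_is_empty(&lsn_mapping);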

send_feedback then ships (write, flush, apply) over the libpq connection; the flush field is exactly this gated value. This is the subscriber-side analogue of the standby feedback in postgres-wal-sender-receiver.md, but the unit is a logical transaction’s commit, not a raw WAL byte position.
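
An array-based model of the same gating makes it easy to check which positions get acknowledged. FlushPos and flush_position are hypothetical names; 0 stands in for InvalidXLogRecPtr, and the return value counts the leading entries the caller would drop from the list:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;

/* One applied remote transaction: its remote end LSN and the local WAL
 * position its commit produced.  Entries are in commit order. */
typedef struct { Lsn remote_end; Lsn local_end; } FlushPos;

static int
flush_position(const FlushPos *map, int n, Lsn local_flush,
               Lsn *write, Lsn *flush, bool *have_pending)
{
    int durable = 0;

    *write = *flush = 0;
    for (int i = 0; i < n; i++)
    {
        if (map[i].local_end > local_flush)
        {
            /* first non-durable commit: stop acking, report newest write */
            *write = map[n - 1].remote_end;
            *have_pending = true;
            return durable;
        }
        *write = *flush = map[i].remote_end;
        durable++;
    }
    *have_pending = false;
    return durable;
}
```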

A new subscription’s tables start empty; the change stream only carries deltas from the slot’s creation point forward. The tablesync worker bridges the gap. TablesyncWorkerMain runs the same SetupApplyOrSyncWorker scaffolding as the apply worker but then calls LogicalRepSyncTableStart, which: checks the row’s srsubstate, creates a per-table replication slot on the publisher (named via ReplicationSlotNameForTablesync), takes a snapshot, and COPYs the table contents:

// copy_table — src/backend/replication/logical/tablesync.c (condensed)
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
                        RelationGetRelationName(rel), &lrel, &qual,
                        &gencol_published);
logicalrep_relmap_update(&lrel);
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);

/* Build "COPY <schema.table> (cols) TO STDOUT" and stream rows in.  Row
 * filters, non-table relations, or published generated columns switch this
 * to a "COPY (SELECT ...) TO STDOUT" form instead. */
appendStringInfo(&cmd, "COPY %s",
                 quote_qualified_identifier(lrel.nspname, lrel.relname));
/* ... append column list ... */
appendStringInfoString(&cmd, " TO STDOUT");
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
/* CopyFrom() drains the COPY stream into the local table. */

The handoff from “copying” to “live streaming” is the subtle part, and it is a two-process handshake driven through the relstate field of the sync worker’s shared-memory slot, with the durable states mirrored in pg_subscription_rel.srsubstate. After the copy finishes (FINISHEDCOPY, recorded in the catalog), the sync worker sets its in-memory state to SYNCWAIT and parks. The leader apply worker, in process_syncing_tables_for_apply, reads that state from the slot, flips the sync worker to CATCHUP, and records its own current LSN as the sync point:

// process_syncing_tables_for_apply — src/backend/replication/logical/tablesync.c (condensed)
SpinLockAcquire(&syncworker->relmutex);
rstate->state = syncworker->relstate;       /* read the shared-memory state */
rstate->lsn = syncworker->relstate_lsn;
if (rstate->state == SUBREL_STATE_SYNCWAIT)
{
    /* Sync worker is waiting for apply. Tell it it can catch up now. */
    syncworker->relstate = SUBREL_STATE_CATCHUP;
    syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn);
}
SpinLockRelease(&syncworker->relmutex);

if (rstate->state == SUBREL_STATE_SYNCWAIT)
{
    logicalrep_worker_wakeup_ptr(syncworker);
    /* commit to release locks, then busy-wait for the sync worker */
    wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE);
}

The sync worker, now in CATCHUP, applies the change stream itself from the copy snapshot’s position up to the apply worker’s sync LSN. Once it reaches that LSN, process_syncing_tables_for_sync moves it to SYNCDONE, drops the per-table slot at the publisher, and exits:

// process_syncing_tables_for_sync — src/backend/replication/logical/tablesync.c (condensed)
if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
    current_lsn >= MyLogicalRepWorker->relstate_lsn)
{
    MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
    MyLogicalRepWorker->relstate_lsn = current_lsn;
    UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid,
                               MyLogicalRepWorker->relstate,
                               MyLogicalRepWorker->relstate_lsn, false);
    walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
    ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
}
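The catch-up exit condition can be modelled the same way. In this sketch, sync_side_step and run_catchup are hypothetical names, and commits are assumed to arrive in LSN order. The real worker performs the check from process_syncing_tables (for example after applying a commit) and then persists SYNCDONE, drops its slot and exits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* stand-in for PostgreSQL's typedef */

#define SUBREL_STATE_SYNCWAIT 'w'
#define SUBREL_STATE_CATCHUP  'c'
#define SUBREL_STATE_SYNCDONE 's'

/* Hypothetical, trimmed-down view of the sync worker's own slot. */
typedef struct SyncWorkerSlot
{
    char       relstate;
    XLogRecPtr relstate_lsn;
} SyncWorkerSlot;

/* One check at stream position current_lsn; true means "done, exit". */
static bool
sync_side_step(SyncWorkerSlot *me, XLogRecPtr current_lsn)
{
    if (me->relstate == SUBREL_STATE_CATCHUP &&
        current_lsn >= me->relstate_lsn)
    {
        me->relstate = SUBREL_STATE_SYNCDONE;
        me->relstate_lsn = current_lsn;     /* becomes the SYNCDONE LSN */
        return true;
    }
    return false;
}

/* Feed commit LSNs in stream order; returns how many were applied. */
static int
run_catchup(SyncWorkerSlot *me, const XLogRecPtr *commit_lsns, int n)
{
    for (int i = 0; i < n; i++)
    {
        /* (applying the transaction itself is elided) */
        if (sync_side_step(me, commit_lsns[i]))
            return i + 1;
    }
    return n;
}
```

The SYNCDONE LSN recorded here is the first position at or past the sync point, which is what the apply worker later compares its own position against.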

Finally, when the leader apply worker’s stream position passes the sync worker’s SYNCDONE LSN (current_lsn >= rstate->lsn), the apply worker promotes the table to READY. From then on should_apply_changes_for_rel returns true for it, so the apply worker streams that table’s changes normally. The state machine is per-table, so a hundred-table subscription syncs several tables at once (at most max_sync_workers_per_subscription sync workers per subscription, 2 by default) and folds each one into the live stream independently.
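The filter that makes the handoff seamless can be sketched as a pure function. This model follows the apply-worker and tablesync branches of should_apply_changes_for_rel: a leader applies a table's changes when it is READY, or when it is SYNCDONE and the remote transaction's final LSN is at or past the sync point, while a sync worker applies only its own table. WorkerKind, RelEntry and should_apply are hypothetical names; the parallel-apply branch and error reporting are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* stand-ins for PostgreSQL typedefs */
typedef unsigned int Oid;

#define SUBREL_STATE_DATASYNC 'd'
#define SUBREL_STATE_SYNCDONE 's'
#define SUBREL_STATE_READY    'r'

typedef enum { KIND_APPLY, KIND_TABLESYNC } WorkerKind;    /* hypothetical */

/* Hypothetical subset of a relation-map entry. */
typedef struct RelEntry
{
    Oid        localreloid;
    char       state;           /* srsubstate as last seen by this worker */
    XLogRecPtr statelsn;        /* LSN recorded with that state */
} RelEntry;

static bool
should_apply(WorkerKind kind, Oid my_relid, const RelEntry *rel,
             XLogRecPtr remote_final_lsn)
{
    switch (kind)
    {
        case KIND_TABLESYNC:
            /* A sync worker applies only its own table. */
            return rel->localreloid == my_relid;
        case KIND_APPLY:
            /* READY, or SYNCDONE once this transaction ends at/after it. */
            return rel->state == SUBREL_STATE_READY ||
                   (rel->state == SUBREL_STATE_SYNCDONE &&
                    rel->statelsn <= remote_final_lsn);
    }
    return false;
}
```

The SYNCDONE clause lets the leader pick a table up before it has formally promoted it to READY, so no transaction between the sync point and the promotion is lost.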

flowchart TD
    INIT["INIT<br/>(srsubstate 'i')"] --> DSYNC["DATASYNC 'd'<br/>create slot + snapshot"]
    DSYNC --> COPY["COPY table TO STDOUT<br/>(copy_table / CopyFrom)"]
    COPY --> FC["FINISHEDCOPY 'f'"]
    FC --> SW["SYNCWAIT 'w'<br/>sync worker parks"]
    SW -->|"apply worker sets state"| CU["CATCHUP 'c'<br/>sync worker applies stream<br/>up to apply LSN"]
    CU -->|"current_lsn >= relstate_lsn"| SD["SYNCDONE 's'<br/>drop tablesync slot, exit"]
    SD -->|"apply worker passes SYNCDONE LSN"| RDY["READY 'r'<br/>leader apply streams normally"]
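As a compact check on the diagram, the sketch below walks the happy path and marks which states are written to pg_subscription_rel. It assumes, per the comments in pg_subscription_rel.h, that SYNCWAIT and CATCHUP are used only for interprocess signalling and never stored in the catalog. next_state and stored_in_catalog are hypothetical helpers. Restart paths (for example a sync worker resuming from FINISHEDCOPY) are not modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* State letters as listed in pg_subscription_rel.h. */
#define SUBREL_STATE_INIT         'i'
#define SUBREL_STATE_DATASYNC     'd'
#define SUBREL_STATE_FINISHEDCOPY 'f'
#define SUBREL_STATE_SYNCWAIT     'w'
#define SUBREL_STATE_CATCHUP      'c'
#define SUBREL_STATE_SYNCDONE     's'
#define SUBREL_STATE_READY        'r'

/* Next state on the happy path; '\0' after READY or on unknown input. */
static char
next_state(char s)
{
    switch (s)
    {
        case SUBREL_STATE_INIT:         return SUBREL_STATE_DATASYNC;
        case SUBREL_STATE_DATASYNC:     return SUBREL_STATE_FINISHEDCOPY;
        case SUBREL_STATE_FINISHEDCOPY: return SUBREL_STATE_SYNCWAIT;
        case SUBREL_STATE_SYNCWAIT:     return SUBREL_STATE_CATCHUP;
        case SUBREL_STATE_CATCHUP:      return SUBREL_STATE_SYNCDONE;
        case SUBREL_STATE_SYNCDONE:     return SUBREL_STATE_READY;
        default:                        return '\0';
    }
}

/* True for states that are persisted in srsubstate. */
static bool
stored_in_catalog(char s)
{
    switch (s)
    {
        case SUBREL_STATE_INIT:
        case SUBREL_STATE_DATASYNC:
        case SUBREL_STATE_FINISHEDCOPY:
        case SUBREL_STATE_SYNCDONE:
        case SUBREL_STATE_READY:
            return true;
        default:
            return false;       /* SYNCWAIT, CATCHUP, or not a state at all */
    }
}
```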

The subscriber code lives under src/backend/replication/logical/. Four files carry the bulk of the apply path; this walkthrough names the stable symbols in call-flow order and ends with a position-hint table.

  • ApplyLauncherRegister — called from the postmaster at startup; registers the launcher as a BgWorker with BgWorkerStart_RecoveryFinished, so it starts only once recovery has finished and the server is in normal operation (never on a standby that is still in recovery).
  • ApplyLauncherMain — the poll loop. Calls get_subscription_list, and for each enabled subscription with no running worker, applies the wal_retrieve_retry_interval throttle and calls logicalrep_worker_launch.
  • ApplyLauncherGetWorkerStartTime / ApplyLauncherSetWorkerStartTime — read/write the per-subscription last-start time in the DSA-backed hash installed by logicalrep_launcher_attach_dshmem.
  • logicalrep_worker_launch — claims a free LogicalRepWorker slot under LogicalRepWorkerLock, registers a dynamic bgworker pointing at ApplyWorkerMain or TablesyncWorkerMain, then blocks in WaitForReplicationWorkerAttach.
  • logicalrep_worker_find — scans the shared worker array for the slot matching (subid, relid); the launcher uses it to detect a still-running worker.
  • ApplyWorkerMain — bgworker entry point. Calls SetupApplyOrSyncWorker then run_apply_worker.
  • SetupApplyOrSyncWorker / InitializeLogRepWorker — set session_replication_role = replica, connect to the worker’s database as the subscription owner, set an empty search_path, load MySubscription, and register the syscache invalidation callback.
  • run_apply_worker — set up the replication origin (the durable cursor), connect to the publisher with walrcv_connect, begin streaming with walrcv_startstreaming, then call start_apply, which runs LogicalRepApplyLoop.
  • LogicalRepApplyLoop — receive CopyData, peek the leading byte ('w' data / 'k' keepalive), dispatch each decoded message via apply_dispatch, and periodically call send_feedback.
  • apply_dispatch — switch on the LogicalRepMsgType byte and call the matching apply_handle_* routine.
  • apply_handle_begin / apply_handle_commit — bracket a remote transaction; commit advances the origin and calls process_syncing_tables. apply_handle_commit_internal does the actual local commit.
  • apply_handle_insert / apply_handle_update / apply_handle_delete — decode the row, logicalrep_rel_open, filter via should_apply_changes_for_rel, build the slot, and call the corresponding *_internal.
  • apply_handle_insert_internal / apply_handle_update_internal / apply_handle_delete_internal — run the change through the executor (ExecSimpleRelationInsert/Update/Delete); update/delete first call FindReplTupleInLocalRel and may ReportApplyConflict.
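  • FindReplTupleInLocalRel — locate the local row via RelationFindReplTupleByIndex when a usable index exists (replica identity, primary key, or an index chosen for REPLICA IDENTITY FULL), otherwise via RelationFindReplTupleSeq (REPLICA IDENTITY FULL with no usable index).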
  • get_flush_position / send_feedback / store_flush_position — maintain the remote↔local LSN mapping and gate upstream feedback on local durability.
  • logicalrep_relmap_update — install/refresh a LogicalRepRelation (from a RELATION message) into the LogicalRepRelMap hash.
  • logicalrep_rel_open / logicalrep_rel_close — open the local relation for a remote id, rebuilding the attribute map and derived state if the entry was invalidated by local DDL; release it.
  • logicalrep_rel_mark_updatable — decide entry->updatable from the local identity/PK key vs. the remote published key set.
  • GetRelationIdentityOrPK / FindUsableIndexForReplicaIdentityFull / FindLogicalRepLocalIndex — choose the index used to locate rows.
  • logicalrep_partition_open — relmap entry for a leaf partition during tuple routing.
  • TablesyncWorkerMain — bgworker entry point for a per-table sync worker; shares SetupApplyOrSyncWorker with the apply worker.
  • LogicalRepSyncTableStart — the copy driver: read srsubstate, create the per-table slot, snapshot, copy_table, then move toward SYNCWAIT.
  • copy_table / fetch_remote_table_info — issue COPY ... TO STDOUT on the publisher and drain it into the local table via CopyFrom.
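  • process_syncing_tables / process_syncing_tables_for_apply / process_syncing_tables_for_sync — process_syncing_tables dispatches to the _for_apply or _for_sync variant by worker type; together they drive the SYNCWAIT → CATCHUP → SYNCDONE → READY handshake from each side.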
  • wait_for_relation_state_change — busy-wait helper used by the apply worker while a sync worker catches up.
  • UpdateSubscriptionRelState / GetSubscriptionRelState — read/write the pg_subscription_rel.srsubstate catalog column.
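The restart throttle in ApplyLauncherMain reduces to a small decision per subscription that has no running worker. The sketch assumes millisecond timestamps and uses hypothetical names (launch_decision, LaunchDecision). retry_ms stands in for wal_retrieve_retry_interval, and wait_ms for the nap time the poll loop would otherwise take.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampMs;    /* hypothetical: milliseconds, 0 = never started */

typedef struct LaunchDecision
{
    bool    launch;             /* start a worker on this pass? */
    int64_t wait_ms;            /* how long the launcher may nap afterwards */
} LaunchDecision;

/*
 * Throttle check for one subscription without a running worker.
 * wait_ms is the nap time accumulated so far in this pass of the loop.
 */
static LaunchDecision
launch_decision(TimestampMs last_start, TimestampMs now, int64_t retry_ms,
                int64_t wait_ms)
{
    LaunchDecision d = {false, wait_ms};
    int64_t elapsed = now - last_start;

    if (last_start == 0 || elapsed >= retry_ms)
        d.launch = true;        /* caller records now as the new start time */
    else if (retry_ms - elapsed < d.wait_ms)
        d.wait_ms = retry_ms - elapsed;     /* wake when the window opens */
    return d;
}
```

In this model, shortening the nap to the remaining window lets the launcher wake as soon as a throttled subscription becomes eligible again, instead of sleeping out a full cycle.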
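The leading-byte peek in LogicalRepApplyLoop follows the framing of the streaming replication protocol. An XLogData message is 'w' followed by three big-endian Int64 fields (WAL start, server WAL end, send time) and then the payload. A keepalive is 'k' followed by WAL end, send time and a one-byte reply-requested flag. For logical replication the XLogData payload is a pgoutput message whose first byte is the LogicalRepMsgType that apply_dispatch switches on. The parser below is a hypothetical standalone sketch of that framing, not worker.c's code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum { MSG_INVALID, MSG_XLOGDATA, MSG_KEEPALIVE } MsgKind;

typedef struct StreamMsg
{
    MsgKind              kind;
    uint64_t             wal_start;        /* 'w' only */
    uint64_t             wal_end;          /* server's current WAL end */
    uint64_t             send_time;        /* server clock at send time */
    int                  reply_requested;  /* 'k' only */
    const unsigned char *payload;          /* 'w' only: pgoutput message */
    size_t               payload_len;
} StreamMsg;

/* Decode a big-endian (network byte order) 64-bit integer. */
static uint64_t
read_be64(const unsigned char *p)
{
    uint64_t v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/* Classify one CopyData body by its leading byte and decode its header. */
static StreamMsg
parse_copydata(const unsigned char *buf, size_t len)
{
    StreamMsg m = {MSG_INVALID, 0, 0, 0, 0, NULL, 0};

    if (len >= 25 && buf[0] == 'w')
    {
        m.kind = MSG_XLOGDATA;
        m.wal_start = read_be64(buf + 1);
        m.wal_end = read_be64(buf + 9);
        m.send_time = read_be64(buf + 17);
        m.payload = buf + 25;
        m.payload_len = len - 25;
    }
    else if (len >= 18 && buf[0] == 'k')
    {
        m.kind = MSG_KEEPALIVE;
        m.wal_end = read_be64(buf + 1);
        m.send_time = read_be64(buf + 9);
        m.reply_requested = buf[17];
    }
    return m;
}
```

A keepalive with the reply flag set is the publisher asking for immediate feedback, which is one reason the loop sends feedback outside the normal periodic schedule.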

Position hints (as of 2026-06-05, REL_18 273fe94)

| Symbol | File | Line |
|---|---|---|
| ApplyLauncherRegister | src/backend/replication/logical/launcher.c | 928 |
| ApplyLauncherMain | src/backend/replication/logical/launcher.c | 1132 |
| logicalrep_worker_launch | src/backend/replication/logical/launcher.c | 310 |
| logicalrep_worker_find | src/backend/replication/logical/launcher.c | 247 |
| ApplyWorkerMain | src/backend/replication/logical/worker.c | 4833 |
| SetupApplyOrSyncWorker | src/backend/replication/logical/worker.c | 4792 |
| InitializeLogRepWorker | src/backend/replication/logical/worker.c | 4674 |
| run_apply_worker | src/backend/replication/logical/worker.c | 4561 |
| LogicalRepApplyLoop | src/backend/replication/logical/worker.c | 3589 |
| apply_dispatch | src/backend/replication/logical/worker.c | 3383 |
| apply_handle_commit | src/backend/replication/logical/worker.c | 1020 |
| apply_handle_commit_internal | src/backend/replication/logical/worker.c | 2268 |
| apply_handle_insert | src/backend/replication/logical/worker.c | 2398 |
| apply_handle_insert_internal | src/backend/replication/logical/worker.c | 2489 |
| apply_handle_update_internal | src/backend/replication/logical/worker.c | 2677 |
| apply_handle_delete_internal | src/backend/replication/logical/worker.c | 2862 |
| FindReplTupleInLocalRel | src/backend/replication/logical/worker.c | 2930 |
| should_apply_changes_for_rel | src/backend/replication/logical/worker.c | 461 |
| get_flush_position | src/backend/replication/logical/worker.c | 3503 |
| send_feedback | src/backend/replication/logical/worker.c | 3853 |
| logicalrep_relmap_update | src/backend/replication/logical/relation.c | 164 |
| logicalrep_rel_mark_updatable | src/backend/replication/logical/relation.c | 296 |
| logicalrep_rel_open | src/backend/replication/logical/relation.c | 349 |
| logicalrep_rel_close | src/backend/replication/logical/relation.c | 504 |
| logicalrep_partition_open | src/backend/replication/logical/relation.c | 633 |
| FindUsableIndexForReplicaIdentityFull | src/backend/replication/logical/relation.c | 776 |
| GetRelationIdentityOrPK | src/backend/replication/logical/relation.c | 891 |
| process_syncing_tables_for_sync | src/backend/replication/logical/tablesync.c | 294 |
| process_syncing_tables_for_apply | src/backend/replication/logical/tablesync.c | 418 |
| process_syncing_tables | src/backend/replication/logical/tablesync.c | 695 |
| wait_for_relation_state_change | src/backend/replication/logical/tablesync.c | 183 |
| fetch_remote_table_info | src/backend/replication/logical/tablesync.c | 825 |
| copy_table | src/backend/replication/logical/tablesync.c | 1143 |
| LogicalRepSyncTableStart | src/backend/replication/logical/tablesync.c | 1319 |
| ReportApplyConflict | src/backend/replication/logical/conflict.c | 103 |
| InitConflictIndexes | src/backend/replication/logical/conflict.c | 138 |
| ConflictType (enum) | src/include/replication/conflict.h | 31 |
| SUBREL_STATE_* (macros) | src/include/catalog/pg_subscription_rel.h | 62 |
| LogicalRepWorker (struct) | src/include/replication/worker_internal.h | |
| LogicalRepWorkerType (enum) | src/include/replication/worker_internal.h | |

Claims in this document were checked against the REL_18 tree at commit 273fe94 under /data/hgryoo/references/postgres. Spot-checks:

  • Three worker kinds. LogicalRepWorkerType in src/include/replication/worker_internal.h defines WORKERTYPE_UNKNOWN plus exactly three real kinds: WORKERTYPE_TABLESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY. Verified.
  • Conflict taxonomy. ConflictType in src/include/replication/conflict.h lists CT_INSERT_EXISTS, CT_UPDATE_ORIGIN_DIFFERS, CT_UPDATE_EXISTS, CT_UPDATE_MISSING, CT_DELETE_ORIGIN_DIFFERS, CT_DELETE_MISSING, CT_MULTIPLE_UNIQUE_CONFLICTS, with CONFLICT_NUM_TYPES = CT_MULTIPLE_UNIQUE_CONFLICTS + 1. The CT_*_ORIGIN_DIFFERS and CT_MULTIPLE_UNIQUE_CONFLICTS members confirm this is REL_18, not an older tree. Verified.
  • State machine constants. SUBREL_STATE_INIT 'i', SUBREL_STATE_DATASYNC 'd', SUBREL_STATE_FINISHEDCOPY 'f', SUBREL_STATE_SYNCWAIT 'w', SUBREL_STATE_CATCHUP 'c', SUBREL_STATE_SYNCDONE 's', SUBREL_STATE_READY 'r' all present in src/include/catalog/pg_subscription_rel.h. Verified.
  • Replica-identity fallback. GetRelationIdentityOrPK in relation.c calls RelationGetReplicaIndex then RelationGetPrimaryKeyIndex; FindReplTupleInLocalRel asserts OidIsValid(localidxoid) || replident == REPLICA_IDENTITY_FULL and branches to RelationFindReplTupleByIndex vs RelationFindReplTupleSeq. Verified.
  • Feedback gating. get_flush_position reports a remote LSN as *flush only when pos->local_end <= GetFlushRecPtr(NULL). Verified.
  • Privilege handling. apply_handle_insert calls SwitchToUntrustedUser(...relowner...) when !MySubscription->runasowner, and InitializeLogRepWorker sets session_replication_role = replica. Verified.
  • Line numbers in the position-hint table were read directly from the tree on 2026-06-05; symbols are the durable anchor and line numbers are hints that decay with reformatting.
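The replica-identity fallback in the spot-checks can be read as two small decisions: which local index to use, and how to search when there is none. This sketch is an approximation under assumptions. choose_local_index approximates the order FindLogicalRepLocalIndex follows (local identity or primary key, then, only when the remote relation uses REPLICA IDENTITY FULL, a usable secondary index). usable_full_index stands in for whatever FindUsableIndexForReplicaIdentityFull returns, and partitioned tables are ignored. All names in the block are hypothetical.

```c
#include <assert.h>

typedef unsigned int Oid;               /* stand-in for PostgreSQL's Oid */
#define InvalidOid ((Oid) 0)
#define REPLICA_IDENTITY_DEFAULT 'd'    /* relreplident letters (pg_class.h) */
#define REPLICA_IDENTITY_FULL    'f'

typedef enum { FIND_BY_INDEX, FIND_BY_SEQSCAN, FIND_IMPOSSIBLE } FindStrategy;

/* Like GetRelationIdentityOrPK: replica identity index first, then PK. */
static Oid
identity_or_pk(Oid replica_index, Oid pk_index)
{
    return replica_index != InvalidOid ? replica_index : pk_index;
}

/* Identity/PK, else (remote FULL only) a usable secondary index, if any. */
static Oid
choose_local_index(Oid replica_index, Oid pk_index, char remote_replident,
                   Oid usable_full_index)
{
    Oid idx = identity_or_pk(replica_index, pk_index);

    if (idx == InvalidOid && remote_replident == REPLICA_IDENTITY_FULL)
        idx = usable_full_index;
    return idx;
}

/* FindReplTupleInLocalRel's branch: index probe, else sequential scan. */
static FindStrategy
find_strategy(Oid localidxoid, char remote_replident)
{
    if (localidxoid != InvalidOid)
        return FIND_BY_INDEX;           /* RelationFindReplTupleByIndex */
    if (remote_replident == REPLICA_IDENTITY_FULL)
        return FIND_BY_SEQSCAN;         /* RelationFindReplTupleSeq */
    return FIND_IMPOSSIBLE;             /* the real code Asserts against this */
}
```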
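The feedback-gating rule is easiest to see on a concrete mapping. In the sketch below, each entry pairs a subscriber-local commit end LSN with the remote commit end LSN it applied, in commit order, in the spirit of what store_flush_position records. get_flush_position_model reports as flushed only the remote position of the last entry whose local end is already durable, and keeps the rest pending. LsnMapping, its fixed capacity and the function name are hypothetical; the real code keeps a dlist and reads the local flush position from GetFlushRecPtr.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t XLogRecPtr;            /* stand-in for PostgreSQL's typedef */

typedef struct FlushPosition
{
    XLogRecPtr local_end;               /* subscriber's commit end LSN */
    XLogRecPtr remote_end;              /* publisher's commit end LSN */
} FlushPosition;

/* Hypothetical fixed-size replacement for worker.c's lsn_mapping list. */
typedef struct LsnMapping
{
    FlushPosition pos[16];
    int           n;
} LsnMapping;

static void
get_flush_position_model(LsnMapping *map, XLogRecPtr local_flush,
                         XLogRecPtr *write, XLogRecPtr *flush, bool *pending)
{
    int done = 0;

    *write = 0;                         /* 0 plays InvalidXLogRecPtr */
    *flush = 0;

    /* Consume the durable prefix; stop at the first not-yet-flushed entry. */
    while (done < map->n && map->pos[done].local_end <= local_flush)
    {
        *flush = map->pos[done].remote_end;
        done++;
    }
    /* Every applied commit counts as written, flushed or not. */
    if (map->n > 0)
        *write = map->pos[map->n - 1].remote_end;

    memmove(map->pos, map->pos + done,
            (size_t) (map->n - done) * sizeof(FlushPosition));
    map->n -= done;
    *pending = map->n > 0;
}
```

Reporting the write position eagerly while holding back the flush position is what lets the publisher track progress without being allowed to discard WAL the subscriber could still need after a crash.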

Scope boundary: logical decoding on the publisher, the pgoutput plugin, and the walsender/walreceiver transport are out of scope here and are covered by postgres-logical-decoding.md, postgres-pgoutput.md, and postgres-wal-sender-receiver.md. Replication slots and origins as durable objects are in postgres-replication-slots.md. contrib/ modules are out of scope entirely.

Beyond PostgreSQL — Comparative Designs & Research Frontiers


MySQL row-based replication. MySQL’s binlog in ROW format is the closest analogue: each Write_rows/Update_rows/Delete_rows event carries the affected row images (after, before and after, or before, respectively), and the SQL/applier thread re-executes them. MySQL matches rows by primary key (or a full image scan when none exists), closely mirroring PostgreSQL’s replica-identity-or-seqscan choice. The big architectural difference is parallelism: MySQL’s multi-threaded applier (MTS) partitions by database or by logical_clock commit groups, whereas PostgreSQL keeps one leader apply worker per subscription and only fans out for streamed in-progress transactions via parallel apply workers.

Oracle GoldenGate / SQL Server transactional replication. Both decouple capture (a redo/transaction-log reader) from delivery (a queue) from apply (a replicat / distribution-agent), the same three-stage pipeline Database Internals (Petrov, ch. 11) describes. GoldenGate’s conflict detection and resolution (CDR) is richer than REL_18’s: it offers configurable resolution methods (timestamp/LWW, discard, custom procedure) where PostgreSQL 18 detects-and-logs and otherwise takes the natural outcome. The PostgreSQL community has been incrementally building toward configurable resolution; the typed ConflictType taxonomy and per-tuple origin/xmin/commit-ts reporting added in recent releases are the foundation a future LWW or custom resolver would build on.

Debezium and the CDC ecosystem. Debezium reads PostgreSQL’s logical decoding output (via pgoutput or its own decoderbufs plugin) and ships change events to Kafka, inverting the topology: the “subscriber” is an external stream processor rather than another PostgreSQL. This shows the decode/transport boundary is a clean seam: the same publisher feeds either a native subscriber or a CDC pipeline. PostgreSQL’s native apply path is the “batteries-included” case of the general CDC pattern.

Conflict-free replicated data types (CRDTs) and multi-master. The research frontier for logical replication is active-active (multi-master) operation, where every node both publishes and subscribes. The hard problem is convergence under concurrent conflicting writes. Designing Data-Intensive Applications (Kleppmann, ch. 5 “Replication” and ch. 7) lays out the options: last-write-wins (lossy), version vectors, and CRDTs (conflict-free by construction). PostgreSQL’s origin-tracking — every commit records which node first produced it, surfaced as CT_*_ORIGIN_DIFFERS — is precisely the metadata a version-vector or LWW resolver needs, and is why extensions like BDR/pglogical can layer multi-master semantics on top of the core apply machinery. The REL_18 in-core feature set stops at detection; automatic resolution policy remains the active design conversation.

Replication theory framing. Petrov (ch. 11, “Replication and Consistency”) and Silberschatz (Database System Concepts, ch. 23, distributed databases) both frame the core guarantee a logical applier must provide as exactly-once, in-commit-order application with a durable cursor. PostgreSQL realizes “durable cursor” as a replication origin advanced in the same transaction as the data change, and “exactly-once” as the combination of that origin plus the flush-gated feedback that prevents the publisher from discarding WAL the subscriber has not durably applied. The tablesync splice is the engineering answer to the bootstrap gap the theory usually assumes away.
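A toy model makes the durable-cursor argument concrete. It assumes a single subscriber whose data change and origin advance commit together. It also collapses the real mechanism (streaming restarts from the origin's position, so the publisher does not resend earlier commits) into a subscriber-side LSN comparison. Subscriber and apply_remote_commit are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* stand-in for PostgreSQL's typedef */

typedef struct Subscriber
{
    XLogRecPtr origin_lsn;              /* durable cursor: last applied commit */
    long       rows;                    /* stand-in for the table contents */
} Subscriber;

/*
 * Apply one remote transaction ending at end_lsn. Returns false, changing
 * nothing, when the commit is at or behind the cursor (already applied).
 */
static bool
apply_remote_commit(Subscriber *s, XLogRecPtr end_lsn, long delta)
{
    if (end_lsn <= s->origin_lsn)
        return false;
    /* Both fields change together, as one local transaction would. */
    s->rows += delta;
    s->origin_lsn = end_lsn;
    return true;
}
```

Because the data and the cursor change in the same step, a crash either loses both or keeps both. There is no state in which a change is applied but the cursor still points before it, which is what rules out double application on redelivery.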

  • Code (REL_18, commit 273fe94, as of 2026-06-05):
    • src/backend/replication/logical/worker.c — apply worker, apply loop, dispatch, row handlers, conflict reporting, feedback.
    • src/backend/replication/logical/launcher.c — launcher poll loop, worker launch/registry, restart throttle.
    • src/backend/replication/logical/tablesync.c — tablesync worker, copy driver, SYNCWAIT/CATCHUP/SYNCDONE/READY handshake.
    • src/backend/replication/logical/relation.c — relation map, replica identity index selection, updatability.
    • src/backend/replication/logical/conflict.c — ReportApplyConflict, InitConflictIndexes, conflict errdetail formatting.
    • src/include/replication/worker_internal.h — LogicalRepWorker, LogicalRepWorkerType.
    • src/include/replication/conflict.h — ConflictType.
    • src/include/catalog/pg_subscription_rel.h — SUBREL_STATE_*.
    • src/include/catalog/pg_subscription.h — subscription catalog.
  • Textbook / theory:
    • Petrov, Database Internals (2019), ch. 11 “Replication and Consistency” — logical change records, leader/follower pipeline.
    • Kleppmann, Designing Data-Intensive Applications (2017), ch. 5 “Replication” and ch. 7 — conflict handling, LWW, version vectors, CRDTs; ch. 11 on change data capture.
    • Silberschatz et al., Database System Concepts (7e), ch. 23 — distributed databases and replication.
  • Related KB docs: postgres-logical-decoding.md, postgres-pgoutput.md, postgres-wal-sender-receiver.md, postgres-replication-slots.md, postgres-synchronous-replication.md.