Contributor

@jgaskins jgaskins commented Jul 18, 2025

The basic idea is that you create a replication subscription with PG.connect_replication. It's similar to PG.connect_listen but, unlike a LISTEN connection where every message is the same type (it just yields strings), each message can be a different type. So instead of passing a block you pass a PG::Replication::Handler instance with received methods defined for receiving different types of replication messages:

class MyHandler
  include PG::Replication::Handler

  # Override this to tell the Postgres server where you are in the WAL.
  # Make sure you `yield` in between any setup and teardown.
  def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection)
    yield
    connection.last_wal_byte_applied = data.wal_end
    connection.last_wal_byte_flushed = data.wal_end
  end

  # New relation defined
  def received(relation : PG::Replication::Relation)
  end

  # Record updated
  def received(update : PG::Replication::Update)
  end
end

subscriber = PG.connect_replication url,
  handler: MyHandler.new,
  publication_name: "my_publication",
  slot_name: "my_replication_slot"

The default is to do nothing when receiving a WALMessage.

Most of the work here happens in XLogData events, which contain the WALMessages that people will actually care about. Everything else seems to be protocol meta. The objects passed to the handler's received methods are the XLogData's message value and are decoded based on message formats specified in the Postgres docs.
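For readers unfamiliar with the framing, here is a minimal sketch of how an XLogData frame splits into its header and the logical message it carries. It follows the layout in the Postgres streaming replication docs: a `w` tag byte, three 8-byte integers, then the payload. `XLogHeader` and `parse_xlog_data` are hypothetical names used only for illustration, not the types in this PR.

```crystal
# Hypothetical stand-in for illustration; the PR's XLogData type may differ.
record XLogHeader, wal_start : UInt64, wal_end : UInt64, server_clock : Int64

# Splits the body of a CopyData frame into the XLogData header and the
# logical replication message that follows it.
def parse_xlog_data(frame : Bytes) : Tuple(XLogHeader, Bytes)
  header_size = 25 # 1 tag byte + three 8-byte integers
  unless frame.size >= header_size && frame[0] == 'w'.ord
    raise ArgumentError.new("Not an XLogData frame")
  end

  format = IO::ByteFormat::NetworkEndian
  io = IO::Memory.new(frame)
  io.skip(1) # tag byte, already checked above
  header = XLogHeader.new(
    wal_start: io.read_bytes(UInt64, format),
    wal_end: io.read_bytes(UInt64, format),
    server_clock: io.read_bytes(Int64, format)
  )
  {header, frame[header_size, frame.size - header_size]}
end
```

The first byte of the returned payload is the logical message tag (for example `B` for Begin), which is what decides the type passed to the handler.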

There are still some things I need and/or want to do here:

  • Implement message types
    • Begin
    • Message
    • Commit
    • Origin
    • Relation
    • Type
    • Insert
    • Update
    • Delete
    • Truncate
  • Allow decoding TupleData as other types (rather than raw Bytes)
  • Allow starting the WAL from somewhere other than the beginning
  • Implement keepalive responses
  • Since logical replication requires a change to postgresql.conf, the specs need to be opt-in or enabled only when logical replication is enabled
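As a rough idea of what the keepalive item involves, the reply to a keepalive is a Standby Status Update message. The sketch below encodes its body as the Postgres docs describe it: an `r` tag, three 8-byte WAL positions, the client clock in microseconds since 2000-01-01, and a reply-requested flag. `write_standby_status_update` is a hypothetical helper, not code from this PR, and it leaves out the CopyData wrapper that the message travels in.

```crystal
# Hypothetical helper for illustration; not part of this PR.
# Writes the body of a Standby Status Update ('r') message to `io`.
def write_standby_status_update(io : IO, received : UInt64, flushed : UInt64,
                                applied : UInt64, now : Time = Time.utc,
                                reply_requested : Bool = false) : Nil
  format = IO::ByteFormat::NetworkEndian

  # Postgres timestamps count microseconds from midnight on 2000-01-01.
  span = now - Time.utc(2000, 1, 1)
  micros = span.to_i * 1_000_000_i64 + span.nanoseconds // 1000

  io.write_byte 'r'.ord.to_u8
  io.write_bytes received, format # last WAL byte + 1 received and written
  io.write_bytes flushed, format  # last WAL byte + 1 flushed to disk
  io.write_bytes applied, format  # last WAL byte + 1 applied
  io.write_bytes micros, format
  io.write_byte(reply_requested ? 1_u8 : 0_u8)
end
```

The three positions correspond to values like `last_wal_byte_applied` and `last_wal_byte_flushed` that the handler example sets on the connection.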

The initial spec does successfully invoke the handler's methods for the CREATE TABLE, INSERT, and UPDATE commands.

Contributor Author

@jgaskins jgaskins left a comment

Just some notes about the current state of things. Some of them are subject to change.

@@ -0,0 +1,353 @@
module PG::Replication
Contributor Author

I can't tell yet if this should be LogicalReplication or if physical replication uses the same concepts.

when 'n'
  nil
when 'u'
  UnchangedTOASTValue.new
Contributor Author

I haven't tested this yet, but the docs say that an unchanged TOASTed value is not sent, so I'm assuming it's just a marker to indicate that the previous value is unchanged to avoid sending huge strings over the wire unnecessarily.
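To make the column markers concrete, here is a small sketch of decoding a TupleData section as the logical replication message format docs lay it out: a 16-bit column count, then per column a kind byte (`n` for null, `u` for unchanged TOASTed value, `t` or `b` for a length-prefixed text or binary value). `UnchangedTOAST`, `ColumnValue` and `read_tuple_data` are hypothetical names, not the ones used in this PR.

```crystal
# Hypothetical types for illustration; the PR's own classes may differ.
struct UnchangedTOAST
end

alias ColumnValue = Nil | UnchangedTOAST | Bytes

# Reads one TupleData section from `io`, leaving values as raw bytes.
def read_tuple_data(io : IO) : Array(ColumnValue)
  format = IO::ByteFormat::NetworkEndian
  column_count = io.read_bytes(Int16, format)

  Array(ColumnValue).new(column_count.to_i) do
    kind = io.read_byte || raise IO::EOFError.new
    case kind.chr
    when 'n'
      nil # SQL NULL, no payload follows
    when 'u'
      UnchangedTOAST.new # marker only, the value itself is not sent
    when 't', 'b'
      length = io.read_bytes(Int32, format)
      value = Bytes.new(length)
      io.read_fully(value)
      value
    else
      raise "Unknown column kind: #{kind.chr.inspect}"
    end
  end
end
```

Keeping the marker as its own type lets a consumer tell "unchanged" apart from NULL and fall back to a previously known value.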

end

def start_replication_frame_loop(publication_name : String, slot_name : String, &block : PG::Replication::Frame ->)
  command = "START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', binary 'true', publication_names '#{publication_name}')"
Contributor Author

A few notes on this line:

  • There are currently 4 proto_version values, but anything other than 1 requires Postgres 14+ and changes how some WALMessages are decoded. Supporting these dynamically would be more complicated, so I'm just supporting the baseline out of the gate.
  • This is currently hardcoded to start at the beginning of the WAL (0/0). The PG.listen_replication method should almost certainly be updated to allow passing in other LSNs to start from a different point in the WAL history.
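One possible shape for the second point, as a sketch only: accept the starting LSN as a 64-bit integer and render it in the `high/low` hexadecimal form that `START_REPLICATION` expects. `format_lsn`, `start_replication_command` and the `start_lsn` parameter are hypothetical and not part of this PR. The option list is copied from the line above.

```crystal
# Hypothetical helpers for illustration; not part of this PR.

# Renders a 64-bit LSN as two hexadecimal halves, e.g. "0/16B3748".
def format_lsn(lsn : UInt64) : String
  high = (lsn >> 32).to_s(16, upcase: true)
  low = (lsn & 0xFFFF_FFFF_u64).to_s(16, upcase: true)
  "#{high}/#{low}"
end

# Builds the command with a caller-supplied start position (default 0/0).
def start_replication_command(slot_name : String, publication_name : String,
                              start_lsn : UInt64 = 0_u64) : String
  "START_REPLICATION SLOT #{slot_name} LOGICAL #{format_lsn(start_lsn)} " \
  "(proto_version '1', binary 'true', publication_names '#{publication_name}')"
end

format_lsn(0x16B3748_u64) # => "0/16B3748"
```

Like the original line, this interpolates the slot and publication names directly, so it assumes they come from trusted configuration.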

Owner

FWIW postgres 13 is EOL in 3 months I think. Still starting support with only 1 is probably fine.

Contributor Author

Nice. The main addition in v2 is transaction IDs on various actions, which could be useful for anyone who wants that functionality. I wouldn't say it's necessary since the events are linear (all CRUD actions map to the transaction ID from the most recent Begin WAL message), but it might still be useful.
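To illustrate the "events are linear" point, a consumer on proto_version 1 could remember the transaction ID from the latest Begin and attach it to whatever follows. This is only a sketch: `Begin`, `Insert`, `Commit` and their fields are hypothetical stand-ins, not the message types defined in this PR.

```crystal
# Hypothetical stand-ins for illustration; field names are assumptions.
record Begin, xid : UInt32
record Insert, relation_id : Int32

struct Commit
end

class TransactionTracker
  getter current_xid : UInt32?
  # Pairs of (transaction ID, relation ID) seen so far.
  getter seen = [] of Tuple(UInt32?, Int32)

  def received(message : Begin)
    @current_xid = message.xid
  end

  # Every change between Begin and Commit belongs to the current transaction.
  def received(message : Insert)
    @seen << {@current_xid, message.relation_id}
  end

  def received(message : Commit)
    @current_xid = nil
  end
end
```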

jgaskins added 3 commits July 18, 2025 17:28
This way, if the replication slot wasn't created, or if another stuck
around from a previous run, we don't run into misleading errors.
Owner

will commented Jul 20, 2025

I've not done much with logical replication myself, so I can't comment on the details of the protocol here, but it all seems super cool!

@jgaskins
Contributor Author

I've had to add Debezium to our stack to pull some data into our data warehouse. All the trouble I've had getting it working got me wondering how easy or difficult it would be to just consume the WAL ourselves and emit a similar output structure. Completely nerd-sniped.

> I can't comment on the details of the protocol here

I'm hoping nobody will need to be all that concerned with it. 😄 It's not super complicated, but I want to come up with a good set of tests that illustrate that replication is working/broken by just performing actions and asserting on things appearing in the stream of logical replication events.

jgaskins added 15 commits July 20, 2025 18:33
  • This commit also comments out some logical WAL messages that aren't supported with proto_version 1.
  • Block-less `Log.error(exception : Exception)` was only introduced in recent versions of Crystal. This shard supports versions earlier than that, so we need to use the more compatible overload of the method.
  • This was leftover from an experiment I thought I'd removed
  • This is now broken down into multiple specs that test individual parts of this functionality.
  • I don't know how to test Message and Origin yet, but they need to be there to at least consume the bytes off the wire.
  • The method yields, so the abstract method needs to have the `&` there.
  • This was misnamed before. An XLogData *has* a message, but it is not itself a message.
@jgaskins jgaskins force-pushed the add-logical-replication branch from 2f584ad to 39a8ac8 Compare July 25, 2025 04:41
@jgaskins jgaskins marked this pull request as ready for review July 25, 2025 05:23
@jgaskins
Contributor Author

Alrighty, I think this is at least ready for initial review. Feel free to let me know if anything is unclear.

Owner

will commented Jul 29, 2025

Awesome, thanks! I think this is all good to go. Could you add a changelog entry first, please?

Contributor Author

jgaskins commented Aug 6, 2025

Sweet, will do. I did find a bug the other night, though, that I still need to address. Specifically: error frames aren't being decoded on this connection. While looking into that, I realized the error frame is this error, which you've already got here. Same message structure, which makes sense.

Then I wondered, since that's the same as ErrorResponse, if the CopyBoth message is documented on that page, too. And it is. That's when I realized I could've reused the same connection structure as LISTEN like I originally intended.

I think, for the moment, I'll just reimplement the error frame here (the pattern I used in implementing these messages turned out pretty decent) and refactor toward reuse later.
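For reference, the ErrorResponse body is simple enough that reimplementing it is cheap: a sequence of one-byte field codes, each followed by a null-terminated string, ending with a lone zero byte. A minimal sketch, assuming the caller has already consumed the `E` tag and the length prefix (`read_error_fields` is a hypothetical name, not the method in this PR):

```crystal
# Hypothetical helper for illustration; not part of this PR.
# Reads ErrorResponse fields into a hash keyed by field code,
# e.g. 'S' (severity), 'C' (SQLSTATE code), 'M' (message).
def read_error_fields(io : IO) : Hash(Char, String)
  fields = {} of Char => String
  loop do
    code = io.read_byte || raise IO::EOFError.new
    break if code == 0_u8 # terminator: no more fields
    value = io.gets('\0', chomp: true) || raise IO::EOFError.new
    fields[code.chr] = value
  end
  fields
end
```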

  • The method signatures were updated, but the comments still referenced the previous method signatures.
  • Had the loop condition inverted by mistake.