[Epic] Postgres source could materialize replicated tables directly

Open frankmcsherry opened this issue 3 years ago • 9 comments

Feature request

At the moment, a postgres source gets created with a homogenous collection of records containing table identifiers and packed up row payloads. The CREATE VIEWS command can create views that unpack them, but they are then inefficient to read (re-reading, filtering, and decoding the data for each use). It would seem that common users would then materialize these views back to persist, where it also seems the postgres source could have written them in the first place.

I propose that a "postgres source" (idk if it is still "a" source) should perform the equivalent of today's CREATE SOURCE, CREATE VIEWS, and the multiple CREATE MATERIALIZED VIEW commands. This is very likely what the user wants and expects, and it also removes various footguns (not doing the above) and bits of magic (CREATE VIEWS).

cc: @petrosagg, @benesch

frankmcsherry avatar Aug 02 '22 16:08 frankmcsherry

The more I test with larger postgres instances, the more this feels like a hard requirement. For example, tpch scale 10 is 10GB but the regions table only has 5 rows. Having to process the full dataset just to work with that table is incredibly inefficient, and in practice requires users to materialize all tables into persist before using them. Ideally, this should happen directly from storaged.
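To make the cost concrete, here is a minimal Python sketch. It is not Materialize code: the record layout, the row counts, and the name `scan_view` are assumptions chosen for illustration. It models the source as one multiplexed list of `(table, row)` records and counts how many records a read of the small table has to touch.

```python
# Hypothetical multiplexed source: every record carries its table name.
# 1,000 "lineitem" rows stand in for the large tables; "region" has 5 rows.
source = [("lineitem", i) for i in range(1000)] + [("region", i) for i in range(5)]

def scan_view(records, table):
    """Model a non-materialized view: each read filters the whole stream."""
    touched = 0
    rows = []
    for name, row in records:
        touched += 1  # every record is read and inspected, even if discarded
        if name == table:
            rows.append(row)
    return rows, touched

region_rows, touched = scan_view(source, "region")
print(f"returned {len(region_rows)} rows after touching {touched} records")
```

Every additional view over the same source repeats the full scan, which is why materializing each table once at ingestion is attractive.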

I think the previous model made sense in the binary, due to limitations requiring the postgres source be arranged in memory for any real use case, but not in platform.

The big question is, what are the catalog objects. My proposal is that we still create a source catalog object, representing the logical replication stream, but which is always empty or non-selectable. We now additionally create MATERIALIZED VIEW catalog objects for each table, indicating they are stored in persist. The source object still exists for administrative tasks such as rescaling and dropping, but the materialized views are used for efficient querying.

Syntax wise, the create source statement would need to be extended to support the create views options, such as only materializing certain tables or renaming tables. https://materialize.com/docs/unstable/sql/create-views/#creating-views-for-specific-tables

sjwiesman avatar Aug 03 '22 13:08 sjwiesman

Also linking this issue @frankmcsherry indicated may be related. https://github.com/MaterializeInc/materialize/issues/6591

sjwiesman avatar Aug 03 '22 13:08 sjwiesman

One sorta-resolution of that issue could be that in the dataflow implementation of postgres sources, it can hotwire in a bespoke use of timely's partition operator. That might be easier than solving the general problem of #6591.
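For readers unfamiliar with the idea, a partition-style operator routes each record to one of N outputs using a caller-supplied function. The Python sketch below only illustrates that shape. It is not timely's API, and `partition`, `table_index`, and the record layout are hypothetical.

```python
def partition(records, parts, route):
    """Route each record to one of `parts` outputs.

    `route` takes a record and returns (output_index, payload).
    """
    outputs = [[] for _ in range(parts)]
    for record in records:
        index, payload = route(record)
        outputs[index].append(payload)
    return outputs

# Hypothetical mapping from upstream table name to output index.
table_index = {"foo": 0, "bar": 1}

# Multiplexed input: (table name, row payload).
records = [("foo", (1, "a")), ("bar", (2, "b")), ("foo", (3, "c"))]

foo_rows, bar_rows = partition(records, 2, lambda r: (table_index[r[0]], r[1]))
```

Each output then holds only one table's rows, so a downstream reader never has to filter the combined stream.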

frankmcsherry avatar Aug 03 '22 13:08 frankmcsherry

It would be amazing to demux the source during ingestion!

This issue has been around since the original implementation of the postgres source (in the form of the issue @sjwiesman linked). Apart from figuring out how to render the dataflow, which I think is much easier now that we don't have the concept of source instances, there is also an SQL interface question.

Back then we were considering a oneshot CREATE SOURCES (notice the plural) command that would create multiple SQL objects in one go but in the end we didn't go for it. Do we have more clarity now on what the SQL interface would look like for these kinds of multiplexed sources? Probably a question for @benesch

petrosagg avatar Aug 08 '22 12:08 petrosagg

Do we have more clarity now on what the SQL interface would look like for these kinds of multiplexed sources?

CREATE SOURCES is probably the wrong direction. There should be a single source object in the catalog for DDL operations such as rescaling and dropping the source. I believe the new MATERIALIZED VIEW catalog object makes everything simpler. MATERIALIZED VIEWs contain the actual data users select, indicating the data is constantly changing, is accessible from any cluster, and has the performance characteristics of reading from persist.

The DDL should look something like (open to bikeshedding)

CREATE SOURCE mz_source
FROM POSTGRES
  CONNECTION pg_connection
  PUBLICATION 'mz_source'
  FOR ALL VIEWS;

CREATE SOURCE mz_source
FROM POSTGRES
  CONNECTION pg_connection
  PUBLICATION 'mz_source'
  FOR VIEWS ("table1");

CREATE SOURCE mz_source
FROM POSTGRES
  CONNECTION pg_connection
  PUBLICATION 'mz_source'
  FOR VIEWS ("table1" AS "foo", "table2" AS "otherschema"."foo");

sjwiesman avatar Aug 08 '22 13:08 sjwiesman

Resurfacing the original discussion around CREATE SOURCES, for context: https://github.com/MaterializeInc/materialize/issues/5370#issuecomment-773569202. I'm +1 on sticking with CREATE SOURCE and creating multiple materialized views for the specified tables. The only thing I'd suggest on top of @sjwiesman's proposal is to use TABLES instead of VIEWS:

CREATE SOURCE mz_source
FROM POSTGRES
  CONNECTION pg_connection
  PUBLICATION 'mz_source'
  FOR ALL TABLES;
  
...

Something that isn't clear to me if we go down this path is whether it'd still be possible to have some degree of control over type re-casting; thinking of cases where the upstream table contains unsupported types (most commonly, enum), and users might need to manually create the materialized view as a workaround.

morsapaes avatar Aug 08 '22 15:08 morsapaes

@morsapaes that's a great point. What about something like:

CREATE SOURCE mz_source
FROM POSTGRES
  CONNECTION pg_connection
  PUBLICATION 'mz_source'
  FOR ALL TABLES
  CAST foo.my_enum AS string;

sjwiesman avatar Aug 08 '22 15:08 sjwiesman

FYI @ggnall as our surfaces PM

nmeagan11 avatar Aug 08 '22 19:08 nmeagan11

CREATE SOURCE mz_source
FROM POSTGRES
 CONNECTION pg_connection
 PUBLICATION 'mz_source'
 FOR ALL TABLES;

This roughly LGTM. There are some hairy questions about the lifetime of the auto-created materialized views. The questions are similar to the ones that would arise if CREATE SINK learned to auto-create materialized views, which I touched on here.

Note also that there is a large limitation here, which is that you won't be able to add or remove tables after creating the source. At least to start. Is that okay as an MVP? We could one day rig up ALTER SOURCE to do that, but that's a big work item.

benesch avatar Aug 10 '22 07:08 benesch

Note also that there is a large limitation here, which is that you won't be able to add or remove tables after creating the source. At least to start. Is that okay as an MVP?

@benesch I think this is fine. I can capture the work for the limitation in a separate issue, so long as everyone agrees.

nmeagan11 avatar Aug 10 '22 15:08 nmeagan11

It occurred to me that load generator sources have the same issue: https://materialize.com/docs/unstable/sql/create-views/#load-generators.

It'd be great if we could apply whatever magic we come up with for PostgreSQL sources to load generator sources too. @petrosagg do you think that will fall out naturally of the implementation?

benesch avatar Sep 19 '22 13:09 benesch

Status update

Since there haven't been concrete PRs on this epic recently I wanted to write a status update of what's happening and where the implementation is at. This epic required substantial design work in order to fit the concept of a source that outputs multiple streams onto a system that only has the concept of a flat list of sources with no interdependencies.

After exploring a few options, the design that seems to be the most natural (i.e., has the fewest edge cases) is the following:

Users will type one of the following SQL statements (syntax up for debate):

CREATE SOURCE "pg_source" FROM POSTGRES ....;
-- or specify a list of aliases
CREATE SOURCE "pg_source" FROM POSTGRES .... ALIAS ("public"."foo" AS "foo", "otherschema"."foo" AS "foo2");

And it will be interpreted as if the user typed multiple DDL statements in this order:

CREATE SOURCE "foo" (foo_col1 int);
CREATE SOURCE "foo2" (foo_col2 int);
CREATE SOURCE "pg_source" FROM POSTGRES .... SINK ("public"."foo" INTO "foo", "otherschema"."foo" INTO "foo2");

In the above statements the first two define a "subsource". A subsource is a source that has a schema but no ingestion associated with it. It is pretty much a table with the only difference being that instead of INSERT statements it gets its data from some other source that does have an ingestion and depends on it.

For the example above the end state of the catalog would be this:

      "pg_source"
       /       \
depends_on   depends_on
     /           \
    v             v
 "foo"            "foo2"

Notice that the dependency arrows are the opposite of what you'd expect! Today with the CREATE VIEWS the subviews depend on the parent source. But here the main source depends on the subsources. This is because this encodes a "writes to" dependency as opposed to a "reads from" dependency and is the first kind of such dependency that we have in materialize.
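The desugaring and the resulting dependency direction can be sketched in a few lines of Python. This is only an illustration of the idea described above: `desugar` and the dictionary-based catalog are hypothetical and do not reflect the adapter's actual data structures.

```python
def desugar(source_name, aliases):
    """Expand one CREATE SOURCE into subsources plus the ingesting source.

    `aliases` maps an upstream table reference to the subsource name.
    Returns (creation_order, depends_on), where depends_on[x] lists the
    objects that x depends on.
    """
    subsources = list(aliases.values())
    # Subsources are created first so the main source can refer to them.
    creation_order = subsources + [source_name]
    depends_on = {name: [] for name in subsources}
    # "Writes to" edges: the main source depends on the subsources it feeds.
    depends_on[source_name] = subsources
    return creation_order, depends_on

order, deps = desugar(
    "pg_source",
    {"public.foo": "foo", "otherschema.foo": "foo2"},
)
```

With the edges pointing this way, the subsources have no dependencies of their own, while `pg_source` cannot exist without the objects it writes into.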

Reaching the above conclusion and implementing it in the adapter was a very delicate balancing act and arguably the hardest part of this epic. The good news is that as of today this part is implemented in this PR https://github.com/MaterializeInc/materialize/pull/15021

Next steps

With the adapter groundwork out of the way there are two more chunks of work that need to be done.

First, I need to change the SQL layer to actually create these subsources and implement the new syntax. This piece should be straightforward and will bring these new types of objects all the way to the storage controller. It is what I'll be working on while the adapter PR is being reviewed.

The last piece would be to change storaged to understand multi-output dataflows and render them on timely properly. I foresee some complexity here but nothing like the work of fitting the concept in ADAPTER.

petrosagg avatar Sep 27 '22 14:09 petrosagg

Users will type one of the following SQL statements (syntax up for debate)

I like the following:

CREATE SOURCE pg_source FROM POSTGRES ... FOR ALL TABLES;

CREATE SOURCE pg_source FROM POSTGRES ... FOR TABLES ("table1" AS "foo1", "table2" AS "otherschema"."foo2");

I thought about removing FOR ALL TABLES and making that the default behavior if the user does not specify a list, but now I think it makes more sense to be explicit about it.
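As a rough illustration of how the two forms could resolve to a set of target names, here is a small Python sketch. The function `resolve_tables`, the `"ALL"` marker, and the error on unknown tables are assumptions made for the example, not behavior confirmed anywhere in this thread.

```python
def resolve_tables(publication_tables, for_tables="ALL"):
    """Return {upstream_table: target_name} for a CREATE SOURCE statement.

    `for_tables` is either the string "ALL" (FOR ALL TABLES) or a dict of
    upstream table -> alias (FOR TABLES (... AS ...)).
    """
    if for_tables == "ALL":
        # Every table in the publication keeps its upstream name.
        return {table: table for table in publication_tables}
    missing = [t for t in for_tables if t not in publication_tables]
    if missing:
        raise ValueError(f"tables not in publication: {missing}")
    return dict(for_tables)

publication = ["table1", "table2", "table3"]
all_tables = resolve_tables(publication)
some_tables = resolve_tables(
    publication, {"table1": "foo1", "table2": "otherschema.foo2"}
)
```

Requiring one of the two forms keeps the choice visible in the statement itself, which matches the preference for being explicit.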

nmeagan11 avatar Sep 27 '22 14:09 nmeagan11

@nmeagan11 I like it! I'll go with that syntax for now. It's easy to change syntax last minute

petrosagg avatar Sep 27 '22 15:09 petrosagg

I like it too.

benesch avatar Sep 28 '22 01:09 benesch

I'm on board!

ggnall avatar Sep 28 '22 12:09 ggnall

If we are automatically replicating tables, users are likely to miss any columns with unsupported types. What would happen in these cases?

If we can't provide immediate and actionable feedback that x,y,z columns need to be cast we might want to automatically handle this for all unsupported types?

CAST foo.my_enum AS string;

ggnall avatar Sep 29 '22 14:09 ggnall

We can provide feedback that table x contains unsupported columns, and the users could exclude that table from the source. Finer-grained control, such as allowing users to select the type of a specific column, is possible but requires a bit more work, so we definitely need to consider that after EA.

petrosagg avatar Sep 29 '22 14:09 petrosagg

CREATE SOURCE mz_source
FROM POSTGRES
  CONNECTION pg_connection
  PUBLICATION 'mz_source'
  FOR ALL TABLES
  CAST foo.my_enum AS string;

An alternate proposal for the problem of unsupported upstream types: cast all of them to strings without the user asking us to, but send a NOTICE message that we are doing so, to avoid the user assumption that we correctly support the types.

madelynnblue avatar Sep 29 '22 15:09 madelynnblue

Huge +1, @mjibson.

morsapaes avatar Sep 29 '22 15:09 morsapaes

Sanity check, but doing the implicit cast would mean adding support for a new type would be breaking, and hard to fix if it didn't support a cast from text?

frankmcsherry avatar Sep 29 '22 15:09 frankmcsherry

We'd have to ensure that if we add support for a new type with an existing postgres source, we continue to use the string cast until the user destroys and recreates the pg source. Everything supports being cast to text in postgres. If we are receiving the datums from postgres over the binary encoding (which I believe is what tokio-postgres does), then we'd have to have postgres do the text casting before sending over to us, which might make this feature fairly hard or impossible. The very hard version would be: we know how to decode the pgwire binary protocol into some rust data structure and then cast that to text, which is part of the work of just supporting the type itself, so sort of silly to do because that's the work we'd be trying to avoid.

madelynnblue avatar Sep 29 '22 15:09 madelynnblue

The replication protocol sends everything over in its text encoded version so keeping them as text is very easy. The tricky bit is the thing Frank mentioned where we must remember which types we have selected to keep as strings and which ones to cast so that future type support doesn't result in breakage.

In practice this would amount to inventing some syntax to record the exceptions, like the one proposed by Seth, and have purification look at the types of the relation and automatically insert CAST foo.my_enum AS string; for the currently unsupported ones. This will allow environmentd to restart and re-plan the statements correctly, even if we add a new type in the future. Dropping the source and re-creating with a new environmentd would get a new round of purification that would pass through the newly supported types.

This is a fine solution and I think we should do it but is no easier than supporting the CAST foo.my_enum AS string; in the first place.
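A minimal Python sketch of that idea follows. The names `purify` and `plan`, the type names, and the sets of supported types are all hypothetical; the point is only that the casts are computed once and stored, so re-planning does not depend on which types the running release supports.

```python
def purify(columns, supported):
    """Return the columns that need an explicit text cast recorded."""
    return sorted(col for col, ty in columns.items() if ty not in supported)

def plan(columns, casts):
    """Plan column types using only the casts stored with the statement."""
    return {col: ("text" if col in casts else ty) for col, ty in columns.items()}

columns = {"foo.id": "int4", "foo.my_enum": "my_enum"}
old_release = {"int4", "text"}
new_release = old_release | {"my_enum"}  # a later release adds the type

# At CREATE SOURCE time the casts are recorded alongside the statement.
recorded = purify(columns, old_release)

# After an upgrade and restart, re-planning reuses the recorded casts.
restart_plan = plan(columns, recorded)

# Dropping and re-creating runs purification again on the new release.
recreate_plan = plan(columns, purify(columns, new_release))
```

Here `restart_plan` keeps `foo.my_enum` as text, while `recreate_plan` passes the newly supported type through, so the same statement yields a different schema depending on when it is run.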

petrosagg avatar Sep 29 '22 17:09 petrosagg

Sanity check, but doing the implicit cast would mean adding support for a new type would be breaking, and hard to fix if it didn't support a cast from text?

Yes.

In practice this would amount to inventing some syntax to record the exceptions, like the one proposed by Seth, and have purification look at the types of the relation and automatically insert CAST foo.my_enum AS string; for the currently unsupported ones. This will allow environmentd to restart and re-plan the statements correctly, even if we add a new type in the future. Dropping the source and re-creating with a new environmentd would get a new round of purification that would pass through the newly supported types.

This would still result in an unacceptably breaking change, IMO! Users might be running CREATE SOURCE in CI or automated tests. That shouldn't create a source with a text column one day and a source with rich_new_type the next day.


So, strong preference from me to require users to explicitly spell out the columns that need casts. I don't think it will be particularly onerous, and we can give great error messages that tell users in the hint field exactly how to fix their CREATE SOURCE statement.
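To illustrate what such an error could look like, here is a hedged Python sketch. The exception class, the message wording, and `check_casts` are invented for this example and are not the actual Materialize error or hint text.

```python
class UnsupportedColumnTypes(Exception):
    """Raised when columns with unsupported types lack an explicit cast."""

    def __init__(self, columns):
        self.columns = columns
        hint = ", ".join(f"CAST {col} AS string" for col in columns)
        super().__init__(
            f"unsupported column types in: {', '.join(columns)}; "
            f"hint: add {hint}"
        )

def check_casts(columns, supported, explicit_casts):
    """Reject the statement unless every unsupported column is cast."""
    missing = sorted(
        col
        for col, ty in columns.items()
        if ty not in supported and col not in explicit_casts
    )
    if missing:
        raise UnsupportedColumnTypes(missing)

columns = {"foo.id": "int4", "foo.my_enum": "my_enum"}
supported = {"int4", "text"}

try:
    check_casts(columns, supported, explicit_casts=set())
    message = None
except UnsupportedColumnTypes as err:
    message = str(err)  # tells the user which CAST clauses to add
```

Because the user writes the casts themselves, the statement produces the same column types no matter which release runs it.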

This is in line with a bit of product insight from @brennanvincent from a while back: users very rarely complain about verbosity in CREATE SOURCE because they type it so infrequently. (And we fixed the most glaring verbosity issue with CREATE CONNECTION.)

benesch avatar Sep 29 '22 18:09 benesch

I just realized another reason to make CAST explicit. When we add other direct db sources, such as MySQL, there may be types without 1-1 mappings or types we never support. Users will need the flexibility to specify the mz type and it’ll be great if the syntax for various db sources is consistent

sjwiesman avatar Oct 01 '22 15:10 sjwiesman

Contingency plan

There's been some talk about adding a contingency plan in case this feature doesn't land in time. That doesn't look very likely to happen, but figured we should come up with a plan in any case.

@benesch suggests we use adapter::session::vars::SessionVars; with this, we can set all users' allowed PG sources to 0 and attempting to create a new PG source of any type will simply fail.

Pros:

  • This is very few LOC to implement; can be done in an afternoon.
  • This doesn't check resource limits on boot, so any users' existing PG sources will continue to work uninterrupted, and will simply stop people from creating new ones.
  • If we want to change the value specifically for the current users, we could manually configure that value to permit them to create new sources.

Cons:

  • We're essentially shipping without Postgres sources, even if users are OK with the current behavior.

Alternatives:

  • Required WITH option, e.g. WITH (LEGACY = true), which could let users create new sources with the current behavior, though that may be exactly the reason we don't want to pursue this option. Once we no longer want to require or support this behavior, we can disable sources from getting created with the option.
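The limit-based proposal can be sketched as follows. This Python model is an assumption-laden illustration: `Catalog` and `max_postgres_sources` are hypothetical names, and the real mechanism would live in the Rust SessionVars code mentioned above.

```python
class Catalog:
    def __init__(self, existing_sources, max_postgres_sources):
        # Existing sources are loaded as-is: the limit is not checked on boot.
        self.sources = list(existing_sources)
        self.max_postgres_sources = max_postgres_sources

    def create_postgres_source(self, name):
        # The limit is enforced only when a new source is created.
        if len(self.sources) + 1 > self.max_postgres_sources:
            raise PermissionError(
                f"postgres source limit reached ({self.max_postgres_sources})"
            )
        self.sources.append(name)

# A user with one existing source boots fine even with the limit at 0.
catalog = Catalog(existing_sources=["legacy_pg"], max_postgres_sources=0)

try:
    catalog.create_postgres_source("new_pg")
    blocked = False
except PermissionError:
    blocked = True

# Raising the value for a specific user permits new sources again.
catalog.max_postgres_sources = 2
catalog.create_postgres_source("new_pg")
```

The same knob covers both goals listed in the pros: existing sources keep running, and individual users can be allowed through by changing a single value.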

sploiselle avatar Oct 05 '22 17:10 sploiselle

@sploiselle Thank you for the write-up! The SessionVars proposal sounds great to me. I like that we'd get fine-grained control and that we wouldn't disrupt existing users twice (vs. the alternative where we disrupt once to add legacy = true and another time to remove it).

uce avatar Oct 05 '22 17:10 uce

@uce we can make that work. During EA, I will control onboarding and can ensure we don’t bring any new pg source users onboard until this issue is resolved

sjwiesman avatar Oct 06 '22 13:10 sjwiesman

Status update

Things are moving along swiftly and the final PR is almost ready to be merged: https://github.com/MaterializeInc/materialize/pull/15190

The new approach introduces a minor limitation that didn't exist before and that I'll leave to a follow-up work item. Previously, users were allowed to create a postgres source that targeted a publication that included tables with unsupported column types. Users only got an error during the CREATE VIEWS statement, at which point they could exclude the table and move on.

This is no longer the case with the new implementation (again, not for important reasons, just something to fix later). This means that the only way to work around a table with unsupported column types is to not include it in the publication at all.

petrosagg avatar Oct 06 '22 17:10 petrosagg

Awesome @petrosagg. What's the ux if they try to ingest an unsupported and uncasted-to-supported type? Do we error on CREATE SOURCE or at all?

ggnall avatar Oct 07 '22 16:10 ggnall