
[Bug]: SqlTransformSchemaTransformProvider.java does not work #34613


Open
2 of 17 tasks
liferoad opened this issue Apr 11, 2025 · 5 comments

Comments

@liferoad
Contributor

What happened?

Code:

# https://beam.apache.org/documentation/sdks/python-custom-multi-language-pipelines-guide/#using-beam-native-java-schematransforms
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external import BeamJarExpansionService

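# Identifier of the SQL SchemaTransform that should be exposed by the provider.
identifier = 'schematransform:org.apache.beam:sql_transform:v1'
# Gradle target of the SQL expansion service; for a released Beam version this
# resolves to the published jar (see the Maven Central URL in the error below).
expansion_service = "sdks:java:extensions:sql:expansion-service:shadowJar"
# The constructor discovers every SchemaTransform the service exposes, which is
# where the error below occurs.
provider = ExternalTransformProvider(BeamJarExpansionService(expansion_service))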

Error:

ERROR:root:Encountered an error while discovering expansion service at 'https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-extensions-sql-expansion-service/2.64.0/beam-sdks-java-extensions-sql-expansion-service-2.64.0.jar':
Failed to decode schema due to an issue with Field proto:

name: "dialect"
type {
  nullable: true
  logical_type {
    urn: "beam:logical_type:javasdk_enum:v1"
    payload: "\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\001\220\322\003\360a\254\355\000\005sr\0008org.apache.beam.sdk.schemas.logicaltypes.EnumerationType0Q\nk\326\330\360j\002\000\002L\000\nenumValuest\000JLorg/ap\001T\000/\001T\210/vendor/guava/v32_1_2_jre/com/googl\005\013Tmon/collect/BiMap;L\000\006v\rV\010\020Lj\001><util/List;xpsr\000L>\277\000\tk\030.guava.\035k\020.com.\tk\001\013\014mon.\rk\020.Hash\005o\000\000\r\001`\003\000\000xpw\004\000\000\000\002t\000\007zetasqlsr\000\021\001\202h.lang.Integer\022\342\240\244\367\201\2078\002\000\001I\000\005\005\253\014xr\000\020\031(8Number\206\254\225\035\013\224\340\213\002\001Y\001e`t\000\007calcitesq\000~\000\007\000\000\000\001xsr\000\023\005:\001\344\024.Array\001\351 x\201\322\035\231\307a\235\003\001d\024\004sizex\001D\004\002w\005\241(q\000~\000\006q\000~\000\nx"
    representation {
      atomic_type: INT32
    }
    argument_type {
      map_type {
        key_type {
          atomic_type: STRING
        }
        value_type {
          atomic_type: INT32
        }
      }
    }
    argument {
      map_value {
        entries {
          key {
            atomic_value {
              string: "zetasql"
            }
          }
          value {
            atomic_value {
              int32: 0
            }
          }
        }
        entries {
          key {
            atomic_value {
              string: "calcite"
            }
          }
          value {
            atomic_value {
              int32: 1
            }
          }
        }
      }
    }
  }
}
id: 2
encoding_position: 2
Traceback (most recent call last):
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 553, in named_tuple_from_schema
    field_py_type = self.typing_from_runner_api(field.type)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 475, in typing_from_runner_api
    base = self.typing_from_runner_api(base_type)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 538, in typing_from_runner_api
    return LogicalType.from_runner_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 786, in from_runner_api
    raise ValueError(
ValueError: No logical type registered for URN 'beam:logical_type:javasdk_enum:v1'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/transforms/external_transform_provider.py", line 236, in _create_wrappers
    schematransform_configs = SchemaAwareExternalTransform.discover(service)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 442, in discover
    return list(cls.discover_iter(expansion_service, ignore_errors))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 453, in discover_iter
    schema = named_tuple_from_schema(proto_config.config_schema)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 596, in named_tuple_from_schema
    schema_registry=schema_registry).named_tuple_from_schema(schema)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xqhu/PlayGround/venv-beam-2.60.0-3.11/lib/python3.11/site-packages/apache_beam/typehints/schemas.py", line 557, in named_tuple_from_schema
    raise ValueError(
ValueError: Failed to decode schema due to an issue with Field proto:

name: "dialect"
type {
  nullable: true
  logical_type {
    urn: "beam:logical_type:javasdk_enum:v1"
    payload: "\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\001\220\322\003\360a\254\355\000\005sr\0008org.apache.beam.sdk.schemas.logicaltypes.EnumerationType0Q\nk\326\330\360j\002\000\002L\000\nenumValuest\000JLorg/ap\001T\000/\001T\210/vendor/guava/v32_1_2_jre/com/googl\005\013Tmon/collect/BiMap;L\000\006v\rV\010\020Lj\001><util/List;xpsr\000L>\277\000\tk\030.guava.\035k\020.com.\tk\001\013\014mon.\rk\020.Hash\005o\000\000\r\001`\003\000\000xpw\004\000\000\000\002t\000\007zetasqlsr\000\021\001\202h.lang.Integer\022\342\240\244\367\201\2078\002\000\001I\000\005\005\253\014xr\000\020\031(8Number\206\254\225\035\013\224\340\213\002\001Y\001e`t\000\007calcitesq\000~\000\007\000\000\000\001xsr\000\023\005:\001\344\024.Array\001\351 x\201\322\035\231\307a\235\003\001d\024\004sizex\001D\004\002w\005\241(q\000~\000\006q\000~\000\nx"
    representation {
      atomic_type: INT32
    }
    argument_type {
      map_type {
        key_type {
          atomic_type: STRING
        }
        value_type {
          atomic_type: INT32
        }
      }
    }
    argument {
      map_value {
        entries {
          key {
            atomic_value {
              string: "zetasql"
            }
          }
          value {
            atomic_value {
              int32: 0
            }
          }
        }
        entries {
          key {
            atomic_value {
              string: "calcite"
            }
          }
          value {
            atomic_value {
              int32: 1
            }
          }
        }
      }
    }
  }
}
id: 2
encoding_position: 2

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad
Contributor Author

cc @ahmedabu98 @Abacn

@ahmedabu98
Contributor

The root error is ValueError: No logical type registered for URN 'beam:logical_type:javasdk_enum:v1'.
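In other words, the failure is a registry miss: when decoding a schema, the Python SDK resolves each logical type by its URN, and this Java-only enum URN has no Python registration. The sketch below models that lookup and the chained error using a hypothetical LOGICAL_TYPE_REGISTRY dict and hypothetical logical_type_from_urn / decode_field helpers. It is a simplified stand-in, not the actual code in apache_beam/typehints/schemas.py, and the URNs registered in it are only illustrative.

```python
from typing import Callable, Dict

# Hypothetical stand-in for the SDK's URN -> logical type table.
# The entries are illustrative; the point is that the Java enum URN is absent.
LOGICAL_TYPE_REGISTRY: Dict[str, Callable[[bytes], str]] = {
    "beam:logical_type:micros_instant:v1": lambda payload: "MicrosInstant",
    "beam:logical_type:decimal:v1": lambda payload: "Decimal",
}


def logical_type_from_urn(urn: str, payload: bytes = b"") -> str:
    """Resolve a logical type by URN, failing the way the traceback shows."""
    try:
        factory = LOGICAL_TYPE_REGISTRY[urn]
    except KeyError:
        raise ValueError(f"No logical type registered for URN {urn!r}") from None
    return factory(payload)


def decode_field(name: str, urn: str) -> str:
    """Wrap a per-field failure, mirroring the chained ValueError in the log."""
    try:
        return logical_type_from_urn(urn)
    except ValueError as e:
        raise ValueError(
            f"Failed to decode schema due to an issue with Field {name!r}") from e
```

Calling decode_field("dialect", "beam:logical_type:javasdk_enum:v1") raises a ValueError that names the field, and its __cause__ carries the same "No logical type registered" message seen in the traceback.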

'beam:logical_type:javasdk_enum:v1' is a Java-side logical type URN (constructed here) that identifies the EnumerationType used as a configuration field in SqlTransformSchemaTransformProvider.
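The "dialect" Field proto in the error above shows what this EnumerationType carries across the language boundary: values are represented as INT32, and the argument is a STRING-to-INT32 map (zetasql = 0, calcite = 1). As a rough illustration of what a Python-side counterpart would need to do, the sketch below uses a hypothetical EnumLike class (not a Beam API) that encodes and decodes values using only that map.

```python
from typing import Dict


class EnumLike:
    """Hypothetical Python mirror of the Java EnumerationType in the error.

    Values are stored as their int32 index (the proto's `representation`),
    and the name <-> index mapping comes from the proto's `argument` map.
    """

    def __init__(self, values: Dict[str, int]):
        self._to_int = dict(values)
        self._to_name = {v: k for k, v in values.items()}

    def encode(self, name: str) -> int:
        # Encode an enum name as its int32 index.
        return self._to_int[name]

    def decode(self, value: int) -> str:
        # Decode an int32 index back to the enum name.
        return self._to_name[value]


# The argument map carried by the "dialect" field in the error above.
dialect = EnumLike({"zetasql": 0, "calcite": 1})
```

Here dialect.encode("calcite") returns 1 and dialect.decode(0) returns "zetasql".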

The SchemaTransform's config uses field types that have no Python equivalent. Specifically:

  • "dialect" using EnumerationType
  • "tableproviders" using EnumerationType
  • "parameters" using OneOfType

I don't see SqlTransformSchemaTransformProvider being used anywhere currently, so I think a reasonable approach would be to re-evaluate the configuration schema and change any problematic fields to be simple primitive types that Python can recognize.
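A rough sketch of what a primitive-only configuration could look like, written in Python for illustration. SqlTransformConfig and validate are hypothetical, the query field is an assumption, and "parameters" is left out because its OneOfType alternatives are not described here. The dialect names come from the enum argument in the error.

```python
from typing import List, NamedTuple, Optional

# Dialect names taken from the EnumerationType argument in the error.
ALLOWED_DIALECTS = {"zetasql", "calcite"}


class SqlTransformConfig(NamedTuple):
    """Hypothetical primitive-only config: every field type is one the
    Python SDK can decode without a Java-specific logical type."""
    query: str  # assumed field, not confirmed by this thread
    dialect: Optional[str] = None  # plain string instead of EnumerationType
    tableproviders: Optional[List[str]] = None  # list of strings instead of enums


def validate(config: SqlTransformConfig) -> SqlTransformConfig:
    # With a plain string field, the schema no longer restricts the values,
    # so the allowed set has to be checked in code instead.
    if config.dialect is not None and config.dialect not in ALLOWED_DIALECTS:
        raise ValueError(f"Unsupported dialect: {config.dialect!r}")
    return config
```

For example, validate(SqlTransformConfig(query="SELECT 1", dialect="calcite")) passes, while dialect="mysql" raises ValueError. The trade-off is that the enum constraint moves from the schema type into runtime validation.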

@Abacn
Contributor

Abacn commented Apr 11, 2025

Thanks @ahmedabu98. Since it's not used, I'll just remove this enum parameter and always use the Calcite dialect, as part of #34423.

Update: another enum-typed parameter, tableproviders, is still preventing it from working.

@talatuyarer
Contributor

I believe TableProvider is important: when we create a Kafka table in SQL, the Java side uses a TableProvider. I am not sure about the Python side.

@ahmedabu98
Contributor

It'd be good to look over the SchemaTransform implementation in general. When I skimmed through it, I saw that some config fields weren't actually being used anywhere (my guess is that future work was planned but never implemented).

Projects
None yet
Development

No branches or pull requests

4 participants