Flink Kafka连接器报错解决

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:587)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:561)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:180)
	... 32 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

在单机跑通集群环境下失败,去https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/kafka/ 下载依赖,放到服务器lib目录即可。

验证是否添加成功,将连接器填错查看报错是否有Kafka连接器。

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafkaa' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
jdbc
kafka
print
upsert-kafka
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:399)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:583)
	... 33 more

Flink standalone方式搭建及问题

通过Docker一键搭建,以下是模板。

version: '3.1'
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - ./jobs:/opt/flink/jobs:rw
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    deploy:
      resources:
         limits:
            cpus: "2.00"
            memory: 4G

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 2
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    deploy:
      resources:
         limits:
            cpus: "2.00"
            memory: 4G

遇到的坑1、class传参问题

【错误】./bin/flink run ./jobs/flinkpgsql-1.0-SNAPSHOT-jar-with-dependencies.jar -c cn.sdust.jtyhzl.packingcnt.ParkingCnt

【正确】./bin/flink run -c cn.sdust.jtyhzl.packingcnt.ParkingCnt ./jobs/flinkpgsql-1.0-SNAPSHOT-jar-with-dependencies.jar

【结论】 flink run 传参只能放脚本地址前面。