在Flink CDC中,将数据从Flink 1.17写入Doris,需要遵循以下步骤:
创新互联公司公司2013年成立,先为巴林左旗等服务建站,巴林左旗等地企业,进行企业商务咨询服务。为巴林左旗企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
1、添加依赖
在项目的pom.xml文件中添加Flink CDC和Doris的依赖:
org.apache.flink flinkconnectordoris_2.11 1.13.2 org.apache.flink flinkconnectormysqlcdc 2.1.0
2、创建Flink CDC Source
创建一个Flink CDC Source,用于从MySQL数据库中读取数据变更事件:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; public class FlinkCDCSourceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SourceFunctionsourceFunction = MySqlSource. builder() .hostname("localhost") .port(3306) .databaseList("mydb") // 监听的数据库名 .tableList("mydb.mytable") // 监听的表名 .username("root") .password("password") .deserializer(new StringDebeziumDeserializationSchema()) // 反序列化方式 .build(); env.addSource(sourceFunction).print(); env.execute("Flink CDC Example"); } }
3、创建Doris Sink
创建一个Doris Sink,用于将数据写入Doris数据库:
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.doris.DorisSink; import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions; import org.apache.flink.types.Row; public class DorisSinkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 假设从Flink CDC Source获取的数据流为dataStream DataStreamdataStream = ...; DorisSink
dorisSink = DorisSink.builder() .setDorisTable("mydb.mytable") // Doris表名 .setUsername("root") .setPassword("password") .setFenodes("localhost:8030") // Doris FE节点地址 .setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加载属性 .build(); dataStream.addSink(dorisSink); env.execute("Doris Sink Example"); } }
4、整合Flink CDC Source和Doris Sink
将Flink CDC Source和Doris Sink整合到一起,实现从MySQL数据库到Doris数据库的数据同步:
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.doris.DorisSink; import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions; import org.apache.flink.types.Row; import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; public class FlinkCDCToDorisExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SourceFunctionsourceFunction = MySqlSource. builder() .hostname("localhost") .port(3306) .databaseList("mydb") // 监听的数据库名 .tableList("mydb.mytable") // 监听的表名 .username("root") .password("password") .deserializer(new StringDebeziumDeserializationSchema()) // 反序列化方式 .build(); DataStream dataStream = env.addSource(sourceFunction); // 将数据流转换为Row类型,以便写入Doris DataStream rowDataStream = dataStream.map(json > { JsonObject jsonObject = new JsonParser().parse(json).getAsJsonObject(); String before = jsonObject.get("before").getAsString(); String after = jsonObject.get("after").getAsString(); return Row.of(before, after); }).returns(new RowTypeInfo(Types.STRING, Types.STRING)); DorisSink
dorisSink = DorisSink.builder() .setDorisTable("mydb.mytable") // Doris表名 .setUsername("root") .setPassword("password") .setFenodes("localhost:8030") // Doris FE节点地址 .setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加载属性 .build(); rowDataStream.addSink(dorisSink); env.execute("Flink CDC to Doris Example"); } }
这样,就完成了使用Flink CDC将数据从MySQL数据库同步到Doris数据库的过程。
本文题目:FlinkCDC里flink1.17写doris的代码怎么做?
转载来源:http://www.shufengxianlan.com/qtweb/news41/50441.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联