区块链技术博客
www.b2bchain.cn

如何在Apache Flink中加入两个流? – java程序员分享

本文介绍了如何在Apache Flink中加入两个流? – java程序员分享,有助于帮助完成毕业设计以及求职,是一篇很好的资料。

对技术面试,学习经验等有一些体会,在此分享。

我开始使用flink并查看one of the official tutorials。

据我了解,此练习的目标是将两个流加入time属性。

任务:

该练习的结果是Tuple2记录的数据流,每个不同的rideId一个。您应该忽略
END个活动,并且只能在每次骑行的START时加入该活动
其相应的票价数据。

生成的流应打印为标准输出。

问题:EnrichmentFunction如何又可以将两个流加入。它怎么知道参加哪个游乐项目的公平?我希望它可以缓冲多个博览会/竞赛,直到传入的博览会/竞赛有一个匹配的伙伴。

以我的理解,它只是保存了它看到的所有游乐设施/展览,并将其与下一个最佳游乐设施/展览会结合在一起。为什么这是正确的联接?

提供的解决方案:

/*  * Copyright 2017 data Artisans GmbH  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  *  http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */  package com.dataartisans.flinktraining.solutions.datastream_java.state;  import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare; import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide; import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource; import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource; import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.util.Collector;  /**  * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training  * (http://training.data-artisans.com).  *  * The goal for this exercise is to enrich TaxiRides with fare information.  *  * Parameters:  * -rides path-to-input-file  * -fares path-to-input-file  *  */ public class RidesAndFaresSolution extends ExerciseBase {     public static void main(String[] args) throws Exception {          ParameterTool params = ParameterTool.fromArgs(args);         final String ridesFile = params.get("rides", pathToRideData);         final String faresFile = params.get("fares", pathToFareData);          final int delay = 60;                   // at most 60 seconds of delay         final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second          // set up streaming execution environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);         env.setParallelism(ExerciseBase.parallelism);          DataStream<TaxiRide> rides = env                 .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))                 .filter((TaxiRide ride) -> ride.isStart)                 .keyBy("rideId");          DataStream<TaxiFare> fares = env                 .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))                 .keyBy("rideId");          DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides                 .connect(fares)                 .flatMap(new EnrichmentFunction());          printOrTest(enrichedRides);          env.execute("Join Rides with Fares (java RichCoFlatMap)");     }      public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {         // keyed, managed state         private ValueState<TaxiRide> rideState;         private ValueState<TaxiFare> fareState;          @Override         public void open(Configuration config) {             rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));             fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));         }          @Override         public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {             TaxiFare fare = fareState.value();             if (fare != null) {                 fareState.clear();                 out.collect(new Tuple2(ride, fare));             } else {                 rideState.update(ride);             }         }          @Override         public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {             TaxiRide ride = rideState.value();             if (ride != null) {                 rideState.clear();                 out.collect(new Tuple2(ride, fare));             } else {                 fareState.update(fare);             }         }     } } 

java大神给出的解决方案

在此特定的training exercise on stateful enrichment上下文中,每个rideId值都有三个事件-TaxiRide开始事件,TaxiRide结束事件和TaxiFare。本练习的目的是将每个TaxiRide启动事件与具有相同rideId的一个TaxiFare事件相关联,或者换句话说,在rideId上加入乘车流和票价流,同时知道每个将只有一个。

此练习演示了键控状态在Flink中的工作方式。键控状态实际上是分片键值存储。当我们有一个ValueState项(例如ValueState<TaxiRide> rideState)时,Flink将在其状态后端为键的每个不同值(rideId)存储一个单独的记录。

每次调用flatMap1flatMap2时,都会在上下文中隐式地存在一个键(一个rideId),当我们调用rideState.update(ride)rideState.value()时,我们不是在访问单个变量,而是设置并获取一个键值存储在键值存储中,使用rideId作为键。

在本练习中,两个流都由rideId键控,因此每个不同的rideState可能都有一个fareState元素和一个rideId元素。因此,提供的解决方案是缓冲许多行程和票价,但每个rideId仅缓冲一次(考虑到此数据集中的行程和票价完美配对,就足够了)。

因此,您问:

EnrichmentFunction如何又可以将两个流加入。它如何知道哪种票价与哪种游乐设施并驾齐驱?

答案是

它加入具有相同rideId的票价。

您所询问的这个特定练习显示了如何实现简单的扩充连接,以了解键控状态和连接流的思想。但是使用Flink当然可以进行更复杂的连接。请参阅joins using the DataStream API,joins with Flink's Table API和joins with Flink SQL上的文档。

有哪些替代继承的方法? java大神给出的解决方案 有效的Java:偏重于继承而不是继承。 (这实际上也来自“四人帮”)。他提出的理由是,如果扩展类未明确设计为继承,则继承会引起很多不正常的副作用。例如,对super.someMethod()的任何调用都可以引导您通过未知代码的意外路径。取而代之的是,持有对本来应该扩展的类的引用,然后委托给它。这是与Eric…

我从服务器收到此消息,我不明白T和Z的含义,2012-08-24T09:59:59Z将此字符串转换为Date对象的正确SimpleDateFormat模式是什么? java大神给出的解决方案 这是ISO 8601标准。您可以使用SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM…

我正在使用Retrofit来获取JSON答复。这是我实施的一部分("/api/report/list") Observable<Bills> listBill(@Query("employee_id") String employeeID); 而条例草案类是-public static class…

private static Coordinate[] getCircleCoordintaes() { Coordinate coordinates[] = {new Coordinate(0, 0)}; return coordinates; } 以上程序工作正常。在上面的程序中,返回的坐标数组首先初始化了数组使用这条线Coordinate coordi…

我需要使用Java Swing的搜索框,如果单击任何建议,当输入字母时它将显示来自数据库的建议,它将执行一些操作。如果有可能在Java swing中,请提供源代码提前致谢 java大神给出的解决方案 您可以使用DefaultComboBoxModel,输出将是这样。Try this在此代码中,您将找到countries数组,因此您需要从数据库中获取此数组。

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » 如何在Apache Flink中加入两个流? – java程序员分享
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们