以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论 1.1、在kafka中创建 1.4.1、查询出
我又又带来一堆Spark题了,这次是SparkStreaming的!!!
废话不多说,上题!!!
题目如下👇
数据说明:
rng_comment.txt
文件中的数据
字段
含义
index
数据id
child_comment
回复数量
comment_time
评论时间
content
评论内容
da_v
微博个人认证
like_status
赞
pic
图片评论url
user_id
微博用户id
user_name
微博用户名
vip_rank
微博会员等级
stamp
时间戳
rng_comment
主题,设置2
个分区2
个副本
1.2、数据预处理,把空行
过滤掉
1.3、请把给出的文件写入到kafka
中,根据数据id
进行分区,id为奇数
的发送到一个分区中,偶数
的发送到另一个分区
1.4、使用Spark Streaming对接kafka之后进行计算
rng_comment
rng_comment
创建vip_rank
表,字段为数据的所有字段
rng_comment
创建like_status
表,字段为数据的所有字段
rng_comment
创建count_conmment
表,字段为 时间
、条数
微博会员等级为5
的用户,并把这些数据写入到mysql数据库中的vip_rank
表中
1.4.2、查询出评论赞的个数
在10个
以上的数据,并写入到mysql数据库中的like_status
表中
1.4.3、分别计算出2018/10/20
,2018/10/21
,2018/10/22
,2018/10/23
这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment
表中数据如下👇
数据过大,所以放到百度云上,失效请私信博主! 链接: https://pan.baidu.com/s/1jMsJbN9RLh5ItXFDXdwVjw 提取码: 1234
题目如下👇
数据预处理,取出空行
object HomeWork20200414_1_Pretreatment { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HomeWork20200414_1_Pretreatment") val sc = new SparkContext(sparkConf) sc.setLogLevel("WARN") val fileRDD: RDD[String] = sc.textFile("input20200414/rng_comment.txt") fileRDD.filter{x => var datas = x.split("t");datas.length == 11}.coalesce(1).saveAsTextFile("output20200414") sc.stop() } }
读取数据写入Kafka
public class HomeWork20200414_2_Producer { public static void main(String[] args) throws IOException { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.100.111:9092,192.168.100.112:9092,192.168.100.113:9092"); props.put("acks", "-1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); File inputFile = new File("output20200414/part-00000"); BufferedReader bufferedReader = new BufferedReader(new FileReader(inputFile)); String line = null; int partition = 0; while ((line = bufferedReader.readLine()) != null) { try { if(Integer.parseInt(line.split("t")[0]) % 2== 0){ partition = 0; }else{ partition = 1; } }catch (NumberFormatException e){ continue; } producer.send(new ProducerRecord<String, String>("rng_comment",partition,String.valueOf(partition),line)); } bufferedReader.close(); producer.close(); } }
SparkStreaming消费Kafka数据写入MySql
(写入数据到MySQL我用的是RDD,也可以转换成DataFream,用SparkSQL写入MySQL,我后续把这个补上!)object HomeWork20200414_3_SparkStreaming { val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://localhost:3306/rng_comment" val username = "root" val password = "root" def main(args: Array[String]): Unit = { /** * 1.5.1、查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 * 1.5.2、查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 * 1.5.3、分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中 */ val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HomeWork20200414_3_SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.sparkContext.setLogLevel("WARN") // 3.设置Kafka参数 val kafkaParams: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.100.111:9092,192.168.100.112:9092,192.168.100.113:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> "SparkKafka77777", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", //false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) ) // 4.设置Topic var topics = Array("rng_comment") val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, //位置策略,源码强烈推荐使用该策略,会让Spark的Executor和Kafka的Broker均匀对应 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)) //消费策略,源码强烈推荐使用该策略 val resultDStream: DStream[Array[String]] = recordDStream.map(_.value()).map(_.split("t")).cache() // 1.查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 resultDStream.filter(_ (9) == "5").foreachRDD { rdd: RDD[Array[String]] => { rdd.foreachPartition { iter: Iterator[Array[String]] => { Class.forName(driver) val connection: Connection = DriverManager.getConnection(url, username, password) var sql = "insert into vip_rank values (?,?,?,?,?,?,?,?,?,?,?)" iter.foreach { line: Array[String] => { val statement: PreparedStatement = connection.prepareStatement(sql) statement.setInt(1, line(0).toInt); statement.setInt(2, line(1).toInt); statement.setString(3, line(2)); statement.setString(4, line(3)); statement.setString(5, line(4)); statement.setString(6, line(5)); statement.setString(7, line(6)); statement.setString(8, line(7)); statement.setString(9, line(8)); statement.setInt(10, line(9).toInt); statement.setString(11, line(10)); statement.executeUpdate() statement.close() } } connection.close() } } } } // 2.查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 resultDStream.filter(_ (5).toInt > 10).foreachRDD { rdd: RDD[Array[String]] => { rdd.foreachPartition { iter: Iterator[Array[String]] => { Class.forName(driver) val connection: Connection = DriverManager.getConnection(url, username, password) var sql = "insert into like_status values (?,?,?,?,?,?,?,?,?,?,?)" iter.foreach { line: Array[String] => { val statement: PreparedStatement = connection.prepareStatement(sql) statement.setInt(1, line(0).toInt); statement.setInt(2, line(1).toInt); statement.setString(3, line(2)); statement.setString(4, line(3)); statement.setString(5, line(4)); statement.setString(6, line(5)); statement.setString(7, line(6)); statement.setString(8, line(7)); statement.setString(9, line(8)); statement.setInt(10, line(9).toInt); statement.setString(11, line(10)); statement.executeUpdate() statement.close() } } connection.close() } } } } val dateFormat1 = new SimpleDateFormat("yyyy/MM/dd HH:mm") val dateFormat2 = new SimpleDateFormat("yyyy/MM/dd") // 3.分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中 val value: DStream[Array[String]] = resultDStream.filter { date:Array[String] => { val str: String = dateFormat2.format(dateFormat1.parse(date(2))) if ("2018/10/20".equals(str) || "2018/10/21".equals(str) || "2018/10/22".equals(str) || "2018/10/23".equals(str)) { true } else { false } } } value.foreachRDD { rdd: RDD[Array[String]] => { rdd.groupBy(x => dateFormat2.format(dateFormat1.parse(x(2)))).map(x => x._1 -> x._2.size).foreachPartition { iter: Iterator[(String, Int)] => { Class.forName(driver) val connection: Connection = DriverManager.getConnection(url, username, password) var sql = "insert into count_conmment values (?,?)" iter.foreach { line: (String, Int) => { val statement: PreparedStatement = connection.prepareStatement(sql) statement.setString(1, line._1); statement.setInt(2, line._2.toInt); statement.executeUpdate() statement.close() } } connection.close() } } } } ssc.start() ssc.awaitTermination() ssc.stop } }
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算