Spark3.x-实战之mysql方式实现kafka精准一次性消费

简介

      使用 MySQL 实现精准一次性消费的前提是:计算结果和偏移量必须在同一个 JVM 进程中、通过同一个数据库事务写入。因此 Spark 处理完数据以后,需要先把结果收集(collect)到 Driver 端,再在一个事务里同时保存结果和偏移量——要么全部成功,要么全部回滚。这也意味着该方案只适用于聚合后结果数据量较小的场景。

实现

创建一张保存偏移量的表

-- Kafka offset bookkeeping table.
-- Holds one row per (group_id, topic, partition_id); the composite primary key
-- enforces that uniqueness, which is what lets the streaming job overwrite a
-- partition's row with `REPLACE INTO`.
-- topic_offset receives the batch's untilOffset value written by the job.
-- WARNING: the DROP below discards any previously saved offsets; run this
-- script only for first-time setup.
DROP TABLE IF EXISTS `offset`;
CREATE TABLE `offset`  (
  `group_id` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `topic` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `partition_id` int(11) NOT NULL,
  `topic_offset` bigint(20) NULL DEFAULT NULL,
  PRIMARY KEY (`group_id`, `topic`, `partition_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

自己的业务表 

-- Business result table: one (aear_name, sum_score) row is inserted per key
-- for every processed batch. There is no primary key, so rows accumulate
-- across batches rather than being updated in place.
-- NOTE(review): "aear" looks like a typo of "area"; the name is kept because
-- the streaming job's INSERT statement references it verbatim.
-- WARNING: the DROP below deletes all previously stored results.
DROP TABLE IF EXISTS `aear_sumscore`;
CREATE TABLE `aear_sumscore`  (
  `aear_name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sum_score` decimal(16, 2) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

引入pom.xml依赖

 
        
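        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.12</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.12</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>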
        

配置文件 application.conf

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://master/school?characterEncoding=utf-8&useSSL=false"
db.default.user="root"
db.default.password="root"

工具类

MySQLUtil

/**
 * Minimal JDBC helper for running ad-hoc SELECT statements against the
 * MySQL `school` database on host `master`.
 */
object MySQLUtil {
  /** Smoke test: prints every row of `tb_stu`. */
  def main(args: Array[String]): Unit = {
    val list: List[JSONObject] = queryList("select * from tb_stu")
    println(list)
  }

  /**
   * Executes `sql` and returns one JSONObject per result row, keyed by the
   * column name reported by the result-set metadata.
   *
   * A new connection is opened for every call. The result set, statement and
   * connection are always closed, including when the query or row reading
   * throws.
   *
   * @param sql complete SELECT statement; it is executed verbatim, so callers
   *            must not splice untrusted input into it
   * @return the rows in result-set order (empty list when nothing matches)
   */
  def queryList(sql: String): List[JSONObject] = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn: Connection = DriverManager.getConnection(
      "jdbc:mysql://master:3306/school?characterEncoding=utf-8&useSSL=false",
      "root",
      "root"
    )
    try {
      val stat: Statement = conn.createStatement
      try {
        println(sql)
        val rs: ResultSet = stat.executeQuery(sql)
        try {
          val md: ResultSetMetaData = rs.getMetaData
          // Column count is fixed for the whole result set; read it once.
          val columnCount: Int = md.getColumnCount
          val resultList: ListBuffer[JSONObject] = new ListBuffer[JSONObject]()
          while (rs.next) {
            val rowData = new JSONObject()
            // JDBC column indexes are 1-based.
            for (i <- 1 to columnCount) {
              rowData.put(md.getColumnName(i), rs.getObject(i))
            }
            resultList += rowData
          }
          resultList.toList
        } finally {
          rs.close()
        }
      } finally {
        stat.close()
      }
    } finally {
      conn.close()
    }
  }
}

OffsetManagerM(mysql得到偏移量的工具类)

/** Reads previously saved Kafka offsets from the MySQL `offset` table. */
object OffsetManagerM {
  /**
   * Loads the saved offset of every partition recorded for the given topic
   * and consumer group.
   *
   * @param topic           Kafka topic whose offsets are looked up
   * @param consumerGroupId group id under which the offsets were stored
   * @return a mutable map from topic-partition to saved offset; empty when no
   *         row exists for this topic/group
   */
  def getOffset(topic: String, consumerGroupId: String): mutable.Map[TopicPartition, Long] = {
    // NOTE(review): both values are spliced directly into the SQL text because
    // MySQLUtil.queryList only accepts a raw string. This is safe only while
    // they come from trusted constants; switch to a parameterized query if
    // they can ever be user-supplied.
    val sql = " select group_id,topic,topic_offset,partition_id from offset" +
      s" where topic='$topic' and group_id='$consumerGroupId'"
    val rows: List[JSONObject] = MySQLUtil.queryList(sql)

    val res: mutable.Map[TopicPartition, Long] = mutable.Map()
    // foreach, not map: the loop exists only to fill `res`.
    rows.foreach { row =>
      val topicPartition = new TopicPartition(topic, row.getIntValue("partition_id"))
      res.put(topicPartition, row.getLongValue("topic_offset"))
    }
    res
  }
}

MyKafkaUtilDwdTbStu(得到kafkaDStream)

/** Factory for Kafka direct streams used by the Spark Streaming jobs. */
object MyKafkaUtilDwdTbStu {
  // Loaded from config.properties; supplies the Kafka broker list.
  val properties: Properties = MyPropertiesUtil.load("config.properties")

  // Consumer group used when the caller does not supply one.
  private val DefaultGroupId: String = "MyKafkaUtilDwdTbStu"

  // Base Kafka consumer parameters shared by every stream created here.
  private val kafkaParams = collection.immutable.Map[String, Object](
    // Kafka broker addresses
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> properties.getProperty("kafka.broker.list"),
    // Deserializer for record keys
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    // Deserializer for record values
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    // Consumer group id (overridable per stream via the groupId argument)
    ConsumerConfig.GROUP_ID_CONFIG -> DefaultGroupId,
    // Where to start when no offset is supplied
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
    // Auto-commit is off: offsets are managed by the caller
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
  )

  /** Returns the base parameters with the consumer group replaced by `groupId`. */
  private def paramsFor(groupId: String): collection.immutable.Map[String, Object] =
    kafkaParams.updated(ConsumerConfig.GROUP_ID_CONFIG, groupId)

  /** Creates a direct stream for the given consumer strategy. */
  private def createStream(
      streamingContext: StreamingContext,
      strategy: ConsumerStrategy[String, String]
  ): InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      streamingContext,
      // PreferBrokers: use when executors run on the same hosts as the brokers
      // PreferConsistent: spread partitions evenly across executors (usual choice)
      // PreferFixed: pin partitions to specific hosts
      PreferConsistent,
      strategy
    )

  /**
   * Subscribes to `topic`, starting from the position chosen by the
   * `auto.offset.reset` setting.
   *
   * @param topic            topic to consume
   * @param streamingContext active StreamingContext
   * @param groupId          Kafka consumer group; defaults to the historical
   *                         hard-coded group so existing callers are unaffected
   * @return the Kafka input stream
   */
  def getInputDStreamByDefault(
      topic: String,
      streamingContext: StreamingContext,
      groupId: String = DefaultGroupId
  ): InputDStream[ConsumerRecord[String, String]] =
    // Subscribe consumes by topic; Assign would consume fixed topic-partitions.
    createStream(streamingContext, Subscribe[String, String](Array(topic), paramsFor(groupId)))

  /**
   * Subscribes to `topic`, starting each listed partition at the given offset.
   *
   * @param topic            topic to consume
   * @param offsetRange      starting offset per topic-partition
   * @param streamingContext active StreamingContext
   * @param groupId          Kafka consumer group; defaults to the historical
   *                         hard-coded group so existing callers are unaffected
   * @return the Kafka input stream
   */
  def getInputDStreamByMapTopicPartition(
      topic: String,
      offsetRange: mutable.Map[TopicPartition, Long],
      streamingContext: StreamingContext,
      groupId: String = DefaultGroupId
  ): InputDStream[ConsumerRecord[String, String]] =
    createStream(
      streamingContext,
      Subscribe[String, String](Array(topic), paramsFor(groupId), offsetRange)
    )
}

Ads(mysql实现精准一次性消费的代码)

/**
 * Streaming job that sums scores per area from Kafka and stores both the
 * result and the consumed offsets in MySQL inside one transaction, so a batch
 * is either fully recorded (data + offsets) or not recorded at all.
 */
object Ads {
  def main(args: Array[String]): Unit = {
    // Set up the streaming context (5-second batches).
    val sparkConf: SparkConf =
      new SparkConf().setMaster("local[4]").setAppName("TrademarkStatApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Key under which offsets are saved in the `offset` table.
    // NOTE(review): the Kafka consumer itself uses the group id configured in
    // MyKafkaUtilDwdTbStu, which is a different string — confirm this is intended.
    val groupId = "ads_tb_stuwide"
    val topic = "dws_tb_stuwide"

    // Initialise the ScalikeJDBC connection pool once, up front, instead of
    // re-initialising it for every batch.
    DBs.setup()

    // Read the offsets saved by previous runs from MySQL.
    val offsetMapForKafka: mutable.Map[TopicPartition, Long] =
      OffsetManagerM.getOffset(topic, groupId)

    // Resume from the saved offsets when there are any; otherwise fall back to
    // the consumer's default starting position.
    val recordInputDstream: InputDStream[ConsumerRecord[String, String]] =
      if (Option(offsetMapForKafka).exists(_.nonEmpty)) {
        MyKafkaUtilDwdTbStu.getInputDStreamByMapTopicPartition(topic, offsetMapForKafka, ssc)
      } else {
        MyKafkaUtilDwdTbStu.getInputDStreamByDefault(topic, ssc)
      }

    // Capture each batch's offset ranges. This must be done on the Kafka RDD
    // itself, before any transformation that changes the RDD type. The
    // transform body runs on the driver once per batch, so assigning to a
    // driver-side variable is safe.
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] =
      recordInputDstream.transform { rdd =>
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }

    // Parse each record's JSON value into a StuWide.
    val stuWideDstream: DStream[StuWide] = inputGetOffsetDstream.map { record =>
      JSON.parseObject(record.value(), classOf[StuWide])
    }

    // Aggregate: total score per area name.
    val res: DStream[(String, Int)] = stuWideDstream
      .map(item => (item.aearName, item.score.toInt))
      .reduceByKey(_ + _)

    // Store the results and the offsets in ONE MySQL transaction; this is what
    // provides the exactly-once guarantee.
    res.foreachRDD { rdd =>
      // Bring the (small) aggregated result to the driver so that data and
      // offsets can be written through the same connection.
      val aearSum: Array[(String, Int)] = rdd.collect()
      if (aearSum.nonEmpty) {
        DB.localTx { implicit session =>
          // Each inner Seq binds positionally to the placeholders below:
          // first ? = aear_name, second ? = sum_score.
          val batchParams: Seq[Seq[Any]] =
            aearSum.map { case (aearName, sumScore) => Seq[Any](aearName, sumScore) }.toSeq

          SQL("insert into aear_sumscore(aear_name,sum_score) values(?,?)")
            .batch(batchParams: _*).apply()

          // To verify rollback, throw here, e.g.:
          // throw new RuntimeException("test exception")

          // Save the end offset of every partition in this batch. Columns are
          // named explicitly so the statement does not depend on the table's
          // physical column order.
          offsetRanges.foreach { offsetRange =>
            val partitionId: Int = offsetRange.partition
            val untilOffset: Long = offsetRange.untilOffset
            SQL("replace into offset(group_id,topic,partition_id,topic_offset) values(?,?,?,?)")
              .bind(groupId, topic, partitionId, untilOffset).update().apply()
          }
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

你可能感兴趣的