spark IDEA代码初步

2022年6月19日21:36:41 发表评论 3,126 views

IDEA通过DS+Row查询数据

package day0106

import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * 数据写入MySQL
 * */
/**
 * Reads tab-separated student records from a local text file, builds a
 * DataFrame with an explicit (id, name, age) schema, sorts the rows by
 * age descending, and writes the result into a MySQL table over JDBC.
 */
object Demo3 {

  // Explicit entry point instead of `extends App`: the App trait runs the
  // body via delayed initialization, which has well-known ordering pitfalls
  // (fields may be null when accessed from serialized closures on executors).
  def main(args: Array[String]): Unit = {
    // Local single-threaded SparkSession for development runs.
    val spark = SparkSession.builder().appName("DF").master("local").getOrCreate()

    try {
      // Read the raw file and split each line on tabs.
      // NOTE(review): assumes every line has >= 3 tab-separated fields and
      // that fields 0 and 2 parse as integers — malformed lines will fail
      // at action time. Confirm the input file's format.
      val stuRDD = spark.sparkContext.textFile("D:\\testdata\\student.txt").map(_.split("\t"))

      // Schema describing one student row: (id: Int, name: String, age: Int).
      val schema = StructType(
        List(
          StructField("id", IntegerType),
          StructField("name", StringType),
          StructField("age", IntegerType)
        )
      )

      // Map each split line onto a Row matching the schema, then build the DataFrame.
      val rowRDD = stuRDD.map(s => Row(s(0).toInt, s(1), s(2).toInt))
      val stuDF = spark.createDataFrame(rowRDD, schema)

      // Register a temporary view so the data can be queried with SQL.
      stuDF.createOrReplaceTempView("student")

      // Sort students by age, oldest first.
      val result = spark.sql("select * from student order by age desc")

      // JDBC connection properties for the target MySQL instance.
      val myPro = new Properties()
      myPro.setProperty("user", "root")
      myPro.setProperty("password", "000000")
      myPro.setProperty("driver", "com.mysql.jdbc.Driver")

      // "overwrite" drops and recreates the target table on every run.
      result.write.mode("overwrite").jdbc(
        "jdbc:mysql://bigdata166:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
        "student",
        myPro)
    } finally {
      // Release the SparkSession even when the job fails partway through.
      spark.stop()
    }
  }
}

IDEA通过case class 创建DF


IDEA数据写入mysql

package day0106

import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * 数据写入MySQL
 * */
/**
 * Writes student data to MySQL: loads a tab-separated file, applies an
 * explicit schema, orders by age (descending) via Spark SQL, and saves the
 * result to the `student` table through JDBC in overwrite mode.
 */
object Demo3 {

  // A plain main method replaces `extends App`; App's delayed initialization
  // can leave fields uninitialized when closures are shipped to executors.
  def main(args: Array[String]): Unit = {
    // Build the local SparkSession.
    val spark = SparkSession.builder().appName("DF").master("local").getOrCreate()

    try {
      // Split each input line on tabs. `val`, not `var` — the reference is
      // never reassigned.
      // NOTE(review): presumes 3+ fields per line with integer id/age;
      // bad rows surface as runtime exceptions during the write action.
      val lineFields = spark.sparkContext.textFile("D:\\testdata\\student.txt").map(_.split("\t"))

      // StructType schema for the student rows.
      val schema = StructType(
        List(
          StructField("id", IntegerType),
          StructField("name", StringType),
          StructField("age", IntegerType)
        )
      )

      // Convert field arrays to Rows and assemble the DataFrame.
      val rowRDD = lineFields.map(s => Row(s(0).toInt, s(1), s(2).toInt))
      val stuDF = spark.createDataFrame(rowRDD, schema)

      // Expose the DataFrame to Spark SQL as a temp view.
      stuDF.createOrReplaceTempView("student")

      // Query: all students, oldest first.
      val result = spark.sql("select * from student order by age desc")

      // JDBC credentials and driver for the MySQL target.
      val myPro = new Properties()
      myPro.setProperty("user", "root")
      myPro.setProperty("password", "000000")
      myPro.setProperty("driver", "com.mysql.jdbc.Driver")

      // Overwrite mode replaces the target table's contents on each run.
      result.write.mode("overwrite").jdbc(
        "jdbc:mysql://bigdata166:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
        "student",
        myPro)
    } finally {
      // Stop the session even on failure so local resources are freed.
      spark.stop()
    }
  }
}

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: