SparkCore-RDD编程

SparkCore-RDD编程操作

0. 大纲

  • Spark程序的执行过程
  • RDD的操作
    • RDD的转换操作
    • 共享变量
  • 高级排序

1. Spark程序执行过程

1.1. WordCount案例程序的执行过程

图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ar7k9Q5x-1587102271050)(assets/01Spark作业运行原理剖析.png)]

1.2. Spark程序执行流程

在这里插入图片描述

2. RDD的操作

​ At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

​ A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

2.1. RDD的初始化

​ RDD的初始化,原生api提供的2中创建方式,一种就是读取文件textFile,还有一种就是加载一个scala集合parallelize。当然,也可以通过transformation算子来创建的RDD。

2.2. RDD的操作

​ 需要知道RDD操作算子的分类,基本上分为两类:transformation和action,当然更加细致的分,可以分为输入算子,转换算子,缓存算子,行动算子。

在这里插入图片描述

输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。 如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。

2.2.1. transformation转换算子

  • map

    1. 说明

      rdd.map(func):RDD,对rdd集合中的每一个元素,都作用一次该func函数,之后返回值为生成元素构成的一个新的RDD。

    2. 编码

      对rdd中的每一个元素×7

      object _01RDDOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_01RDDOps.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)//map 原集合*7val list = 1 to 7//构建一个rddval listRDD:RDD[Int] = sc.parallelize(list)//        listRDD.map((num:Int) => num * 7)
      //        listRDD.map(num => num * 7)val ret = listRDD.map(_ * 7)ret.foreach(println)sc.stop()}
      }
      
  • flatMap

    1. 说明

      rdd.flatMap(func):RDD ==>rdd集合中的每一个元素,都要作用func函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。所以和上述map算子进行总结:

      map操作是一个one-2-one的操作

      flatMap操作是一个one-2-many的操作

    2. 编码

      案例:将每行字符串,拆分成一个个的单词

      def  flatMapOps(sc:SparkContext): Unit = {val list = List("jia jing kan kan kan","gao di di  di di","zhan yuan qi qi")val listRDD = sc.parallelize(list)listRDD.flatMap(line => line.split("\\s+")).foreach(println)
      }
      
  • filter

    1. 说明

      rdd.filter(func):RDD ==> 对rdd中的每一个元素操作func函数,该函数的返回值为Boolean类型,保留返回值为true的元素,共同构成一个新的RDD,过滤掉哪些返回值为false的元素。

    2. 编程

      案例:保留集合中的偶数

      def filterMapOps(sc:SparkContext): Unit = {val list = 1 to 10sc.parallelize(list).filter(_ % 2 == 0).foreach(println)
      }
      
  • sample

    1. 说明

      rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD ===> 抽样,需要注意的是spark的sample抽样不是一个精确的抽样。一个非常重要的作用,就是来看rdd中数据的分布情况,根据数据分布的情况,进行各种调优与优化。—>数据倾斜。

      首先得要知道这三个参数是啥意思

      withReplacement:抽样的方式,true有放回抽样, false为无返回抽样

      fraction: 抽样比例,取值范围就是0~1

      seed: 抽样的随机数种子,有默认值,通常也不需要传值

    2. 编程

      案例:从10w个数中抽取千分之一进行样本评估

      def sampleMapOps(sc:SparkContext): Unit = {val listRDD = sc.parallelize(1 to 100000)var sampledRDD = listRDD.sample(true, 0.001)println("样本空间的元素个数:" + sampledRDD.count())sampledRDD = listRDD.sample(false, 0.001)println("样本空间的元素个数:" + sampledRDD.count())
      }
      
  • union

    1. 说明

      rdd1.union(rdd2),联合rdd1和rdd2中的数据,形成一个新的rdd,其作用相当于sql中的union all。

    2. 编程

      案例:合并两个rdd中的数据,生成一个新的rdd,做后续统一的处理

      def unionMapOps(sc:SparkContext): Unit = {val listRDD1 = sc.parallelize(List(1, 3, 5, 7, 9))val listRDD2 = sc.parallelize(List(2, 4, 5, 8, 10))val unionedRDD:RDD[Int] = listRDD1.union(listRDD2)unionedRDD.foreach(println)
      }
      
  • join

    1. 说明

      join就是sql中的inner join,join的效果工作7种。

    在这里插入图片描述

      从具体的写法上面有如下几种- 交叉连接A a accross join B b;这种操作方式会产生笛卡尔积,在工作中一定要避免。- 内连接A a [inner] join B b [where|on a.id = b.id]; 有时候也写成:A a, B b(自连接) 是一种等值连接。所谓等值连接,就是获取A和B的交集。- 外连接- 左外连接以左表为主体,查找右表中能够关联上的数据,如果管理不上,显示null。A a left outer join B b on a.id = b.id。- 右外连接是以右表为主体,查找左表中能够关联上的数据,如果关联不上,显示null。A a right outer join B b on a.id = b.id。- 全连接就是左外连接+右外连接A a full outer join B b on a.id = b.id。- 半连接 一般在工作很少用sparkcore中支持的连接有:笛卡尔积、内连接join,外连接(左、右、全)- spark连接要想两个RDD进行连接,那么这两个rdd的数据格式,必须是k-v键值对的,其中的k就是关联的条件,也就是sql中的on连接条件。假设,RDD1的类型[K, V], RDD2的类型[K, W]- 内连接val joinedRDD:RDD[(K, (V, W))] = rdd1.join(rdd2)- 左外连接val leftJoinedRDD:RDD[(K, (V, Option[W]))] = rdd1.leftOuterJoin(rdd2)- 右外连接val rightJoinedRDD:RDD[(K, (Option[V], W))] = rdd1.rightOuterJoin(rdd2)- 全连接val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] = rdd1.fullOuterJoin(rdd2)
    
    1. 编程

      案例:关联学生信息

      def joinedMapOps(sc:SparkContext): Unit = {//stu表:id,name,gender,age//score表:stuid,course, scoreval stuList = List("1 严文青 女 18","2 王大伟 男 55","3 贾静凯 男 33","4 old李 ladyBoy 31")val scoreList = List("1 语文 59","3 数学 0","2 英语 60","5 体育 99")val stuListRDD = sc.parallelize(stuList)val sid2StuInfoRDD:RDD[(Int, String)] = stuListRDD.map(line => {val sid = line.substring(0, line.indexOf(" ")).toIntval info = line.substring(line.indexOf(" ") + 1)(sid, info)})val scoreListRDD = sc.parallelize(scoreList)val sid2ScoreInfoRDD:RDD[(Int, String)] = scoreListRDD.map(line => {val sid = line.substring(0, line.indexOf(" ")).toIntval scoreInfo = line.substring(line.indexOf(" ") + 1)(sid, scoreInfo)})//查询有成绩的学生信息 --> join, k-vprintln("---------------------joined----------------------------")val stuScoreInfoRDD:RDD[(Int, (String, String))] = sid2StuInfoRDD.join(sid2ScoreInfoRDD)//        stuScoreInfoRDD.foreach(kv => {//            println(s"sid:${kv._1}, stu's info: ${kv._2._1}, stu's score: ${kv._2._2}")//        })stuScoreInfoRDD.foreach{case (sid, (stuInfo, scoreInfo)) => {println(s"sid:${sid}, stu's info: ${stuInfo}, stu's score: ${scoreInfo}")}}println("---------------------left----------------------------")//查询所有学生的信息val stuInfo:RDD[(Int, (String, Option[String]))] = sid2StuInfoRDD.leftOuterJoin(sid2ScoreInfoRDD)stuInfo.foreach{case (sid, (stuInfo, scoreOption)) => {println(s"sid:${sid}, stu's info: ${stuInfo}, stu's score: ${scoreOption.getOrElse(null)}")}}println("---------------------full----------------------------")//查询学生,及其有成绩的学生信息val stuScoreInfo:RDD[(Int, (Option[String], Option[String]))] = sid2StuInfoRDD.fullOuterJoin(sid2ScoreInfoRDD)stuScoreInfo.foreach{case (sid, (stuOption, scoreOption)) => {println(s"sid:${sid}, stu's info: ${stuOption.getOrElse(null)}, stu's score: ${scoreOption.getOrElse(null)}")}}
      }
      
  • groupByKey

    1. 说明

      原始rdd的类型时[(K, V)]

      rdd.groupByKey(),按照key进行分组,那必然其结果就肯定[(K, Iterable[V])],是一个shuffle dependency宽依赖shuffle操作,但是这个groupByKey不建议在工作过程中使用,除非非要用,因为groupByKey没有本地预聚合,性能较差,一般我们能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就尽量代替。

    2. 编程

      案例:对不同班级的学生进行分组。

      def groupByKeyOps(sc:SparkContext): Unit ={//stu表:id, name, gender, age, classval stuList = List("1,白普州,1,22,1904-bd-bj","2,伍齐城,1,19,1904-bd-wh","3,曹佳,0,27,1904-bd-sz","4,姚远,1,27,1904-bd-bj","5,匿名大哥,2,17,1904-bd-wh","6,欧阳龙生,0,28,1904-bd-sz")val stuRDD = sc.parallelize(stuList)val class2InfoRDD:RDD[(String, String)] = stuRDD.map(line => {val dotIndex = line.lastIndexOf(",")val className = line.substring(dotIndex + 1)val info = line.substring(0, dotIndex)(className, info)})val gbkRDD:RDD[(String, Iterable[String])] = class2InfoRDD.groupByKey()gbkRDD.foreach(println)
      }
      
  • reduceByKey

    rdd的类型为[(K, V)]

    1. 说明

      rdd.reduceByKey(func:(V, V) => V):RDD[(K, V)] ====>在scala集合中学习过一个reduce(func:(W, W) => W)操作,是一个聚合操作,这里的reduceByKey按照就理解为在groupByKey(按照key进行分组[(K, Iterable[V])])的基础上,对每一个key对应的Iterable[V]执行reduce操作。

      同时reduceByKey操作会有一个本地预聚合的操作,所以是一个shuffle dependency宽依赖shuffle操作。

    2. 编程

      经典案例:wordcount

  • sortByKey

    按照key进行排序

  • combineByKey

    ​ 通过查看reduceByKey和groupByKey的实现,发现其二者底层都是基于一个combineByKeyWithClassTag的底层算子来实现的,包括下面的aggregateByKey也是使用该算子实现。该算子又和combineByKey有啥关系呢?

      Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the* existing partitioner/parallelism level. This method is here for backward compatibility. It* does not provide combiner classtag information to the shuffle.
    

    通过api学习,我们了解到combineByKey是combineByKeyWithClassTag的简写的版本。

    1. 说明

      这是spark最底层的聚合算子之一,按照key进行各种各样的聚合操作,spark提供的很多高阶算子,都是基于该算子实现的。

      def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] =  {。。。。。。
      }
      

      上述源码便是combineByKey的定义,是将一个类型为[(K, V)]的RDD聚合转化为[(K, C)]的类型,也就是按照K来进行聚合。这里的V是聚合前的类型,C聚合之后的类型。

      如何理解聚合函数?切入点就是如何理解分布式计算?总—>分—>总

      createCombiner: V => C, 相同的Key在分区中会调用一次该函数,用于创建聚合之后的类型,为了和后续Key相同的数据进行聚合。使用分区中的一条记录进行初始化。
      mergeValue: (C, V) => C, 在相同分区中基于上述createCombiner基础之上的局部聚合
      mergeCombiners: (C, C) => C) 将每个分区中相同key聚合的结果在分区间进行全局聚合

在这里插入图片描述

    所以combineByKey就是分布式计算。2. 编程- 模拟groupByKey```scaladef cbk2GbkOps(sc:SparkContext): Unit ={val stuList = List("白普州,1904-bd-bj","伍齐城,1904-bd-bj","曹佳,1904-bd-sz","刘文浪,1904-bd-wh","姚远,1904-bd-bj","匿名大哥,1904-bd-sz","欧阳龙生,1904-bd-sz")val stuRDD = sc.parallelize(stuList)val class2InfoRDD:RDD[(String, String)] = stuRDD.map(line => {val dotIndex = line.lastIndexOf(",")val className = line.substring(dotIndex + 1)(className, line)})println("=================groupBykey=================")val gbkRDD:RDD[(String, Iterable[String])] = class2InfoRDD.groupByKey()gbkRDD.foreach(println)println("=================cbk2Gbk================")val cbk2gbkRDD:RDD[(String, ArrayBuffer[String])] = class2InfoRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)cbk2gbkRDD.foreach(println)}/*初始化操作相同key在同一个分区中只会调用一次该函数,用于初始化,并且将第一个元素用于初始化*/def createCombiner(stu:String):ArrayBuffer[String] = {println("============createCombiner<" + stu + ">====================>>>>")val ab = ArrayBuffer[String]()ab.append(stu)ab}/*** 分区内的局部聚合*/def mergeValue(ab:ArrayBuffer[String], stu:String):ArrayBuffer[String] = {println("》》》>>========mergeValue:局部聚合结果<" + ab + ">,被聚合的值:" + stu + "===========>>>>")ab.append(stu)ab}/*全局聚合,各个分区内聚合的结果进行二次全局聚合第一个参数ab就是全局聚合的临时结果第二个参数ab2就是某一个分区聚合的结果*/def mergeCombiners(ab:ArrayBuffer[String], ab2:ArrayBuffer[String]):ArrayBuffer[String] = {println("|-|-|<>|-|>>========mergeCombiners:全局聚合临时结果<" + ab + ">,局部聚合的值:" + ab2 + "===========>>>>")ab.++:(ab2)}```- 模拟reduceByKey```scaladef cbk2rbkOps(sc:SparkContext): Unit = {val lines = sc.textFile("file:/E:/data/hello.txt")val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))println("==========reduceByKey============")//reduceByKeypairs.reduceByKey(_+_).foreach(println)println("==========combineByKey============")val cbk2rbk = pairs.combineByKey[Int]((num:Int) => num, (sum:Int, num:Int) => sum + num, (sum1:Int, sum2:Int) => sum1 + sum2)cbk2rbk.foreach(println)}```
  • aggregateByKey

    1. 说明

      aggregateByKey和combineByKey都是一个相对底层的聚合算子,可以完成系统没有提供的其它操作,相当于自定义算子。

      aggregateByKey底层还是使用combineByKeyWithClassTag来实现,所以本质上二者没啥区别,区别就在于使用时的选择而已。

    2. 编程

      def abk2rbkOps(sc:SparkContext): Unit = {val lines = sc.textFile("file:/E:/data/hello.txt")val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))println("==========reduceByKey============")//reduceByKeypairs.reduceByKey(_+_).foreach(println)println("==========combineByKey============")val abk:RDD[(String, Int)] = pairs.aggregateByKey(0)(_+_, _+_)abk.foreach(println)
      }def abk2GbkOps(sc:SparkContext): Unit ={val stuList = List("白普州,bj","伍齐城,bj","曹佳,sz","刘文浪,wh","姚远,bj","匿名大哥,sz","欧阳龙生,sz")val stuRDD = sc.parallelize(stuList)val class2InfoRDD:RDD[(String, String)] = stuRDD.map(line => {val dotIndex = line.lastIndexOf(",")val className = line.substring(dotIndex + 1)(className, line)})//        class2InfoRDD.saveAsTextFile("file:/E:/data/out/heihei")println("=================groupBykey=================")val gbkRDD:RDD[(String, Iterable[String])] = class2InfoRDD.groupByKey()gbkRDD.foreach(println)println("=================abk2Gbk================")val abkRDD:RDD[(String, ArrayBuffer[String])] = class2InfoRDD.aggregateByKey(ArrayBuffer[String]())(seqOp,combOp)abkRDD.foreach(println)}def seqOp(ab:ArrayBuffer[String], info:String):ArrayBuffer[String] = {ab.append(info)ab
      }def combOp(ab:ArrayBuffer[String], ab2:ArrayBuffer[String]):ArrayBuffer[String] = {ab.++:(ab2)
      }
      
    3. 总结:

      通过上例我们可以看出,如果是对相同类型的数据进行聚合统计,倾向于使用aggregateByKey更为简单,但是如果聚合前后数据类型不一致,建议使用combineByKey;同时如果初始化操作较为复杂,也建议使用combineByKey。

2.2.2. action行动算子

​ 所有的这些算子都是在rdd,rdd上的分区partition上面执行的,不是在driver本地执行。

  • foreach

  • count

    统计该rdd中元素的个数

    val count = abk.count()
    println("abk rdd的count个数为:" + count)
    

    返回值为Long类型

  • take(n)

    返回该rdd中的前N个元素

    val arr:Array[(String, Int)] = abk.take(2)
    arr.foreach(println)
    

    如果该rdd的数据是有序的,那么take(n)就是TopN。

  • first

    take(n)中比较特殊的一个take(1)(0)

    val ret:(String, Int) = abk.first()
    println(ret)
    
  • collect

    字面意思就是收集,拉取的意思,该算子的含义就是将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。

    val arr = abk.filter(_._2 > 2).collect()
    arr.foreach(println)
    
  • reduce

    一定记清楚,reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。

    //统计有多少个单词
    val newRet = abk.reduce{case ((k1, v1), (k2, v2)) => (k1 + "_" + k2, v1 + v2)}
    println(newRet)
    

    需要注意一点的是,聚合前后的数据类型保持一致。

  • countByKey

    统计key出现的次数

    val countByKey = pairs.countByKey()
    for ((k, v) <- countByKey) {println(k + "-->" + v)
    }
    //使用groupByKey求解wordcount
    val rr = pairs.groupByKey().map{case (key, iters) => (key, iters.size)}
    rr.foreach(println)
    
  • saveAsTextFile

    rr.saveAsTextFile("file:/E:/data/out/1904-bd/out1")
    

    本质上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]

  • saveAsObjectFile和saveAsSequenceFile

    saveAsObjectFile本质上是saveAsSequenceFile

    def saveAsObjectFile(path: String): Unit = withScope {this.mapPartitions(iter => iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)
    }
    
  • saveAsHadoopFile和saveAsNewAPIHadoopFile

    这二者的主要区别就是OutputFormat的区别,

    接口org.apache.hadoop.mapred.OutputFormat

    抽象类org.apache.hadoop.mapreduce.OutputFormat

    所以saveAshadoopFile使用的是接口OutputFormat,saveAsNewAPIHadoopFile使用的抽象类OutputFormat,建议大家使用后者。

    val path = "file:/E:/data/out/1904-bd/out3"
    rr.saveAsNewAPIHadoopFile(path,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text, IntWritable]])
    

2.2.3. 持久化操作

2.2.3.1. 什么是持久化,为什么要持久化

  One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

2.2.3.2. 如何进行持久化

	You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

​ 持久化的方法就是rdd.persist()或者rdd.cache()

2.2.3.3. 持久化策略

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory).

​ 可以通过persist(StoreageLevle的对象)来指定持久化策略,eg:StorageLevel.MEMORY_ONLY。

持久化策略含义
MEMORY_ONLY(默认)rdd中的数据,以未经序列化的java对象格式,存储在内存中。如果内存不足,剩余的部分不持久化,使用的时候,没有持久化的那一部分数据重新加载。这种效率是最高,但是是对内存要求最高的。
MEMORY_ONLY_SER就比MEMORY_ONLY多了一个SER序列化,保存在内存中的数据是经过序列化之后的字节数组,同时每一个partition此时就是一个比较大的字节数组。
MEMORY_AND_DISK和MEMORY_ONLY相比就多了一个,内存存不下的数据存储在磁盘中。
MEMEORY_AND_DISK_SER比MEMORY_AND_DISK多了个序列化。
DISK_ONLY就是MEMORY_ONLY对应,都保存在磁盘,效率太差,一般不用。
xxx_2就是上述多个策略后面加了一个_2,比如MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等等,就多了一个replicate而已,备份,所以性能会下降,但是容错或者高可用加强了。所以需要在二者直接做权衡。如果说要求数据具备高可用,同时容错的时间花费比从新计算花费时间少,此时便可以使用,否则一般不用。
HEAP_OFF(experimental)使用非Spark的内存,也即堆外内存,比如Tachyon,HBase、Redis等等内存来补充spark数据的缓存。

2.2.3.4. 如何选择一款合适的持久化策略

​ 第一就选择默认MEMORY_ONLY,因为性能最高嘛,但是对空间要求最高;如果空间满足不了,退而求其次,选择MEMORY_ONLY_SER,此时性能还是蛮高的,相比较于MEMORY_ONLY的主要性能开销就是序列化和反序列化;如果内存满足不了,直接跨越MEMORY_AND_DISK,选择MEMEORY_AND_DISK_SER,因为到这一步,说明数据蛮大的,要想提高性能,关键就是基于内存的计算,所以应该尽可能的在内存中存储对象;DISK_ONLY不用,xx_2的使用如果说要求数据具备高可用,同时容错的时间花费比从新计算花费时间少,此时便可以使用,否则一般不用。

2.2.3.5. 持久化和非持久化性能比较

object _05SparkPersistOps {def main(args: Array[String]): Unit = {val conf = new SparkConf()    .setAppName(s"${_05SparkPersistOps.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)//读取外部数据var start = System.currentTimeMillis()val lines = sc.textFile("file:///E:/data/spark/core/sequences.txt")var count = lines.count()println("没有持久化:#######lines' count: " + count + ", cost time: " + (System.currentTimeMillis() - start) + "ms")lines.persist(StorageLevel.MEMORY_AND_DISK) //lines.cache()start = System.currentTimeMillis()count = lines.count()println("持久化之后:#######lines' count: " + count + ", cost time: " + (System.currentTimeMillis() - start) + "ms")lines.unpersist()//卸载持久化数据sc.stop()}
}

没有持久化:#######lines’ count: 1000000, cost time: 5257ms
持久化之后:#######lines’ count: 1000000, cost time: 1408ms

2.3. 共享变量

2.3.0. 概述

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient(低效). However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

2.3.1. broadcast广播变量

  1. 说明

在这里插入图片描述

如何使用广播变量呢?对普通遍历进行包装即可,val num:Any = xxxval numBC:Broadcast[Any] = sc.broadcast(num)调用val n = numBC.value需要注意一点的是,显然该num需要进行序列化。
  1. 编程

    object _06BroadcastOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_06BroadcastOps.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)val genderMap = Map("0" -> "妹砸儿","1" -> "大兄弟")val stuRDD = sc.parallelize(List(Student("01", "宋敏健", "0", 18),Student("02", "严文青", "1", 19),Student("03", "王大伟", "1", 18),Student("04", "闫来宾", "1", 22)))stuRDD.map(stu => {val gender = stu.genderStudent(stu.id, stu.name, genderMap.getOrElse(gender, "ladyBoy"), stu.age)}).foreach(println)println("=============使用广播变量的做法==============")val genderBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap)stuRDD.map(stu => {val gender = genderBC.value.getOrElse(stu.gender, "ladyBody")Student(stu.id, stu.name, gender, stu.age)}).foreach(println)sc.stop()}
    }case class Student(id:String, name:String, gender:String, age:Int)
    

2.3.2. accumulator累加器

  1. 说明

    accumulator累加器的概念和mr中出现的counter计数器的概念有异曲同工之妙,对某些具备某些特征的数据进行累加。累加器的一个好处是,不需要修改程序的业务逻辑来完成数据累加,同时也不需要额外的触发一个action job来完成累加,反之必须要添加新的业务逻辑,必须要触发一个新的action job来完成,显然这个accumulator的操作性能更佳!

    累加的使用:

    构建一个累加器

    val accu = sc.longAccumuator()

    累加的操作

    accu.add(参数)

    获取累加器的结果,累加器的获取,必须需要action的触发

    val ret = accu.value

  2. 编程

    • 使用非累加器完成某些特征数据的累加求解

      val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/work/1904-bd/workspace/spark-parent-1904/data/accu.txt")
      val words = lines.flatMap(_.split("\\s+"))//统计每个单词出现的次数
      val rbk = words.map((_, 1)).reduceByKey(_+_)
      rbk.foreach(println)
      println("=============额外的统计=================")
      //统计其中的is出现的次数
      rbk.filter{case (word, count) => word == "is"}.foreach(println)Thread.sleep(10000000)
      sc.stop()
      

      在这里插入图片描述

    • 使用累加器完成上述案例

      val conf = new SparkConf()
      .setAppName(s"${_07AccumulatorOps.getClass.getSimpleName}")
      .setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/work/1904-bd/workspace/spark-parent-1904/data/accu.txt")
      val words = lines.flatMap(_.split("\\s+"))//统计每个单词出现的次数
      val accumulator = sc.longAccumulatorval rbk = words.map(word => {if(word == "is")accumulator.add(1)(word, 1)
      }).reduceByKey(_+_)
      rbk.foreach(println)
      println("================使用累加器===================")
      println("is: " + accumulator.value)Thread.sleep(10000000)
      sc.stop()
      

在这里插入图片描述
- 总结

    使用累加器也能够完成上述的操作,而且只使用了一个action操作。3. 注意- 累加器的调用,也就是accumulator.value必须要在action之后被调用,也就是说累加器必须在action触发之后。- 多次使用同一个累加器,应该尽量做到用完即重置accumulator.reset- 尽量给累加器指定name,方便我们在web-ui上面进行查看

在这里插入图片描述
4. 自定义累加器

    在上述3的案例基础之上,还用统计of,甚至统计a这些额外的单词,怎么做?此时就应该使用自定义累加器。MyAccumulator```scala/*自定义累加器IN 指的是accmulator.add(sth.)中sth的数据类型OUT 指的是accmulator.value返回值的数据类型*/class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map = mutable.Map[String, Long]()/*** 当前累加器是否有初始化值* 如果为一个long的值,0就是初始化值,如果为list,Nil就是初始化值,是map,Map()就是初始化值*/override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] = {val accu = new MyAccumulatoraccu.map = this.mapaccu}override def reset(): Unit = map.clear()//分区内的数据累加 is: 5, of:4override def add(word: String): Unit = {if(map.contains(word)) {val newCount = map(word) + 1map.put(word, newCount)} else {map.put(word, 1)}//        map.put(word, map.getOrElse(word, 0) + 1)}//多个分区间的数据累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {other.value.foreach{case (word, count) => {if(map.contains(word)) {val newCount = map(word) + countmap.put(word, newCount)} else {map.put(word, count)}//            map.put(word, map.getOrElse(word, 0) + count)}}}override def value: Map[String, Long] = map.toMap}```注册使用```scalaobject _08AccumulatorOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_08AccumulatorOps.getClass.getSimpleName}").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/work/1904-bd/workspace/spark-parent-1904/data/accu.txt")val words = lines.flatMap(_.split("\\s+"))//注册val myAccu = new MyAccumulator()sc.register(myAccu, "myAccu")//统计每个单词出现的次数val pairs = words.map(word => {if(word == "is" || word == "of" || word == "a")myAccu.add(word)(word, 1)})val rbk = pairs.reduceByKey(_+_)rbk.foreach(println)println("=============累加器==========")myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()}}```

3. 高级排序

3.1. 普通的排序

3.1.1. sortByKey

object _01SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_01SortByKeyOps.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val stuRDD:RDD[Student] = sc.parallelize(List(Student(1, "吴轩宇", 19, 168),Student(2, "彭国宏", 18, 175),Student(3, "随国强", 18, 176),Student(4, "闫  磊", 20, 180),Student(5, "王静轶", 18, 168.5)))//按照学生身高进行降序排序val height2Stu = stuRDD.map(stu => (stu.height, stu))//注意:sortByKey是局部排序,不是全局排序,如果要进行全局排序,// 必须将所有的数据都拉取到一台机器上面才可以val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)sorted.foreach{case (height, stu) => println(stu)}sc.stop()}
}case class Student(id:Int, name:String, age:Int, height:Double)

3.1.2. sortBy

  1. 说明

    这个sortByKey其实使用sortByKey来实现,但是比sortByKey更加灵活,因为sortByKey只能应用在k-v数据格式上,而这个sortByKey可以应在非k-v键值对的数据格式上面。

  2. 编程

    val sortedBy = stuRDD.sortBy(stu => stu.height,ascending = true,numPartitions = 1)(new Ordering[Double](){override def compare(x: Double, y: Double) = y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]])
    sortedBy.foreach(println)
    
  3. 总结

    sortedBy的操作,除了正常的升序,分区个数以外,还需需要传递一个将原始数据类型,提取其中用于排序的字段;并且提供用于比较的方式,以及在运行时的数据类型ClassTag标记型trait。

3.1.3. takeOrdered

  1. 说明

    takeOrdered也是对rdd进行排序,但是和上述的sortByKey和sortBy相比较,takeOrdered是一个action操作,返回值为一个集合,而前两者为transformation,返回值为rdd。如果我们想在driver中获取排序之后的结果,那么建议使用takeOrdered,因为该操作边排序边返回。

    其实是take和sortBy的一个结合体。

    takeOrdered(n),获取排序之后的n条记录

  2. 编程

    //先按照身高降序排序,身高相对按照年龄升序排 ---> 二次排序
    stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) = {var ret = y.height.compareTo(x.height)if(ret == 0) {ret = x.age.compareTo(y.age)}ret}
    }).foreach(println)
    

3.2. TopN

​ 到这里,topN就是3.1之后执行action操作take(N),或者3.2直接takeOrderd(N),建议使用后者,效率高于前者。

3.3. 二次排序

​ 所谓二次排序,指的是排序字段不唯一,有多个,共同排序,仍然使用上面的数据,对学生的身高和年龄一次排序。

object _02SecondSortOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_02SecondSortOps.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Person] = sc.parallelize(List(Person(1, "吴轩宇", 19, 168),Person(2, "彭国宏", 18, 175),Person(3, "随国强", 18, 176),Person(4, "闫  磊", 20, 180),Person(5, "王静轶", 18, 168)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()}
}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret}
}

3.4. 分组TopN

​ 在分组的情况之下,获取每个组内的TopN数据。

  1. 需求

    基础数据:

    chinese ls 91
    english ww 56
    chinese zs 90
    chinese zl 76
    english zq 88

    字段分别为科目,姓名,成绩。要求:求出每个科目成绩排名前3的学生信息。

  2. 编码

    object _03GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_03GroupSortTopN.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key进行分组//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}
    }
    

3.6. 优化分组TopN

  1. 说明

    上述在编码过程当中使用groupByKey,我们说着这个算子的性能很差,因为没有本地预聚合,所以应该在开发过程当中尽量避免使用,能用其它代替就代替。

  2. 编码

    • 使用combineByKey模拟

      object _04GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_04GroupSortByCombineByKeyTopN.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] = {val ab = new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] = {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] = {ab.++:(ab1)}
      }
      

      此时这种写法和上面的groupByKey性能一模一样,没有任何的优化。

    • 使用combineByKey的优化

      object _05GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${_05GroupSortByCombineByKeyTopN.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}}
      }
      
            sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}}
      

      }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/17922.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

有了这个Python库,再也不愁给孩子起名字了

后台回复1024&#xff0c;解锁无限快乐&#xff01; 你是不是曾经为了给孩子起名字而左思右想&#xff0c;抓耳挠腮&#xff1f; 今天&#xff0c;我为你带来一款永久且快速的解决方案&#xff0c;让你再也不会为给孩子起名而苦恼了&#xff0c;无论生多少个&#xff0c;起名都…

深挖GPT-4背后的技术方向,华人开发者成绩斐然

源&#xff5c;机器之心 始于谷歌&#xff0c;发迹于 OpenAI&#xff0c;这是很多 GPT-4 贡献者的职业轨迹。 这个星期&#xff0c;OpenAI 大模型 GPT-4 的发布让全球科技圈的技术竞争进入了白热化。几天之内&#xff0c;ChatGPT、必应搜索和 Microsoft 365 相继接入 GPT-4&…

火狐十大必用插件

十个个必装的火狐插件 也许这十个中其中某个或者几个并适合中国人用&#xff0c;当我至少有3个在用&#xff01; 上次我们为必装的火狐扩展进行排行 已经是两年前的事了。而现在的新 排行做出了一些必要的更新&#xff0c;和上次一样以即点即用的形式为大家展现这次的大师级最…

火狐的一些实用插件

Firefox Firefox是一一个出自Mozilla组织的流行的Web浏览器。Firefox的流行并不仅仅是因为它是一-个好的浏览器&#xff0c;而且它能够支持插件进而加强它自身的功能。 Mozilla 有一个插件站点&#xff0c;在那里面有成千上万非常有用的插件。一些插件对于渗透测试人员和安全…

推荐几个火狐浏览器插件

目前&#xff0c;谷歌浏览器市场占有率最高&#xff0c;谷歌公司号称不作恶&#xff0c;实际上也作了一些恶&#xff0c;只不过底线比百度高一点。因为各类屏蔽广告的插件有损谷歌自己投放的广告的收益&#xff0c;因此谷歌对广告屏蔽插件下手了&#xff0c;削弱插件的屏蔽效果…

Firefox火狐浏览器插件大全

Firefox火狐浏览器 软件版本&#xff1a;51.0.1 简体中文版软件大小&#xff1a;666KB软件授权&#xff1a;免费适用平台&#xff1a; WinXP Win2003 Vista Win8 Win7 Win10下载地址&#xff1a;http://dl.pconline.com.cn/download/52175.html 立即下载 一、浏览器功能扩展组…

fireFox post请求插件,火狐浏览器插件

在开发过程中&#xff0c;为了测试数据&#xff0c;提交表单等。经常会用到post请求&#xff1b;在这里向大家介绍一款比较好用的浏览器插件&#xff0c;它可以很好的模拟post&#xff0c;get&#xff0c;put等常用的请求。大大便利了我们的开发。它就是fire fox中的RESTClient…

用nodejs搭建代理服务器

代理服务器的原理 案例 安装 express、http-proxy-middlewareapp.js 文件 node app.jsvar express require(express); var app express(); app.use(express.static(./public)); app.listen(3000);在 public 文件夹下建立 a.html<!DOCTYPE html> <html lang"en…

VSCode连GitHub的代理服务器配置和获取历史版本命令

1. 在VSCode中配置代理&#xff1a; 在设置中查找“Proxy”直接编辑配置文件。或者在如下菜单中点击打开配置文件 在配置文件中添加如下两条。注意http和https的代理都要配置上 "http.proxy": "http://192.168.8.*:8080", "https.proxy": &quo…

巴比特 | 元宇宙每日必读:围剿ChatGPT?继意大利之后,法国、西班牙也对ChatGPT展开调查,欧盟监管机构已成立调查组...

摘要&#xff1a;据财联社报道&#xff0c;自去年末以来&#xff0c;ChatGPT人工智能机器人在全球引发高度关注。在人们为该AI系统表现出的强大能力所惊叹的同时&#xff0c;其数据准确性及隐私相关问题也引发人们的担忧。在意大利最早对ChatGPT采取了监管举措后&#xff0c;当…

AIGC用于智能写作的技术综述-达观数据

导语 图1. ChatGPT生成的关于智能写作的介绍 智能写作指使用自然语言处理技术来自动生成文本内容。这种技术通过分析给定语料库&#xff0c;学习文本的结构和语法&#xff0c;然后利用这些信息来生成新的文本。智能写作可以用来快速生成高质量的文本内容&#xff0c;并且可…

谷歌Bard是拿ChatGPT数据训练的?BERT一作抗议无果,跳槽OpenAI...

点击下方卡片&#xff0c;关注“CVer”公众号 AI/CV重磅干货&#xff0c;第一时间送达 点击进入—>【多模态学习】微信技术交流群 转载自&#xff1a;机器之心 谷歌顶尖科学家向皮查伊、Jeff Dean 请愿未果&#xff0c;最后只好跳槽 OpenAI。 打不过就加入&#xff0c;打不过…

Bard是拿ChatGPT数据训练的?谷歌顶级科学家抗议无果,出走OpenAI

源 | 机器之心 谷歌顶尖科学家向皮查伊、Jeff Dean 请愿未果&#xff0c;最后只好跳槽 OpenAI。 打不过就加入&#xff0c;打不过就直接用&#xff1f; 相比成为科技领域里程碑的 ChatGPT&#xff0c;谷歌推出的竞品 Bard 亮相似乎并不令人印象深刻&#xff0c;对于这家科技巨头…

借助 ChatGPT 编写的 libbpf eBPF 工具开发实践教程: 通过例子学习 eBPF

这是一个基于 CO-RE&#xff08;一次编译&#xff0c;到处运行&#xff09;的 libbpf 的 eBPF 的开发教程&#xff0c;提供了从入门到进阶的 eBPF 开发实践指南&#xff0c;包括基本概念、代码实例、实际应用等内容。我们主要提供了一些 eBPF 工具的案例&#xff0c;帮助开发者…

城市路边停车收费系统/停车收费管理系统

摘 要 近年来&#xff0c;随着社会的进步和发展&#xff0c;车辆也在迅速增加&#xff0c;城市交通的瓶颈不仅体现在道路交通的拥挤上&#xff0c;也体现在传统停车场管理效率和安全性大大滞后于社会的需要&#xff0c;给人们的生活带来了极大的不便。尤其&#xff0c;随着汽车…

字节辟谣被裁员工与 HR 互殴;苹果头显多个新功能曝光;谷歌希望 RISC-V 成为 T1 级 Android 架构|极客头条...

「极客头条」—— 技术人员的新闻圈&#xff01; CSDN 的读者朋友们早上好哇&#xff0c;「极客头条」来啦&#xff0c;快来看今天都有哪些值得我们技术人关注的重要新闻吧。 整理 | 梦依丹 出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09; 一分钟速览新闻点&#…

《花雕学AI》26:多维度了解ChatGPT思维链提示的原理、方法、使用和发展的22个问题

早上5点起床&#xff0c;没有去打羽毛球&#xff0c;打开电脑&#xff0c;漫无边际的浏览&#xff0c;偶然发现了一个提法&#xff1a;ChatGPT思维链提示。于是&#xff0c;我使用与ChatGPT同源技术的新Bing引擎&#xff08;GPT-4&#xff09;&#xff0c;来进行搜索与了解相关…

《花雕学AI》人类推理能力对AI来说是什么?用ChatGPT来检验一下

”这里有一本书、九个鸡蛋、一台笔记本电脑、一个瓶子和一个钉子&#xff0c;请告诉我如何把它们稳定地堆叠在一起&#xff1f;“ 这是去年提出的一道测试推理能力的题目&#xff0c;当微软的计算机科学家开始试验一种新的AI系统时&#xff0c;他们要求AI解决这个难题&#xf…

哈尔滨联通java_哈尔滨联通套餐资费-哈尔滨联通套餐资费介绍2021【哈尔滨集号吧】...

哈尔滨联通4G全国套餐资费 4G基本套餐56 月租56元&#xff0c;送来显&#xff0c;含国内语音可视拨打100分钟&#xff0c;国内流量500M&#xff0c;超出后国内主叫0.15元/分钟&#xff0c;国内流量 0.30元/M&#xff0c;国内短信彩信0.10元/条&#xff0c;全国接听免费&#xf…

NETCTOSS02_资费管理模块

NETCTOSS02_资费管理模块 结果展示: 用到的技术: 1.搭建struts2项目 2.访问数据库,查询结果显示在jsp中 3.分页技术 源代码及具体细节如下: Cost.java 实体类 package com.qxl.netctoss.entity;import java.sql.Timestamp;public class Cost {private int costId; //…