婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識庫 > Spark SQL數據加載和保存實例講解

Spark SQL數據加載和保存實例講解

熱門標簽:互聯網電話外呼系統 電話機器人怎么代理商 400電話辦理泰安 家庭農場地圖標注名稱怎樣起名 安卡拉地圖標注app 我要地圖標注數量有限制嗎 千呼電話機器人可以試用嗎 電銷需要外呼系統嗎 零成本地圖標注賺錢

一、前置知識詳解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以創建DataFrame,
Save:把DataFrame中的數據保存到文件或者說與具體的格式來指明我們要讀取的文件的類型以及與具體的格式來指出我們要輸出的文件是什么類型。

二、Spark SQL讀寫數據代碼實戰

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkSQLLoadSaveOps {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
  JavaSparkContext sc = new JavaSparkContext(conf);
  SQLContext = new SQLContext(sc);
  /**
   * read()是DataFrameReader類型,load可以將數據讀取出來
   */
  DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");

  /**
   * 直接對DataFrame進行操作
   * Json: 是一種自解釋的格式,讀取Json的時候怎么判斷其是什么格式?
   * 通過掃描整個Json。掃描之后才會知道元數據
   */
  //通過mode來指定輸出文件的是append。創建新文件來追加文件
 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
 }
}

讀取過程源碼分析如下:
1. read方法返回DataFrameReader,用于讀取數據。

/**
 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *  sqlContext.read.parquet("/path/to/file.parquet")
 *  sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
@Experimental
//創建DataFrameReader實例,獲得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)

2.  然后再調用DataFrameReader類中的format,指出讀取文件的格式。

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source
 this
}

3.  通過DtaFrameReader中load方法通過路徑把傳入過來的輸入變成DataFrame。

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

至此,數據的讀取工作就完成了,下面就對DataFrame進行操作。
下面就是寫操作!!!

1. 調用DataFrame中select函數進行對列篩選

/**
 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 *
 * {{{
 *  // The following two are equivalent:
 *  df.select("colA", "colB")
 *  df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2.  然后通過write將結果寫入到外部存儲系統中。

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

3.   在保持文件的時候mode指定追加文件的方式

/**
 * Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆蓋
 *  - `SaveMode.Overwrite`: overwrite the existing data.
//創建新的文件,然后追加
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}

4.   最后,save()方法觸發action,將文件輸出到指定文件中。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

三、Spark SQL讀寫整個流程圖如下

四、對于流程中部分函數源碼詳解

DataFrameReader.Load()

1. Load()返回DataFrame類型的數據集合,使用的數據是從默認的路徑讀取。

/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此時的read就是DataFrameReader
 read.load(path)
}

2.  追蹤load源碼進去,源碼如下:
在DataFrameReader中的方法。Load()通過路徑把輸入傳進來變成一個DataFrame。

/** 
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

3.  追蹤load源碼如下:

/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
//對傳入的Source進行解析
 val resolved = ResolvedDataSource(
  sqlContext,
  userSpecifiedSchema = userSpecifiedSchema,
  partitionColumns = Array.empty[String],
  provider = source,
  options = extraOptions.toMap)
 DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

DataFrameReader.format()

1. Format:具體指定文件格式,這就獲得一個巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類操作。
Spark SQL在讀取文件的時候可以指定讀取文件的類型。例如,Json,Parquet.

/**
 * Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source //FileType
 this
}

DataFrame.write()

1. 創建DataFrameWriter實例

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
1

2.  追蹤DataFrameWriter源碼如下:
以DataFrame的方式向外部存儲系統中寫入數據。

/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆蓋,之前寫的數據全都被覆蓋了。
Append:是追加,對于普通文件是在一個文件中進行追加,但是對于parquet格式的文件則創建新的文件進行追加。

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `SaveMode.Overwrite`: overwrite the existing data.
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默認操作
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}

2.  通過模式匹配接收外部參數

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `overwrite`: overwrite the existing data.
 *  - `append`: append the data.
 *  - `ignore`: ignore the operation (i.e. no-op).
 *  - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
 this.mode = saveMode.toLowerCase match {
  case "overwrite" => SaveMode.Overwrite
  case "append" => SaveMode.Append
  case "ignore" => SaveMode.Ignore
  case "error" | "default" => SaveMode.ErrorIfExists
  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
 }
 this
}

DataFrameWriter.save()

1. save將結果保存傳入的路徑。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

2.  追蹤save方法。

/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
 ResolvedDataSource(
  df.sqlContext,
  source,
  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
  mode,
  extraOptions.toMap,
  df)
}

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默認參數是parquet。

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
 defaultValue = Some("org.apache.spark.sql.parquet"),
 doc = "The default data source to use in input/output.")

DataFrame.scala中部分函數詳解:

1. toDF函數是將RDD轉換成DataFrame

/**
 * Returns the object itself.
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this

2.  show()方法:將結果顯示出來

/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *  year month AVG('Adj Close) MAX('Adj Close)
 *  1980 12  0.503218    0.595103
 *  1981 01  0.523289    0.570307
 *  1982 02  0.436504    0.475256
 *  1983 03  0.410516    0.442194
 *  1984 04  0.450090    0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *       be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0
 */
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println

追蹤showString源碼如下:showString中觸發action收集數據。

/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
 */
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
 val numRows = _numRows.max(0)
 val sb = new StringBuilder
 val takeResult = take(numRows + 1)
 val hasMoreData = takeResult.length > numRows
 val data = takeResult.take(numRows)
 val numCols = schema.fieldNames.length

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • Spark SQL常見4種數據源詳解
  • Spark學習筆記之Spark SQL的具體使用
  • pyspark.sql.DataFrame與pandas.DataFrame之間的相互轉換實例
  • 淺談DataFrame和SparkSql取值誤區
  • Spark SQL操作JSON字段的小技巧
  • Spark SQL的整體實現邏輯解析

標簽:池州 黃山 來賓 文山 大同 東營 新鄉 濱州

巨人網絡通訊聲明:本文標題《Spark SQL數據加載和保存實例講解》,本文關鍵詞  Spark,SQL,數據,加載,和,保存,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《Spark SQL數據加載和保存實例講解》相關的同類信息!
  • 本頁收集關于Spark SQL數據加載和保存實例講解的相關信息資訊供網民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    亚洲国产精品av| 一本久久a久久免费精品不卡| 自拍偷拍国产亚洲| 欧美激情综合五月色丁香小说| 精品伦理精品一区| 欧美大片国产精品| 精品人在线二区三区| 日韩色在线观看| 国产亚洲一二三区| 综合色天天鬼久久鬼色| 亚洲久草在线视频| 日日夜夜免费精品| 另类调教123区| 精品一区二区三区免费观看| 国产一区不卡视频| 91在线免费播放| 欧美日韩一二区| 欧美一级免费大片| 中文字幕精品一区| 一区二区三区日本| 欧美aaa在线| 国产91精品一区二区麻豆亚洲| 成人aaaa免费全部观看| 日本精品一区二区三区高清 | 久久香蕉国产线看观看99| 久久久亚洲午夜电影| 国产精品欧美一区二区三区| 亚洲精品一二三| 午夜视频在线观看一区二区三区| 韩国中文字幕2020精品| 97久久超碰国产精品| 538prom精品视频线放| 国产日产精品一区| 亚洲一区二区综合| 国产一区二区三区免费观看 | 中文字幕亚洲区| 日韩电影免费一区| 99久久99久久精品免费观看 | 国产精品久久久久久久久果冻传媒 | 在线成人免费视频| 国产色产综合产在线视频| 一区二区在线观看不卡| 久久国产视频网| 欧美三级视频在线| 国产欧美久久久精品影院| 天天做天天摸天天爽国产一区| 国产一区二区在线观看免费| 欧美在线观看视频在线| 国产欧美日韩精品在线| 日韩不卡一二三区| 色欧美日韩亚洲| 国产精品伦理在线| 国产在线精品一区二区不卡了| 日本福利一区二区| 国产精品三级av| 国产专区欧美精品| 日韩视频国产视频| 日韩一区精品视频| 欧美日韩一区二区三区不卡| 国产欧美久久久精品影院| 久久国产生活片100| 欧美三区在线观看| 亚洲在线视频免费观看| 99视频一区二区三区| 国产精品久久久久aaaa樱花| 国产成人精品1024| www激情久久| 国产一区二区成人久久免费影院| 日韩一区二区三区视频在线| 亚洲成av人综合在线观看| 欧美性大战久久| 亚洲成在线观看| 欧美日韩专区在线| 视频精品一区二区| 欧美一区二区美女| 久久99精品久久久久久动态图| 欧美狂野另类xxxxoooo| 午夜电影一区二区三区| 欧美性受极品xxxx喷水| 亚洲综合久久久久| 欧美日韩国产一二三| 亚洲国产精品久久一线不卡| 欧美性感一类影片在线播放| 亚洲一二三区在线观看| 51久久夜色精品国产麻豆| 蜜桃传媒麻豆第一区在线观看| 日韩午夜精品视频| 国产精品一卡二| 中文字幕在线不卡一区| 91成人在线精品| 日产国产高清一区二区三区| 日韩一区二区三区av| 国产精品一二一区| 亚洲三级免费电影| 欧美美女视频在线观看| 国产精品小仙女| 久久精品视频一区二区三区| 床上的激情91.| 亚洲国产精品久久一线不卡| 日韩欧美一二区| 成人黄色片在线观看| 一区二区三区在线视频免费观看 | 亚洲一区二区三区免费视频| 欧美疯狂做受xxxx富婆| 国产美女在线观看一区| 亚洲图片激情小说| 在线播放日韩导航| 不卡一卡二卡三乱码免费网站| 亚洲国产cao| 中文字幕第一区二区| 欧美日韩www| 福利91精品一区二区三区| 亚洲国产中文字幕在线视频综合| 久久午夜色播影院免费高清| 色播五月激情综合网| 国产一区二区三区四区五区入口 | 一区二区免费看| 久久婷婷国产综合国色天香| 在线看国产一区二区| 国产成人综合网| 久久不见久久见中文字幕免费| 亚洲精品精品亚洲| 国产欧美精品一区| 欧美一区二区三区小说| 91久久精品一区二区| 丁香婷婷综合激情五月色| 日日摸夜夜添夜夜添精品视频 | 一区二区三区蜜桃网| 国产亚洲欧美在线| 欧美电影精品一区二区 | 人人精品人人爱| 亚洲激情图片qvod| 1000精品久久久久久久久| 久久久国际精品| 日韩欧美视频一区| 91精品国产综合久久婷婷香蕉| 在线免费不卡视频| 91久久精品一区二区三区| 成人国产电影网| 国产成人鲁色资源国产91色综 | 91成人免费电影| 成人av影视在线观看| 国产麻豆成人精品| 国产成人综合自拍| 丁香婷婷综合五月| 粉嫩av一区二区三区在线播放| 久久国产麻豆精品| 久久黄色级2电影| 激情综合五月婷婷| 国产米奇在线777精品观看| 精品一区二区免费视频| 毛片基地黄久久久久久天堂| 日本色综合中文字幕| 日本欧美肥老太交大片| 免费在线看一区| 久久er99热精品一区二区| 狠狠色丁香久久婷婷综合丁香| 韩国视频一区二区| 国产精品 日产精品 欧美精品| 国产精品18久久久久| 国产mv日韩mv欧美| 97se亚洲国产综合自在线| 91亚洲永久精品| 精品视频免费看| 精品电影一区二区| 中文字幕在线观看一区二区| 亚洲色图欧美偷拍| 午夜激情综合网| 国产一区二区三区四区五区美女| 国产91在线看| 在线视频欧美精品| 欧美v日韩v国产v| 中文字幕欧美区| 亚洲成av人片一区二区| 国内精品伊人久久久久av一坑| 国产麻豆成人传媒免费观看| bt欧美亚洲午夜电影天堂| 欧美性欧美巨大黑白大战| 精品国产一区二区三区不卡| 国产精品色婷婷| 午夜视频一区在线观看| 国产高清精品网站| 色噜噜久久综合| 亚洲精品一线二线三线无人区| 国产精品婷婷午夜在线观看| 丝袜亚洲另类欧美综合| 国产在线一区观看| 欧美视频在线播放| 中文字幕乱码亚洲精品一区| 亚洲香肠在线观看| 高清不卡在线观看| 欧美一区二区三区电影| 中文字幕日本不卡| 紧缚奴在线一区二区三区| 91成人在线精品| 亚洲国产精品精华液ab| 五月婷婷另类国产| 色菇凉天天综合网| 国产女主播一区| 黄页视频在线91|