婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁(yè) > 知識(shí)庫(kù) > Spark SQL數(shù)據(jù)加載和保存實(shí)例講解

Spark SQL數(shù)據(jù)加載和保存實(shí)例講解

熱門(mén)標(biāo)簽:互聯(lián)網(wǎng)電話外呼系統(tǒng) 電話機(jī)器人怎么代理商 400電話辦理泰安 家庭農(nóng)場(chǎng)地圖標(biāo)注名稱怎樣起名 安卡拉地圖標(biāo)注app 我要地圖標(biāo)注數(shù)量有限制嗎 千呼電話機(jī)器人可以試用嗎 電銷(xiāo)需要外呼系統(tǒng)嗎 零成本地圖標(biāo)注賺錢(qián)

一、前置知識(shí)詳解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以創(chuàng)建DataFrame,
Save:把DataFrame中的數(shù)據(jù)保存到文件或者說(shuō)與具體的格式來(lái)指明我們要讀取的文件的類(lèi)型以及與具體的格式來(lái)指出我們要輸出的文件是什么類(lèi)型。

二、Spark SQL讀寫(xiě)數(shù)據(jù)代碼實(shí)戰(zhàn)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkSQLLoadSaveOps {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
  JavaSparkContext sc = new JavaSparkContext(conf);
  SQLContext = new SQLContext(sc);
  /**
   * read()是DataFrameReader類(lèi)型,load可以將數(shù)據(jù)讀取出來(lái)
   */
  DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");

  /**
   * 直接對(duì)DataFrame進(jìn)行操作
   * Json: 是一種自解釋的格式,讀取Json的時(shí)候怎么判斷其是什么格式?
   * 通過(guò)掃描整個(gè)Json。掃描之后才會(huì)知道元數(shù)據(jù)
   */
  //通過(guò)mode來(lái)指定輸出文件的是append。創(chuàng)建新文件來(lái)追加文件
 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
 }
}

讀取過(guò)程源碼分析如下:
1. read方法返回DataFrameReader,用于讀取數(shù)據(jù)。

/**
 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *  sqlContext.read.parquet("/path/to/file.parquet")
 *  sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
@Experimental
//創(chuàng)建DataFrameReader實(shí)例,獲得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)

2.  然后再調(diào)用DataFrameReader類(lèi)中的format,指出讀取文件的格式。

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source
 this
}

3.  通過(guò)DtaFrameReader中l(wèi)oad方法通過(guò)路徑把傳入過(guò)來(lái)的輸入變成DataFrame。

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

至此,數(shù)據(jù)的讀取工作就完成了,下面就對(duì)DataFrame進(jìn)行操作。
下面就是寫(xiě)操作!!!

1. 調(diào)用DataFrame中select函數(shù)進(jìn)行對(duì)列篩選

/**
 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 *
 * {{{
 *  // The following two are equivalent:
 *  df.select("colA", "colB")
 *  df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2.  然后通過(guò)write將結(jié)果寫(xiě)入到外部存儲(chǔ)系統(tǒng)中。

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

3.   在保持文件的時(shí)候mode指定追加文件的方式

/**
 * Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆蓋
 *  - `SaveMode.Overwrite`: overwrite the existing data.
//創(chuàng)建新的文件,然后追加
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}

4.   最后,save()方法觸發(fā)action,將文件輸出到指定文件中。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

三、Spark SQL讀寫(xiě)整個(gè)流程圖如下

四、對(duì)于流程中部分函數(shù)源碼詳解

DataFrameReader.Load()

1. Load()返回DataFrame類(lèi)型的數(shù)據(jù)集合,使用的數(shù)據(jù)是從默認(rèn)的路徑讀取。

/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此時(shí)的read就是DataFrameReader
 read.load(path)
}

2.  追蹤load源碼進(jìn)去,源碼如下:
在DataFrameReader中的方法。Load()通過(guò)路徑把輸入傳進(jìn)來(lái)變成一個(gè)DataFrame。

/** 
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

3.  追蹤load源碼如下:

/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
//對(duì)傳入的Source進(jìn)行解析
 val resolved = ResolvedDataSource(
  sqlContext,
  userSpecifiedSchema = userSpecifiedSchema,
  partitionColumns = Array.empty[String],
  provider = source,
  options = extraOptions.toMap)
 DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

DataFrameReader.format()

1. Format:具體指定文件格式,這就獲得一個(gè)巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類(lèi)操作。
Spark SQL在讀取文件的時(shí)候可以指定讀取文件的類(lèi)型。例如,Json,Parquet.

/**
 * Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source //FileType
 this
}

DataFrame.write()

1. 創(chuàng)建DataFrameWriter實(shí)例

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
1

2.  追蹤DataFrameWriter源碼如下:
以DataFrame的方式向外部存儲(chǔ)系統(tǒng)中寫(xiě)入數(shù)據(jù)。

/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆蓋,之前寫(xiě)的數(shù)據(jù)全都被覆蓋了。
Append:是追加,對(duì)于普通文件是在一個(gè)文件中進(jìn)行追加,但是對(duì)于parquet格式的文件則創(chuàng)建新的文件進(jìn)行追加。

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `SaveMode.Overwrite`: overwrite the existing data.
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默認(rèn)操作
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}

2.  通過(guò)模式匹配接收外部參數(shù)

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `overwrite`: overwrite the existing data.
 *  - `append`: append the data.
 *  - `ignore`: ignore the operation (i.e. no-op).
 *  - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
 this.mode = saveMode.toLowerCase match {
  case "overwrite" => SaveMode.Overwrite
  case "append" => SaveMode.Append
  case "ignore" => SaveMode.Ignore
  case "error" | "default" => SaveMode.ErrorIfExists
  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
 }
 this
}

DataFrameWriter.save()

1. save將結(jié)果保存?zhèn)魅氲穆窂健?/p>

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

2.  追蹤save方法。

/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
 ResolvedDataSource(
  df.sqlContext,
  source,
  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
  mode,
  extraOptions.toMap,
  df)
}

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默認(rèn)參數(shù)是parquet。

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
 defaultValue = Some("org.apache.spark.sql.parquet"),
 doc = "The default data source to use in input/output.")

DataFrame.scala中部分函數(shù)詳解:

1. toDF函數(shù)是將RDD轉(zhuǎn)換成DataFrame

/**
 * Returns the object itself.
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this

2.  show()方法:將結(jié)果顯示出來(lái)

/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *  year month AVG('Adj Close) MAX('Adj Close)
 *  1980 12  0.503218    0.595103
 *  1981 01  0.523289    0.570307
 *  1982 02  0.436504    0.475256
 *  1983 03  0.410516    0.442194
 *  1984 04  0.450090    0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *       be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0
 */
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println

追蹤showString源碼如下:showString中觸發(fā)action收集數(shù)據(jù)。

/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
 */
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
 val numRows = _numRows.max(0)
 val sb = new StringBuilder
 val takeResult = take(numRows + 1)
 val hasMoreData = takeResult.length > numRows
 val data = takeResult.take(numRows)
 val numCols = schema.fieldNames.length

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • Spark SQL常見(jiàn)4種數(shù)據(jù)源詳解
  • Spark學(xué)習(xí)筆記之Spark SQL的具體使用
  • pyspark.sql.DataFrame與pandas.DataFrame之間的相互轉(zhuǎn)換實(shí)例
  • 淺談DataFrame和SparkSql取值誤區(qū)
  • Spark SQL操作JSON字段的小技巧
  • Spark SQL的整體實(shí)現(xiàn)邏輯解析

標(biāo)簽:池州 黃山 來(lái)賓 文山 大同 東營(yíng) 新鄉(xiāng) 濱州

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Spark SQL數(shù)據(jù)加載和保存實(shí)例講解》,本文關(guān)鍵詞  Spark,SQL,數(shù)據(jù),加載,和,保存,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《Spark SQL數(shù)據(jù)加載和保存實(shí)例講解》相關(guān)的同類(lèi)信息!
  • 本頁(yè)收集關(guān)于Spark SQL數(shù)據(jù)加載和保存實(shí)例講解的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    中文字幕制服丝袜成人av | 亚洲图片你懂的| 一区二区欧美国产| 丁香六月久久综合狠狠色| 在线不卡的av| 亚洲一区二区三区美女| 精品视频1区2区| 视频一区二区三区入口| kk眼镜猥琐国模调教系列一区二区| 51精品视频一区二区三区| 久久成人免费电影| 国产精品国产自产拍高清av| 精品国产一区二区三区av性色| 亚洲成人免费av| 欧美三级中文字| 欧美日韩一区二区在线视频| 色88888久久久久久影院野外| 一区二区三区电影在线播| 国产欧美一区二区三区沐欲| 波多野结衣中文字幕一区 | 麻豆久久久久久久| |精品福利一区二区三区| 91精品欧美久久久久久动漫| 欧美日韩国产乱码电影| 国产91露脸合集magnet| 麻豆视频观看网址久久| 亚洲激情图片qvod| 国产在线播放一区三区四| 国产精品免费视频一区| 日本乱码高清不卡字幕| www久久久久| 欧美日韩视频在线观看一区二区三区| 91久久国产综合久久| 欧美一区二区在线免费观看| 精品视频一区三区九区| 日韩精品专区在线影院重磅| 欧美日韩成人综合天天影院 | 大美女一区二区三区| 欧美日韩免费视频| 国产精品私房写真福利视频| 日韩国产成人精品| 亚洲国产精品天堂| 一区二区三区四区av| 久久99热国产| 51精品国自产在线| 亚洲一级二级三级在线免费观看| 国产精品自拍av| 国产不卡一区视频| 精品欧美一区二区久久| 欧美一区三区四区| 性感美女极品91精品| 在线一区二区观看| 亚洲国产欧美在线| 欧美日韩亚洲另类| 亚洲成人免费视| 一本大道久久a久久综合| 欧美日韩亚洲国产综合| 日本不卡一二三| 97久久超碰国产精品| 欧美一个色资源| 伊人色综合久久天天人手人婷| 色哟哟国产精品免费观看| 一区二区激情视频| 欧美在线观看禁18| 中文字幕va一区二区三区| 亚洲三级理论片| 日韩视频国产视频| 日韩午夜三级在线| 婷婷激情综合网| 国产一区二区免费看| 国产精品久久久久久亚洲毛片| 99re6这里只有精品视频在线观看 99re8在线精品视频免费播放 | 婷婷六月综合网| 久久免费偷拍视频| 亚洲1区2区3区4区| 久久先锋资源网| 欧美色图免费看| 国产91高潮流白浆在线麻豆| 亚洲欧美视频一区| 在线一区二区三区做爰视频网站| 青青国产91久久久久久| 国产精品九色蝌蚪自拍| 日韩三级精品电影久久久| 91影视在线播放| 亚洲精品一线二线三线| 色欧美日韩亚洲| 不卡在线观看av| 国产精品综合久久| 激情欧美一区二区| 国产欧美精品一区| 久久综合久久综合九色| 欧美一区二区三区四区久久| 欧美午夜精品久久久| 久久黄色级2电影| 奇米影视在线99精品| 午夜电影一区二区三区| 午夜国产精品影院在线观看| 亚洲第四色夜色| 人人精品人人爱| 狠狠色丁香婷婷综合久久片| 麻豆精品在线观看| 国产精品性做久久久久久| 国产大陆精品国产| 日产国产高清一区二区三区| 调教+趴+乳夹+国产+精品| 蜜桃精品视频在线观看| 国产成人一区二区精品非洲| 国产成人8x视频一区二区 | 国产一区二区三区免费在线观看| 亚洲二区在线视频| 久久精品国产久精国产| 成人aa视频在线观看| 7777精品伊人久久久大香线蕉的 | 精品99999| 亚洲成年人网站在线观看| 国产在线精品一区二区不卡了| 成人夜色视频网站在线观看| 色婷婷久久一区二区三区麻豆| 成人网男人的天堂| 欧美日韩一区成人| 久久综合五月天婷婷伊人| 国产精品色婷婷久久58| 美女视频黄a大片欧美| 亚洲欧美日韩中文字幕一区二区三区| 国产精品福利av| 首页国产欧美久久| 国产精品一区专区| 欧美色网站导航| 亚洲色图一区二区三区| 日韩av在线播放中文字幕| 国产成人精品免费网站| 欧美一区二区三区在线观看视频| 色哟哟国产精品免费观看| 久久久久久久久伊人| 婷婷中文字幕综合| 91婷婷韩国欧美一区二区| 久久久久亚洲蜜桃| 久久久www成人免费无遮挡大片| 国产精品久久久久久一区二区三区| 另类小说一区二区三区| 在线不卡a资源高清| 精品国产乱子伦一区| 午夜电影久久久| 777精品伊人久久久久大香线蕉| 亚洲午夜电影网| 欧美色精品在线视频| 亚洲男人天堂av网| 色呦呦网站一区| 亚洲成人精品一区二区| 欧美日韩mp4| 免费看日韩a级影片| 久久综合给合久久狠狠狠97色69| 日本欧洲一区二区| 久久嫩草精品久久久精品| 在线一区二区视频| 亚洲日韩欧美一区二区在线| 成人久久久精品乱码一区二区三区 | 欧美亚洲一区三区| 日本中文字幕一区二区有限公司| 91精品在线观看入口| 国产精品88888| 亚洲国产成人tv| 久久综合色天天久久综合图片| 久久精品国产色蜜蜜麻豆| 亚洲欧美自拍偷拍色图| 欧美精品一区二区三区很污很色的| 91久久精品一区二区三区| 国产精品一区免费在线观看| 日韩精品欧美精品| 久久久久9999亚洲精品| 精品视频在线免费看| 波多野洁衣一区| 捆绑紧缚一区二区三区视频| 中文字幕中文字幕一区二区| 日韩欧美一级二级| 亚洲国产岛国毛片在线| 欧美日韩高清影院| 日本久久电影网| 日产国产高清一区二区三区 | 丝袜美腿亚洲综合| 一级女性全黄久久生活片免费| 亚洲国产成人午夜在线一区 | 久久激情综合网| 一个色妞综合视频在线观看| 久久久99精品久久| 精品乱码亚洲一区二区不卡| 日韩女同互慰一区二区| 日韩欧美国产麻豆| 精品播放一区二区| 国产欧美日韩亚州综合| 国产亚洲自拍一区| 国产亚洲成年网址在线观看| 国产欧美一区二区精品秋霞影院| 国产日韩欧美激情| 欧美sm极限捆绑bd| 国产亚洲成av人在线观看导航| 欧美色视频一区| 日韩一区二区视频| 综合久久一区二区三区| 偷偷要91色婷婷|