婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識庫 > windowns使用PySpark環境配置和基本操作

windowns使用PySpark環境配置和基本操作

熱門標簽:公司電話機器人 唐山智能外呼系統一般多少錢 海南400電話如何申請 廣告地圖標注app 白銀外呼系統 騰訊外呼線路 陜西金融外呼系統 哈爾濱ai外呼系統定制 激戰2地圖標注

下載依賴

首先需要下載hadoop和spark,解壓,然后設置環境變量。
hadoop清華源下載
spark清華源下載

HADOOP_HOME => /path/hadoop
SPARK_HOME => /path/spark

安裝pyspark。

pip install pyspark

基本使用

可以在shell終端,輸入pyspark,有如下回顯:

輸入以下指令進行測試,并創建SparkContext,SparkContext是任何spark功能的入口點。

>>> from pyspark import SparkContext
>>> sc = SparkContext("local", "First App")

如果以上不會報錯,恭喜可以開始使用pyspark編寫代碼了。
不過,我這里使用IDE來編寫代碼,首先我們先在終端執行以下代碼關閉SparkContext。

>>> sc.stop()

下面使用pycharm編寫代碼,如果修改了環境變量需要先重啟pycharm。
在pycharm運行如下程序,程序會起本地模式的spark計算引擎,通過spark統計abc.txt文件中a和b出現行的數量,文件路徑需要自己指定。

from pyspark import SparkContext

sc = SparkContext("local", "First App")
logFile = "abc.txt"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,line with b:%i" % (numAs, numBs))

運行結果如下:

20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1

這里說一下,同樣的工作使用python可以做,spark也可以做,使用spark主要是為了高效的進行分布式計算。
戳pyspark教程
戳spark教程

RDD

RDD代表Resilient Distributed Dataset,它們是在多個節點上運行和操作以在集群上進行并行處理的元素,RDD是spark計算的操作對象。
一般,我們先使用數據創建RDD,然后對RDD進行操作。
對RDD操作有兩種方法:
Transformation(轉換) - 這些操作應用于RDD以創建新的RDD。例如filter,groupBy和map。
Action(操作) - 這些是應用于RDD的操作,它指示Spark執行計算并將結果發送回驅動程序,例如count,collect等。

創建RDD

parallelize是從列表創建RDD,先看一個例子:

from pyspark import SparkContext


sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)

結果中我們得到一個對象,就是我們列表數據的RDD對象,spark之后可以對他進行操作。

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Count

count方法返回RDD中的元素個數。

from pyspark import SparkContext


sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)

counts = words.count()
print("Number of elements in RDD -> %i" % counts)

返回結果:

Number of elements in RDD -> 8

Collect

collect返回RDD中的所有元素。

from pyspark import SparkContext


sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
coll = words.collect()
print("Elements in RDD -> %s" % coll)

返回結果:

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach

每個元素會使用foreach內的函數進行處理,但是不會返回任何對象。
下面的程序中,我們定義的一個累加器accumulator,用于儲存在foreach執行過程中的值。

from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")

accum = sc.accumulator(0)
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)


def increment_counter(x):
    print(x)
    accum.add(x)
 return 0

s = rdd.foreach(increment_counter)
print(s)  # None
print("Counter value: ", accum)

返回結果:

None
Counter value:  15

filter

返回一個包含元素的新RDD,滿足過濾器的條件。

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

 

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

也可以改寫成這樣:

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)


def g(x):
    for i in x:
        if "spark" in x:
            return i

words_filter = words.filter(g)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

map

將函數應用于RDD中的每個元素并返回新的RDD。

from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1, "_{}".format(x)))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

返回結果:

Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]

Reduce

執行指定的可交換和關聯二元操作后,然后返回RDD中的元素。

from pyspark import SparkContext
from operator import add


sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

 這里的add是python內置的函數,可以使用ide查看:

def add(a, b):
    "Same as a + b."
    return a + b

reduce會依次對元素相加,相加后的結果加上其他元素,最后返回結果(RDD中的元素)。

Adding all the elements -> 15

Join

返回RDD,包含兩者同時匹配的鍵,鍵包含對應的所有元素。

from pyspark import SparkContext


sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
print("x =>", x.collect())
print("y =>", y.collect())
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))

返回結果:

x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]

到此這篇關于windowns使用PySpark環境配置和基本操作的文章就介紹到這了,更多相關PySpark環境配置 內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • PyCharm搭建Spark開發環境實現第一個pyspark程序
  • PyCharm+PySpark遠程調試的環境配置的方法

標簽:惠州 鷹潭 上海 黔西 常德 四川 黑龍江 益陽

巨人網絡通訊聲明:本文標題《windowns使用PySpark環境配置和基本操作》,本文關鍵詞  windowns,使用,PySpark,環境,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《windowns使用PySpark環境配置和基本操作》相關的同類信息!
  • 本頁收集關于windowns使用PySpark環境配置和基本操作的相關信息資訊供網民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    高清久久久久久| 激情成人午夜视频| 亚洲一区二区三区三| 久久国产成人午夜av影院| 日本道精品一区二区三区| 久久精品人人做人人综合| 蜜臀久久99精品久久久画质超高清| 成年人网站91| 亚洲视频在线观看三级| 99久久精品国产一区| 国产精品国产三级国产aⅴ无密码| 国产成人啪免费观看软件| 久久亚洲精华国产精华液 | 天堂久久一区二区三区| 欧美色区777第一页| 天天综合天天综合色| 欧美一区二区在线观看| 蜜芽一区二区三区| 久久综合色天天久久综合图片| 国产一区二区免费在线| 中文欧美字幕免费| 91免费看视频| 日本欧美韩国一区三区| 2020国产精品自拍| 不卡av免费在线观看| 伊人婷婷欧美激情| 久久久久久影视| 成人av资源下载| 亚洲尤物视频在线| 日韩女优av电影在线观看| 国产精品中文有码| 亚洲精品菠萝久久久久久久| 6080日韩午夜伦伦午夜伦| 久久99精品国产| 国产精品区一区二区三区| 欧美日韩一区二区三区免费看| 久久精品国产第一区二区三区| 久久久久久久久久久久久久久99 | 欧洲av在线精品| 精品一区二区三区免费视频| 国产精品免费视频一区| 欧美一区二区三区四区久久| 成人午夜电影网站| 青青草视频一区| 一区二区视频免费在线观看| 久久综合久久鬼色中文字| 在线观看www91| 国产成人综合自拍| 强制捆绑调教一区二区| 亚洲一区在线观看免费 | 中文字幕一区在线观看视频| 日韩视频在线一区二区| 在线中文字幕一区| 成人免费视频一区| 蜜桃av噜噜一区| 亚洲电影在线免费观看| 亚洲人123区| 亚洲欧美在线视频| 欧美激情综合在线| 久久九九久精品国产免费直播| 9191精品国产综合久久久久久| 91麻豆免费在线观看| 99久久免费视频.com| 国产成人自拍高清视频在线免费播放| 日韩在线卡一卡二| 日韩电影一二三区| 毛片不卡一区二区| 精品一区二区免费看| 日韩一区精品视频| 另类综合日韩欧美亚洲| 免费不卡在线观看| 开心九九激情九九欧美日韩精美视频电影| 亚洲电影一区二区三区| 午夜精品国产更新| 午夜精品爽啪视频| 天堂va蜜桃一区二区三区| 亚洲综合激情小说| 一区二区三区在线观看欧美| 1000精品久久久久久久久| 精品一区二区三区久久| 国产精品一区二区久激情瑜伽 | 亚洲精品亚洲人成人网 | 亚洲123区在线观看| 国产蜜臀av在线一区二区三区| 国产 欧美在线| 93久久精品日日躁夜夜躁欧美| 久久成人免费日本黄色| 国产精品99久| 国产乱色国产精品免费视频| a在线欧美一区| 成人晚上爱看视频| 欧美亚洲动漫精品| 中国色在线观看另类| 亚洲欧洲精品一区二区精品久久久 | 在线视频一区二区三区| 色狠狠一区二区三区香蕉| 成人激情黄色小说| 色婷婷精品大视频在线蜜桃视频| 成人激情动漫在线观看| 卡一卡二国产精品 | 日韩精品电影在线观看| 亚洲精品精品亚洲| 亚洲精品伦理在线| 亚洲成人福利片| 中文字幕乱码日本亚洲一区二区| 欧美日韩视频在线第一区| 欧美揉bbbbb揉bbbbb| 久久久久久久久久久99999| 2020国产精品| 香蕉av福利精品导航| 日本中文字幕一区二区有限公司| 国产精品白丝jk白祙喷水网站| 国产精品中文字幕一区二区三区| 在线一区二区三区做爰视频网站| 欧美在线观看你懂的| 欧美韩国日本一区| 亚洲欧洲精品一区二区三区| 久久国产福利国产秒拍| 成人性生交大片免费看中文网站| 中文字幕日本不卡| 亚洲人成网站影音先锋播放| 国产精品每日更新在线播放网址| 亚洲日本在线天堂| 国产亚洲一区二区三区四区| 国产女主播视频一区二区| 亚洲成人1区2区| 国产乱人伦偷精品视频不卡| 88在线观看91蜜桃国自产| 日韩欧美久久一区| 亚洲成人先锋电影| 国产91精品久久久久久久网曝门| 欧美一区二区三区精品| 欧美经典一区二区三区| 久久99久国产精品黄毛片色诱| 国产成人精品www牛牛影视| 欧美精品少妇一区二区三区| 国产性天天综合网| 国产精品一区二区你懂的| 欧美日韩日日摸| 偷窥少妇高潮呻吟av久久免费| 美女在线观看视频一区二区| 欧美一三区三区四区免费在线看| 中文子幕无线码一区tr| 国产一区二区久久| 91麻豆精品国产91久久久使用方法 | 国产成人激情av| 国产精品一区二区男女羞羞无遮挡| 91麻豆精品国产| 精品国产乱码久久久久久久久| 秋霞av亚洲一区二区三| 成人久久18免费网站麻豆| 中文字幕二三区不卡| 亚洲五码中文字幕| 欧美三级中文字| 久久先锋影音av鲁色资源网| 欧美三级在线视频| 这里只有精品视频在线观看| 亚洲成在线观看| 国产精品一区二区在线播放 | 亚洲一区在线电影| 欧美美女视频在线观看| 自拍偷拍国产精品| 欧美性色黄大片手机版| 亚洲视频一区二区在线观看| 欧日韩精品视频| 亚洲蜜桃精久久久久久久| 欧美日韩久久一区二区| 亚洲自拍与偷拍| 日韩欧美在线网站| 美女在线观看视频一区二区| 国产精品网曝门| 国产精品亚洲人在线观看| 国产日韩欧美综合一区| 国产精品一区二区黑丝| 中文字幕国产一区二区| 国产成人一区在线| 国产精品久久看| 欧美高清激情brazzers| 视频一区在线视频| 中文幕一区二区三区久久蜜桃| 成人av免费观看| 日韩av在线免费观看不卡| 欧美日韩国产综合视频在线观看| 国内精品写真在线观看| 国产亚洲综合av| 欧美一级日韩免费不卡| 激情欧美一区二区| 亚洲高清不卡在线| 精品日韩欧美在线| 欧美日韩一区不卡| 国产精品丝袜黑色高跟| 欧美精品久久天天躁| 成人国产精品视频| 亚洲最大成人综合| 中文字幕一区不卡| 欧美人狂配大交3d怪物一区| 成人午夜av影视| 亚洲一区自拍偷拍| 亚洲日本一区二区三区| 4438成人网|