Spark Python Commands, Part 3

Published: 2020-07-05 22:38:10  Author: zjy1002261870

12 Data formats

  1. [[u'3', u'5'], [u'4', u'6'], [u'4', u'5'], [u'4', u'2']] is raw data produced by splitting or slicing a source; inside a map you can reach each column with x[0], x[1].
      It can be converted to key-value format with map, for example: df3 = df2.map(lambda x: (x[0], x[1])) (a fuller sketch follows this list).

  2. Key-value data format
      In [(u'3', u'5'), (u'4', u'6'), (u'4', u'5'), (u'4', u'2')] each () is one record; the first element is the key and the second is the value.
  3. The PipelinedRDD type represents data in key-value form.
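
    To make items 1-3 concrete, here is a minimal sketch, assuming an existing SparkContext named sc; df2 and df3 simply mirror the variable names used above:

    # A minimal sketch; df2 stands in for the split rows shown above
    df2 = sc.parallelize([['3', '5'], ['4', '6'], ['4', '5'], ['4', '2']])

    # Access columns by position inside map and emit (key, value) pairs
    df3 = df2.map(lambda x: (x[0], x[1]))
    print(df3.collect())
    # [('3', '5'), ('4', '6'), ('4', '5'), ('4', '2')]

    # A key-value RDD supports pair operations such as reduceByKey
    sums = df3.mapValues(int).reduceByKey(lambda a, b: a + b)
    print(sums.collect())
    # e.g. [('3', 5), ('4', 13)] (ordering may vary)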
13 RDD type conversion

    1) Convert each line to a Row
    from pyspark.sql import Row

    # The file contains space-separated fields: name age address salary
    userRdd = sc.textFile(r"D:\data\people.json")
    userRdd = userRdd.map(lambda x: x.split(" "))

    userRows = userRdd.map(lambda p:
                            Row(
                                userName = p[0],
                                userAge = int(p[1]),
                                userAdd = p[2],
                                userSalary = int(p[3])
                                )
                            )
    print(userRows.take(4))

    Result: [Row(userAdd='shanghai', userAge=20, userName='zhangsan', userSalary=13), Row(userAdd='beijin', userAge=30, userName='lisi', userSalary=15)]

    2) Create a DataFrame
      userDF = sqlContext.createDataFrame(userRows)
      (sqlContext is an existing SQLContext; on Spark 2.x you can equally call spark.createDataFrame(userRows).)
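
      Once the DataFrame exists it can be inspected and registered as a temporary view for SQL. A minimal sketch (the view name "user" is illustrative; on older Spark versions use registerTempTable instead of createOrReplaceTempView):

      # A minimal sketch; userDF is the DataFrame created above
      userDF.printSchema()          # inferred column names and types
      userDF.show()                 # print the rows

      # Register an illustrative temporary view and query it with SQL
      userDF.createOrReplaceTempView("user")
      sqlContext.sql("SELECT userName, userSalary FROM user WHERE userAge > 25").show()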

    1. Querying fields with SQL statements
      from pyspark.conf import SparkConf
      from pyspark.sql.session import SparkSession
      from pyspark.sql.types import Row

if __name__ == '__main__':
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

    sc = spark.sparkContext

    # Build Row objects from a comma-separated text file
    rd = sc.textFile(r"D:\data\people.txt")
    rd2 = rd.map(lambda x: x.split(","))
    people = rd2.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Create a DataFrame, register it as a temp view, and query it with SQL
    peopleDF = spark.createDataFrame(people)
    peopleDF.createOrReplaceTempView("people")
    teenagers = spark.sql("SELECT name,age FROM people WHERE name='Andy'")
    teenagers.show(5)

    print(teenagers.rdd.collect())

    # Map each result Row to a derived value (here: age plus 100)
    teenNames = teenagers.rdd.map(lambda p: 100 + p.age).collect()

    for name in teenNames:
        print(name)
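
The same lookup can also be written with the DataFrame API instead of SQL; a minimal sketch using the peopleDF built above:

    # Equivalent filter using the DataFrame API (no temp view required)
    andyDF = peopleDF.filter(peopleDF['name'] == 'Andy').select('name', 'age')
    andyDF.show()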

15 Detailed example of DataFrame, SQL, and JSON usage
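
The code below is the basic Spark SQL example that ships with Spark (examples/src/main/python/sql/basic.py), reproduced here together with its Apache license header.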
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating basic Spark SQL features.
Run with:
./bin/spark-submit examples/src/main/python/sql/basic.py
"""
from __future__ import print_function

# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$
# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$
# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$

def basic_df_example(spark):
    # $example on:create_df$
    # spark is an existing SparkSession
    df = spark.read.json("/data/people.json")
    # Displays the content of the DataFrame to stdout
    df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:create_df$

    # $example on:untyped_ops$
    # spark, df are from the previous example
    # Print the schema in a tree format
    df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)

    # Select only the "name" column
    df.select("name").show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+

    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+

    # Select people older than 21
    df.filter(df['age'] > 21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+

    # Count people by age
    df.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+
    # $example off:untyped_ops$

    # $example on:run_sql$
    # Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")

    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:run_sql$

    # $example on:global_temp_view$
    # Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    # Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    # Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:global_temp_view$

def schema_inference_example(spark):
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are Dataframe objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$

def programmatic_schema_example(spark):
    # $example on:programmatic_schema$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))

    # The schema is encoded in a string.
    schemaString = "name age"

    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)

    # Creates a temporary view using the DataFrame
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    results = spark.sql("SELECT name FROM people")

    results.show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+
    # $example off:programmatic_schema$

if name == "main":

$example on:init_session$

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# $example off:init_session$

basic_df_example(spark)
# schema_inference_example(spark)
# programmatic_schema_example(spark)

spark.stop()
