|
| 1 | +# |
| 2 | +# Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | +# contributor license agreements. See the NOTICE file distributed with |
| 4 | +# this work for additional information regarding copyright ownership. |
| 5 | +# The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | +# (the "License"); you may not use this file except in compliance with |
| 7 | +# the License. You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, software |
| 12 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | +# See the License for the specific language governing permissions and |
| 15 | +# limitations under the License. |
| 16 | +# |
| 17 | + |
| 18 | +import os |
| 19 | + |
| 20 | +from pyspark import SparkContext |
| 21 | +from pyspark.sql import SQLContext |
| 22 | +from pyspark.sql import Row, StructField, StructType, StringType, IntegerType |
| 23 | + |
| 24 | + |
if __name__ == "__main__":
    # Example driver: builds SchemaRDDs from an RDD of Rows (inferred schema),
    # from an RDD of tuples (explicit schema) and from a JSON file, then runs
    # a SQL query against the JSON-backed table.
    sc = SparkContext(appName="PythonSQL")
    sqlContext = SQLContext(sc)

    # RDD is created from a list of rows
    some_rdd = sc.parallelize([Row(name="John", age=19),
                               Row(name="Smith", age=23),
                               Row(name="Sarah", age=18)])
    # Infer schema from the first row, create a SchemaRDD and print the schema
    some_schemardd = sqlContext.inferSchema(some_rdd)
    some_schemardd.printSchema()
    # root
    #  |-- age: integer (nullable = true)
    #  |-- name: string (nullable = true)

    # Another RDD is created from a list of tuples
    another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
    # Schema with two fields - person_name and person_age
    schema = StructType([StructField("person_name", StringType(), False),
                         StructField("person_age", IntegerType(), False)])
    # Create a SchemaRDD by applying the schema to the RDD and print the schema
    another_schemardd = sqlContext.applySchema(another_rdd, schema)
    another_schemardd.printSchema()
    # root
    #  |-- person_name: string (nullable = false)
    #  |-- person_age: integer (nullable = false)

    # A JSON dataset is pointed to by path.
    # The path can be either a single text file or a directory storing text files.
    # os.path.join supplies the separator between SPARK_HOME and the relative
    # path; plain string concatenation breaks when SPARK_HOME has no trailing
    # slash.
    path = os.path.join(os.environ['SPARK_HOME'],
                        "examples/src/main/resources/people.json")
    # Create a SchemaRDD from the file(s) pointed to by path
    people = sqlContext.jsonFile(path)

    # The inferred schema can be visualized using the printSchema() method.
    people.printSchema()
    # root
    #  |-- age: IntegerType
    #  |-- name: StringType

    # Register this SchemaRDD as a table.
    people.registerAsTable("people")

    # SQL statements can be run by using the sql methods provided by sqlContext
    teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # Each collected row has a single column (name); print it.
    for each in teenagers.collect():
        print(each[0])

    sc.stop()
0 commit comments