from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").config(
    "spark.sql.parquet.binaryAsString", "true").getOrCreate()
# The source table is about 2.77 GB.
df1 = spark.read.format("jdbc").options(
    user="dsv2", password="djiijx.lTpeA", driver="com.mysql.cj.jdbc.Driver",
    url="jdbc:mysql://192.168.4.254:3306/data_service_v2",
    query="SELECT * FROM companyxxx").load()
# df1.collect()
# java.sql.SQLException: GC overhead limit exceeded
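One way to avoid the GC overhead error is to stop pulling the whole 2.77 GB table through a single connection and never collect() the full result. A sketch of a partitioned JDBC read; the id bounds, partition count and fetch size below are made-up values, not measured against the real table:
df1_partitioned = spark.read.format("jdbc").options(
    user="dsv2", password="djiijx.lTpeA", driver="com.mysql.cj.jdbc.Driver",
    url="jdbc:mysql://192.168.4.254:3306/data_service_v2",
    dbtable="companyxxx",              # partitioned reads need dbtable, not query
    partitionColumn="id", lowerBound="1", upperBound="10000000",
    numPartitions="16",                # 16 parallel range scans instead of one task
    fetchsize="10000").load()          # stream rows from MySQL in batches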
df2 = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["id"])
df2.createOrReplaceTempView("df2")
df2.show()
df3 = df1.join(df2, "id", how="inner")
# df3.show()
# java.sql.SQLException: GC overhead limit exceeded
df4 = df1.filter("id in (select id from df2)")
df4.show()
# java.sql.SQLException: GC overhead limit exceeded
# df1.filter("id in (select id from df2)") is not optimized: the IN subquery is not pushed down to the JDBC source, so the whole table is still pulled from MySQL.
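Since the IN subquery is not pushed down, one workaround (a sketch, only reasonable while df2 stays small; df5 is just an illustrative name) is to collect the ids on the driver and embed them in the JDBC query, so MySQL returns only the matching rows:
ids = [row.id for row in df2.collect()]            # df2 is tiny, safe to collect
id_list = ",".join(str(i) for i in ids)
df5 = spark.read.format("jdbc").options(
    user="dsv2", password="djiijx.lTpeA", driver="com.mysql.cj.jdbc.Driver",
    url="jdbc:mysql://192.168.4.254:3306/data_service_v2",
    query=f"SELECT * FROM companyxxx WHERE id IN ({id_list})").load()
df5.show()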
An SSH key generated as the root user only works for root logins, so pay attention to which user you generate it for: nodes in a cluster may be able to reach each other passwordlessly as the hadoop user while the same thing is not allowed for root.
Column type conversion
from pyspark.sql.types import DecimalType, StringType
output_df = ip_df \
.withColumn("col_value", ip_df["col_value"].cast(DecimalType())) \
.withColumn("id", ip_df["id"].cast(StringType()))
es + Spark: exception when reading a custom-format date field from Elasticsearch
Problem description: Spark reads data from a given index/type that contains a date field with a custom format, and throws an exception when reading that field. The date is mapped as:
"start_time" : {
    "type" : "date",
    "format" : "yyyy-MM-dd HH:mm:ss"
},
Exception:
16/06/12 15:54:17 INFO DAGScheduler: Job 0 failed: saveAsTextFile at BrowersDataJob.java:84, took 6.235458 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 172.16.7.82): org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2016-05-12 05:49:07] for field [start_time]
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:701)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:794)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:692)
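A commonly suggested workaround, sketched below under some assumptions: it relies on the elasticsearch-spark SQL connector being on the classpath and on its es.mapping.date.rich option; the node address is taken from the log above, while the index/type name "browsers/data" is hypothetical. The idea is to have the connector return the date as a plain string and parse it in Spark instead:
from pyspark.sql.functions import to_timestamp

es_df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "172.16.7.82") \
    .option("es.mapping.date.rich", "false") \
    .load("browsers/data")             # hypothetical index/type
# Parse the raw string with the custom format defined in the ES mapping.
es_df = es_df.withColumn("start_time",
                         to_timestamp("start_time", "yyyy-MM-dd HH:mm:ss"))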