 from pyspark.sql import SparkSession
 
 spark = SparkSession.builder.appName("test").config(
     'spark.sql.parquet.binaryAsString', 'true').getOrCreate()
 
 # source table is ~2.77 GB
 df1 = spark.read.format("jdbc").options(
     user="dsv2", password="djiijx.lTpeA", driver='com.mysql.cj.jdbc.Driver',
     url="jdbc:mysql://192.168.4.254:3306/data_service_v2", query="SELECT * FROM companyxxx").load()
 
 # df1.collect()
 # java.sql.SQLException: GC overhead limit exceeded
 
 df2 = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["id"])
 df2.createOrReplaceTempView("df2")
 
 df2.show()
 
 df3 = df1.join(df2, "id", how="inner")
 
 # df3.show()
 # java.sql.SQLException: GC overhead limit exceeded
 
 df4 = df1.filter("id in (select id from df2)")
 
 df4.show()
 # java.sql.SQLException: GC overhead limit exceeded
 
 # df1.filter("id in (select id from df2)") is not pushed down to the JDBC source
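
The three failures share one cause: Spark pulls the entire table through a single JDBC connection into one partition, and neither the DataFrame join nor the IN-subquery is pushed down to MySQL, so the predicate is applied only after the full fetch. A minimal sketch of the usual workaround, a partitioned JDBC read (the id bounds and partition count below are made-up values, and id is assumed to be a numeric key):

 df1 = spark.read.format("jdbc").options(
     user="dsv2", password="djiijx.lTpeA", driver='com.mysql.cj.jdbc.Driver',
     url="jdbc:mysql://192.168.4.254:3306/data_service_v2",
     dbtable="companyxxx",    # partitioned reads require dbtable, not query
     partitionColumn="id",    # numeric column to split on (assumption)
     lowerBound="1",          # hypothetical id range
     upperBound="10000000",
     numPartitions="16",      # 16 parallel reads instead of one
     fetchsize="10000").load()  # JDBC fetch size hint

Alternatively, since the filter is never pushed down, put it into the SQL itself, e.g. query="SELECT * FROM companyxxx WHERE id IN (1, 2, 3, 4, 5)".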
 

An ssh key generated as the root user only authorizes logins as root.

So pay attention to which user you are operating as.

Between the nodes of a cluster, passwordless ssh may work for the hadoop user while root is still denied.

Casting column types

 from pyspark.sql.types import DecimalType, StringType
 
 output_df = ip_df \
   .withColumn("col_value", ip_df["col_value"].cast(DecimalType())) \
   .withColumn("id", ip_df["id"].cast(StringType()))

es + spark: exception when reading custom-format dates from ES

 Problem: Spark reads a given index/type that contains dates in a custom
 format, and reading the date field throws an exception. The mapping is:
 "start_time" : {
     "type" : "date",
     "format" : "yyyy-MM-dd HH:mm:ss"
 },
 Exception:
 16/06/12 15:54:17 INFO DAGScheduler: Job 0 failed: saveAsTextFile at BrowersDataJob.java:84, took 6.235458 s
 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 172.16.7.82): org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2016-05-12 05:49:07] for field [start_time]
 at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:701)
 at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:794)
 at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:692)
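
The es-hadoop connector expects date fields to be ISO 8601 strings, so a value like 2016-05-12 05:49:07 makes the ScrollReader fail. A minimal sketch of one workaround, assuming the es.mapping.date.rich connector option (set to false, es-hadoop returns the raw string instead of trying to parse it; the node address is taken from the log above, and "index/type" is a placeholder):

 df = spark.read.format("org.elasticsearch.spark.sql") \
     .option("es.nodes", "172.16.7.82") \
     .option("es.mapping.date.rich", "false") \
     .load("index/type")  # placeholder resource name

The field then comes back as a plain string and can be converted afterwards, e.g. with unix_timestamp(col("start_time"), "yyyy-MM-dd HH:mm:ss").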