spark读取GBK编码文件乱码问题

sc.textFile()读取GBK编码文件乱码原因分析

  1. SPARK 常用的textFile方法默认是写死了读UTF-8格式的文件,其他编码格式文件会显示乱码

  2. 查看textFile方法的实现

    1
    2
    3
    4
    5
    6
    7
    def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
    }

    读文件的时候用到了hadoopFile方法,读取的文件时调用TextInputformat类来解析文本文件,输出K V键值对。继续查看TextInputformat方法读取文件的实现,其中读记录生成K V键值对的方法的实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
    reporter.setStatus(genericSplit.toString());
    String delimiter = job.get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter) {
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    }

    return new LineRecordReader(job, (FileSplit)genericSplit, recordDelimiterBytes);
    }

    虽然代码中有delimiter.getBytes(Charsets.UTF_8),但并不是最后输出的关键,这里是指定分隔符用UTF-8解码,并设置分隔符。继续往下看,最后发现LineRecordReader–>readLine–>Text,返回指定的字符集UTF-8。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class Text extends BinaryComparable implements WritableComparable<BinaryComparable> {
    private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
    protected CharsetEncoder initialValue() {
    return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
    }
    };
    private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
    protected CharsetDecoder initialValue() {
    return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
    }
    };

    所以hadoopFile返回的HadoopRDD,其中输入TextInputformat和输出valueClass均为Text,返回的是字符集UTF-8,而我们输入的是GBK编码,用UTF-8解码就会造成乱码。所以HadoopRDD中的value也是乱码的。

  3. 如果我们想要读取GBK文件避免乱码,可以通过对HadoopRDD进行操作:

    对HadoopRDD中的value按照GBK的方式读取变成字符串,运行之后能够正常显示:

    1
    HadoopRDD.map(pair => new String(pair._2.getBytes,"GBK")

    说明:关于String类:

    1
    2
    3
    4
    5
    6
    String.getBytes(Charset charset)。
    返回值:byte[] 返回的是一个 字节数组。
    方法的作用:将String以指定的编码格式(既参数charset)进行解码,然后以字节数组的形式存储这些解码后的字节。
    String(byte[] bytes,Charset charset)
    返回值:String 返回的是一串字符串。
    方法的作用:将字节数组bytes以charset的编码格式进行解码。
  4. 案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    package com.ruozedata.spark.job

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    import java.io.{File, FileOutputStream, OutputStreamWriter}

    object jobApp {
    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getCanonicalName)
    val sc = new SparkContext(sparkConf)

    val GBKPATH = "data/GBKtext.txt"
    val UTF8PATH = "data/UTF8text.txt"
    val GBKFile = new File(GBKPATH)
    val UTF8File = new File(UTF8PATH)
    UTF8File
    if(GBKFile.exists()){
    GBKFile.delete()
    }
    if(UTF8File.exists()){
    GBKFile.delete()
    }


    val GBKwriter = new OutputStreamWriter(new FileOutputStream(GBKFile), "GBK")
    GBKwriter.write("这是第一行GBK数据\n")
    GBKwriter.write("这是第二行GBK数据\n")
    GBKwriter.write("这是第三行GBK数据\n")
    GBKwriter.flush()
    GBKwriter.close()

    val UTF8writer = new OutputStreamWriter(new FileOutputStream(UTF8File), "UTF-8")
    UTF8writer.write("这是第一行UTF8数据\n")
    UTF8writer.write("这是第二行UTF8数据\n")
    UTF8writer.write("这是第三行UTF8数据\n")
    UTF8writer.flush()
    UTF8writer.close()

    //val UTF8rdd = sc.textFile(UTF8PATH).collect().foreach(println)
    /**textFile读UTF-8文件,显示如下:
    *这是第一行GBK数据
    *这是第二行GBK数据
    *这是第三行GBK数据
    */
    //val GBKrdd1 = sc.textFile(GBKPATH).collect().foreach(println)
    /**textFile读GBK文件,显示如下:
    *���ǵ�һ��GBK����
    *���ǵڶ���GBK����
    *���ǵ�����GBK����
    */
    //textFile实现方法:
    //hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    // minPartitions).map(pair => pair._2.toString).setName(path)

    val GBKrdd2 = sc.hadoopFile(GBKPATH, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    .map(pair => new String(pair._2.getBytes,"GBK")).collect.foreach(println)

    sc.stop()
    }

    }

贴一个更复杂的方法,自定义InputFormat类,在读取输入时候将封装的字节流从GBK编码转化为UTF-8编码,可以参考:

https://www.wangt.cc/2019/11/feature%EF%BC%9Aspark%E6%94%AF%E6%8C%81gbk%E6%96%87%E4%BB%B6%E8%AF%BB%E5%8F%96%E5%8A%9F%E8%83%BD/