sc.textFile()读取GBK编码文件乱码原因分析
SPARK 常用的textFile方法默认是写死了读UTF-8格式的文件,其他编码格式文件会显示乱码
查看textFile方法的实现
1
2
3
4
5
6
7def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}读文件的时候用到了hadoopFile方法,读取的文件时调用TextInputformat类来解析文本文件,输出K V键值对。继续查看TextInputformat方法读取文件的实现,其中读记录生成K V键值对的方法的实现如下:
1
2
3
4
5
6
7
8
9
10public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit)genericSplit, recordDelimiterBytes);
}虽然代码中有
delimiter.getBytes(Charsets.UTF_8)
,但并不是最后输出的关键,这里是指定分隔符用UTF-8解码,并设置分隔符。继续往下看,最后发现LineRecordReader–>readLine–>Text,返回指定的字符集UTF-8。1
2
3
4
5
6
7
8
9
10
11public class Text extends BinaryComparable implements WritableComparable<BinaryComparable> {
private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
protected CharsetEncoder initialValue() {
return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
}
};
private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
protected CharsetDecoder initialValue() {
return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
}
};所以hadoopFile返回的HadoopRDD,其中输入TextInputformat和输出valueClass均为Text,返回的是字符集UTF-8,而我们输入的是GBK编码,用UTF-8解码就会造成乱码。所以HadoopRDD中的value也是乱码的。
如果我们想要读取GBK文件避免乱码,可以通过对HadoopRDD进行操作:
对HadoopRDD中的value按照GBK的方式读取变成字符串,运行之后能够正常显示:
1
HadoopRDD.map(pair => new String(pair._2.getBytes,"GBK")
说明:关于String类:
1
2
3
4
5
6String.getBytes(Charset charset)。
返回值:byte[] 返回的是一个 字节数组。
方法的作用:将String以指定的编码格式(既参数charset)进行解码,然后以字节数组的形式存储这些解码后的字节。
String(byte[] bytes,Charset charset)
返回值:String 返回的是一串字符串。
方法的作用:将字节数组bytes以charset的编码格式进行解码。案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62package com.ruozedata.spark.job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import java.io.{File, FileOutputStream, OutputStreamWriter}
object jobApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getCanonicalName)
val sc = new SparkContext(sparkConf)
val GBKPATH = "data/GBKtext.txt"
val UTF8PATH = "data/UTF8text.txt"
val GBKFile = new File(GBKPATH)
val UTF8File = new File(UTF8PATH)
UTF8File
if(GBKFile.exists()){
GBKFile.delete()
}
if(UTF8File.exists()){
GBKFile.delete()
}
val GBKwriter = new OutputStreamWriter(new FileOutputStream(GBKFile), "GBK")
GBKwriter.write("这是第一行GBK数据\n")
GBKwriter.write("这是第二行GBK数据\n")
GBKwriter.write("这是第三行GBK数据\n")
GBKwriter.flush()
GBKwriter.close()
val UTF8writer = new OutputStreamWriter(new FileOutputStream(UTF8File), "UTF-8")
UTF8writer.write("这是第一行UTF8数据\n")
UTF8writer.write("这是第二行UTF8数据\n")
UTF8writer.write("这是第三行UTF8数据\n")
UTF8writer.flush()
UTF8writer.close()
//val UTF8rdd = sc.textFile(UTF8PATH).collect().foreach(println)
/**textFile读UTF-8文件,显示如下:
*这是第一行GBK数据
*这是第二行GBK数据
*这是第三行GBK数据
*/
//val GBKrdd1 = sc.textFile(GBKPATH).collect().foreach(println)
/**textFile读GBK文件,显示如下:
*���ǵ�һ��GBK����
*���ǵڶ���GBK����
*���ǵ�����GBK����
*/
//textFile实现方法:
//hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
// minPartitions).map(pair => pair._2.toString).setName(path)
val GBKrdd2 = sc.hadoopFile(GBKPATH, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
.map(pair => new String(pair._2.getBytes,"GBK")).collect.foreach(println)
sc.stop()
}
}
贴一个更复杂的方法,自定义InputFormat类,在读取输入时候将封装的字节流从GBK编码转化为UTF-8编码,可以参考: