diff --git a/common/src/main/java/tech/ides/constants/ScriptConstants.scala b/common/src/main/java/tech/ides/constants/ScriptConstants.scala index 6e7e5df..c55f715 100644 --- a/common/src/main/java/tech/ides/constants/ScriptConstants.scala +++ b/common/src/main/java/tech/ides/constants/ScriptConstants.scala @@ -12,6 +12,8 @@ object ScriptConstants { val IMPL_CLASS = "implClass" + val FILE_FORMAT = "fileFormat" + val PARTITION_BY_COL = "partitionByCol" def PATH_SEPARATOR = File.pathSeparator diff --git a/engine/pom.xml b/engine/pom.xml index bd751a6..5046be8 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -77,6 +77,11 @@ ds-spark-excel-${spark.big.version}_${scala.binary.version} ${project.version} + + tech.ides + hive-exec-${spark.big.version}_${scala.binary.version} + ${project.version} + diff --git a/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala b/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala index bd52415..4736c6d 100644 --- a/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala +++ b/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala @@ -1,9 +1,15 @@ package tech.ides.datasource.impl +import org.apache.hadoop.hive.custom.inputformat.MultiLineCSVInputFormat +import org.apache.hadoop.hive.custom.serde.OpenCSVSerde +import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row} import tech.ides.datasource._ import tech.ides.datasource.DataSource.Method.{SINK, SOURCE} -import tech.ides.constants.ScriptConstants.{IMPL_CLASS, PARTITION_BY_COL} +import tech.ides.constants.ScriptConstants.{FILE_FORMAT, IMPL_CLASS, PARTITION_BY_COL} +import tech.ides.exception.IdesException +import tech.sqlclub.common.utils.StringEscapeUtil /** * Hive 数据源 @@ -15,6 +21,7 @@ import tech.ides.constants.ScriptConstants.{IMPL_CLASS, PARTITION_BY_COL} sinceVersion = "1.0.0" ) class 
HiveDataSource extends DataReader with DataWriter{ + override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = { val format = config.config.getOrElse(IMPL_CLASS, fullFormat) reader.options(config.config).format(format).table(config.path) @@ -22,15 +29,50 @@ class HiveDataSource extends DataReader with DataWriter{ override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = { - val fileFormat = config.config.getOrElse("fileFormat", "parquet") + val fileFormat = config.config.getOrElse(FILE_FORMAT, "parquet") - var options = config.config - "fileFormat" - IMPL_CLASS + var options = config.config - FILE_FORMAT - IMPL_CLASS // 只支持'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' 'avro' 6种类型 // 参考:https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html - options += ("fileFormat" -> ( if ("csv" == fileFormat) "textfile" else fileFormat )) + options += (FILE_FORMAT -> ( if ("csv" == fileFormat) "textfile" else fileFormat )) // csv转成textfile格式 + + val containsDelimiters = HiveOptions.delimiterOptions.keys.exists(options.contains) + if (containsDelimiters && "textfile" != options(FILE_FORMAT) ) { + throw new IdesException(s"""These options:[${HiveOptions.delimiterOptions.keys.mkString(", ")}] can only be used with "textfile/csv" fileFormat. 
They define how to read delimited files into rows.""") + } + + if (containsDelimiters) { +// fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim + HiveOptions.delimiterOptions.keys.foreach { + // 将delimiter options都进行反向转义 + k => + options.get(k).foreach(v => options = options.updated(k, StringEscapeUtil.unescapeJava(v))) + } + } + + if ( "csv" == fileFormat && options.getOrElse("multiline", "false").toBoolean ) { + // 先判断所有字段是否是String类型 + val table = config.df.get + val notStringCols = table.schema.filterNot(_.dataType == StringType) + if (notStringCols.nonEmpty) throw new IdesException("All fields must be String type when stored as csv type!") + // 解决csv中存在换行问题 + options = options.updated("serde", classOf[OpenCSVSerde].getName) // 使用自定义serde + // 使用自定义FileInputFormat + options = options.updated("inputFormat", classOf[MultiLineCSVInputFormat].getName) + options = options.updated("outputFormat", classOf[HiveIgnoreKeyTextOutputFormat[_,_]].getName) + + options = options - FILE_FORMAT // 不能同时指定 fileFormat 和 inputFormat/outputFormat + } - //todo 这里有点问题 \t 变成了\\t + val containsSerdeOptions = HiveOptions.openCSVSerdeOptions.exists(options.contains) + if ( containsSerdeOptions ) { + // 将serde options 进行转义 + HiveOptions.openCSVSerdeOptions.foreach { + k => + options.get(k).foreach(v => options = options.updated(k, StringEscapeUtil.unescapeJava(v))) + } + } // 如果需要分区 进行partitionBy options.get(PARTITION_BY_COL).map(partitionColumn => partitionColumn.split(",").filterNot(_.isEmpty)) @@ -44,3 +86,21 @@ class HiveDataSource extends DataReader with DataWriter{ override def shortFormat: String = fullFormat } + +object HiveOptions { + + val delimiterOptions = Map( + "fieldDelim" -> "field.delim", + "escapeDelim" -> "escape.delim", + // The following typo is inherited from Hive... 
+    "collectionDelim" -> "colelction.delim",
+    "mapkeyDelim" -> "mapkey.delim",
+    "lineDelim" -> "line.delim")
+
+  val openCSVSerdeOptions = Set(
+    OpenCSVSerde.SEPARATORCHAR, // separatorChar
+    OpenCSVSerde.QUOTECHAR, // quoteChar
+    OpenCSVSerde.ESCAPECHAR // escapeChar
+  )
+
+}
\ No newline at end of file
diff --git a/external/hive-exec/pom.xml b/external/hive-exec/pom.xml
new file mode 100644
index 0000000..0f56a3a
--- /dev/null
+++ b/external/hive-exec/pom.xml
@@ -0,0 +1,23 @@
+
+
+
+        DataLinked
+        tech.ides
+        1.0.0
+        ../../pom.xml
+
+    4.0.0
+
+    hive-exec-${spark.big.version}_${scala.binary.version}
+
+
+
+            org.apache.spark
+            spark-hive_${scala.binary.version}
+            ${spark.version}
+            provided
+
+
+
\ No newline at end of file
diff --git a/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/inputformat/MultiLineCSVInputFormat.java b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/inputformat/MultiLineCSVInputFormat.java
new file mode 100644
index 0000000..b2ca36b
--- /dev/null
+++ b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/inputformat/MultiLineCSVInputFormat.java
@@ -0,0 +1,62 @@
+package org.apache.hadoop.hive.custom.inputformat;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.custom.serde.OpenCSVSerde;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.*;
+import tech.sqlclub.common.utils.StringEscapeUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+
+/**
+ * InputFormat for CSV files whose fields may contain line breaks.
+ *
+ * An {@link InputFormat} for plain text csv files. Files are broken into records by
+ * {@link MultiLineCSVRecordReader} instead of at every line break. Keys are
+ * the position in the file, and values are the text of one record.
+ *
+ * @see org.apache.hadoop.mapred.TextInputFormat
+ * Created by songgr on 2020/12/01.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class MultiLineCSVInputFormat extends FileInputFormat<LongWritable, Text>
+        implements JobConfigurable {
+
+    private CompressionCodecFactory compressionCodecs = null; // assigned in configure(JobConf)
+
+    @Override
+    public void configure(JobConf conf) {
+        compressionCodecs = new CompressionCodecFactory(conf);
+    }
+
+    @Override
+    protected boolean isSplitable(FileSystem fs, Path file) {
+        final CompressionCodec codec = compressionCodecs.getCodec(file);
+        if (null == codec) { // no codec matched the file: treated as splittable plain text
+            return true;
+        }
+        return codec instanceof SplittableCompressionCodec;
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+        reporter.setStatus(split.toString());
+
+        String separatorChar = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.SEPARATORCHAR.toLowerCase(Locale.ROOT)));
+        String quoteChar = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.QUOTECHAR.toLowerCase(Locale.ROOT)));
+        String escapeChar = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.ESCAPECHAR.toLowerCase(Locale.ROOT)));
+
+        // The RecordReader does the real work of cutting the input into records
+        return new MultiLineCSVRecordReader(job, (FileSplit) split, separatorChar, quoteChar, escapeChar);
+    }
+}
diff --git a/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/serde/OpenCSVSerde.java b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/serde/OpenCSVSerde.java
new file mode 100644
index 0000000..4a320e4
--- /dev/null
+++ b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/serde/OpenCSVSerde.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.custom.serde;
+
+import au.com.bytecode.opencsv.CSVReader;
+import au.com.bytecode.opencsv.CSVWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * OpenCSVSerde uses opencsv to serialize and deserialize CSV rows; every column is a string.
+ * Users can specify custom separator, quote or escape characters. The defaults are
+ * separator(,), quote("), and escape character(\); see the DEFAULT_* constants below.
+ *
+ * @see org.apache.hadoop.hive.serde2.OpenCSVSerde
+ */
+@SerDeSpec(schemaProps = {
+    serdeConstants.LIST_COLUMNS,
+    OpenCSVSerde.SEPARATORCHAR, OpenCSVSerde.QUOTECHAR, OpenCSVSerde.ESCAPECHAR})
+public final class OpenCSVSerde extends AbstractSerDe {
+
+  public static final Logger LOG = LoggerFactory.getLogger(OpenCSVSerde.class.getName());
+  private ObjectInspector inspector;
+  private String[] outputFields;
+  private int numCols;
+  private List<String> row;
+
+  private char separatorChar;
+  private char quoteChar;
+  private char escapeChar;
+  /**
+   * Table property that names the field separator character.
+   */
+  public static final String SEPARATORCHAR = "separatorChar";
+  /**
+   * Table property that names the quote character wrapped around field values.
+   */
+  public static final String QUOTECHAR = "quoteChar";
+  /**
+   * Table property that names the escape character used inside field values.
+   */
+  public static final String ESCAPECHAR = "escapeChar";
+
+  public static final char DEFAULT_SEPARATOR = ',';
+  public static final char DEFAULT_QUOTE_CHARACTER = '"';
+  public static final char DEFAULT_ESCAPE_CHARACTER = '\\';
+
+  @Override
+  public void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
+
+    final List<String> columnNames = Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS)
+        .split(","));
+
+    numCols = columnNames.size();
+
+    final List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(numCols);
+
+    for (int i = 0; i < numCols; i++) {
+      columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    }
+
+    inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+    outputFields = new String[numCols];
+    row = new ArrayList<String>(numCols);
+
+    for (int i = 0; i < numCols; i++) {
+      row.add(null);
+    }
+
+    // The keys in tbl have all been lower-cased (per the original author), so look them up in lower case
+    separatorChar = getProperty(tbl, SEPARATORCHAR.toLowerCase(Locale.ROOT), DEFAULT_SEPARATOR);
+    quoteChar = getProperty(tbl, QUOTECHAR.toLowerCase(Locale.ROOT), DEFAULT_QUOTE_CHARACTER);
+    escapeChar = getProperty(tbl, ESCAPECHAR.toLowerCase(Locale.ROOT), DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  private char getProperty(final Properties tbl, final String property, final char def) {
+    final String val = tbl.getProperty(property);
+
+    if (val != null && !val.isEmpty()) { // an empty value would make charAt(0) throw; use the default
+      return val.charAt(0);
+    }
+
+    return def;
+  }
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector;
+    final List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs();
+
+    if (outputFieldRefs.size() != numCols) {
+      throw new SerDeException("Cannot serialize the object because there are "
+          + outputFieldRefs.size() + " fields but the table has " + numCols + " columns.");
+    }
+
+    // Get all data out.
+    for (int c = 0; c < numCols; c++) {
+      final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));
+      final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector();
+
+      // The data must be of type String
+      final StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI;
+
+      // Convert the field to Java class String, because objects of String type
+      // can be stored in String, Text, or some other classes.
+      outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field);
+    }
+
+    final StringWriter writer = new StringWriter();
+    final CSVWriter csv = newWriter(writer, separatorChar, quoteChar, escapeChar);
+
+    try {
+      csv.writeNext(outputFields);
+      csv.close();
+
+      return new Text(writer.toString());
+    } catch (final IOException ioe) {
+      throw new SerDeException(ioe);
+    }
+  }
+
+  @Override
+  public Object deserialize(final Writable blob) throws SerDeException {
+    Text rowText = (Text) blob;
+
+    CSVReader csv = null;
+    try {
+      csv = newReader(new CharArrayReader(rowText.toString().toCharArray()), separatorChar,
+          quoteChar, escapeChar);
+      final String[] read = csv.readNext();
+
+      for (int i = 0; i < numCols; i++) {
+        if (read != null && i < read.length) {
+          row.set(i, read[i]);
+        } else {
+          row.set(i, null);
+        }
+      }
+
+      return row;
+    } catch (final Exception e) {
+      throw new SerDeException(e);
+    } finally {
+      if (csv != null) {
+        try {
+          csv.close();
+        } catch (final Exception e) {
+          LOG.error("fail to close csv reader ", e);
+        }
+      }
+    }
+  }
+
+  private CSVReader newReader(final Reader reader, char separator, char quote, char escape) {
+    // CSVReader will throw an exception if any of separator, quote, or escape is the same, but
+    // the CSV format specifies that the escape character and quote char are the same... very weird
+    if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) {
+      return new CSVReader(reader, separator, quote);
+    } else {
+      return new CSVReader(reader, separator, quote, escape);
+    }
+  }
+
+  private CSVWriter newWriter(final Writer writer, char separator, char quote, char escape) {
+    if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) {
+      return new CSVWriter(writer, separator, quote, "");
+    } else {
+      return new CSVWriter(writer, separator, quote, escape, "");
+    }
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return inspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return Text.class;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+}
diff --git a/external/hive-exec/src/main/java/org/apache/hadoop/mapred/MultiLineCSVRecordReader.java b/external/hive-exec/src/main/java/org/apache/hadoop/mapred/MultiLineCSVRecordReader.java
new file mode 100644
index 0000000..26114d2
--- /dev/null
+++ b/external/hive-exec/src/main/java/org/apache/hadoop/mapred/MultiLineCSVRecordReader.java
@@ -0,0 +1,321 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.custom.serde.OpenCSVSerde;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * RecordReader that handles CSV fields containing line breaks.
+ *
+ * Treats keys as offset in file and value as line.
+ * Can correctly handle the value which contains one of CR, LF, or CRLF.
+ *
+ * @see org.apache.hadoop.mapred.LineRecordReader
+ * Created by songgr on 2020/12/10. 
+ */
+public class MultiLineCSVRecordReader implements RecordReader<LongWritable, Text> {
+    private static final Log LOG = LogFactory.getLog(MultiLineCSVRecordReader.class.getName());
+
+    private CompressionCodecFactory compressionCodecs;
+    private CompressionCodec codec;
+    private long start;
+    private long pos;
+    private long end;
+    private NewLineReader in;
+    private FSDataInputStream fileIn;
+    /**
+     * Maximum number of bytes kept for a single record.
+     * Defaults to Integer.MAX_VALUE.
+     */
+    private int maxLineLength;
+
+    private char separatorChar;
+    private char quoteChar;
+    private char escapeChar;
+    /**
+     * The record separator is fixed to '\n'.
+     */
+    private final char lineSep = '\n';
+    private final int lineSepLen = 1;
+
+    public MultiLineCSVRecordReader(Configuration job, FileSplit split,
+                                    String separatorChar,
+                                    String quoteChar,
+                                    String escapeChar) throws IOException {
+
+        this.separatorChar = separatorChar != null && separatorChar.length() >0 ?
+                separatorChar.charAt(0) : OpenCSVSerde.DEFAULT_SEPARATOR;
+        this.quoteChar = quoteChar != null && quoteChar.length() >0 ?
+                quoteChar.charAt(0) : OpenCSVSerde.DEFAULT_QUOTE_CHARACTER;
+        this.escapeChar = escapeChar != null && escapeChar.length() >0 ?
+                escapeChar.charAt(0) : OpenCSVSerde.DEFAULT_ESCAPE_CHARACTER;
+
+        this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength",
+                Integer.MAX_VALUE);
+        start = split.getStart();
+        end = start + split.getLength();
+        final Path file = split.getPath();
+        compressionCodecs = new CompressionCodecFactory(job);
+        codec = compressionCodecs.getCodec(file);
+
+        // open the file and seek to the start of the split
+        final FileSystem fs = file.getFileSystem(job);
+        fileIn = fs.open(file);
+
+        boolean skipFirstLine = false;
+        if ( isCompressedInput() ) {
+            in = new NewLineReader(codec.createInputStream(fileIn), job);
+            end = Long.MAX_VALUE;
+        } else {
+            if (start != 0) {
+                skipFirstLine = true;
+                // back up by one separator so a record that starts exactly at "start" is not skipped
+                this.start -= lineSepLen;
+                fileIn.seek(start);
+            }
+            in = new NewLineReader(fileIn, job);
+        }
+        if (skipFirstLine) {
+            // skip first line and re-establish "start".
+            start += in.readLine(new Text(), 0,
+                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
+        }
+        this.pos = start;
+    }
+
+    @Override
+    public boolean next(LongWritable key, Text value) throws IOException {
+        key.set(pos);
+        int newSize = 0;
+        while (pos < end) {
+            newSize = in.readLine(value, maxLineLength,
+                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
+                            maxLineLength));
+            if (newSize == 0) {
+                break;
+            }
+            pos += newSize;
+            if (newSize < maxLineLength) {
+                break;
+            }
+            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
+        }
+        if (newSize == 0) {
+            key = null;
+            value = null;
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    @Override
+    public LongWritable createKey() {
+        return new LongWritable();
+    }
+
+    @Override
+    public Text createValue() {
+        return new Text();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return pos;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            in.close();
+            in = null;
+        }
+        if (fileIn != null) {
+            fileIn.close();
+            fileIn = null;
+        }
+        compressionCodecs = null;
+        codec = null;
+        LOG.info("MultiLineCSVRecordReader close all resources successfully.");
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        if (start == end) {
+            return 0.0f;
+        } else {
+            return Math.min(1.0f, (pos - start) / (float) (end - start));
+        }
+    }
+
+    private boolean isCompressedInput() {
+        return (codec != null);
+    }
+
+    public class NewLineReader {
+        // default buffer size: 64 KB
+        private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+        private int bufferSize;
+        private InputStream in;
+        private byte[] buffer;
+        // number of valid bytes currently in buffer
+        private int bufferLength = 0;
+        // current read position inside buffer
+        private int bufferPosn = 0;
+
+        public NewLineReader(InputStream in) {
+            this(in, DEFAULT_BUFFER_SIZE);
+        }
+
+        public NewLineReader(InputStream in, int bufferSize) {
+            this.in = in;
+            this.bufferSize = bufferSize;
+            this.buffer = new byte[this.bufferSize];
+        }
+
+        public NewLineReader(InputStream in, Configuration conf)
+                throws IOException {
+            this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+        }
+
+        public void close() throws IOException {
+            in.close();
+        }
+
+        public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+                throws IOException {
+            str.clear();
+            Text record = new Text();
+            int txtLength = 0;
+            // number of bytes consumed from the stream by this call (the return value)
+            long bytesConsumed = 0L;
+            // true once the record separator that ends this record has been found
+            boolean newline = false;
+            // how many bytes of the record separator have been matched
+            int sepPosn = 0;
+            boolean wholeQuote = true; // false while inside a quoted field that is not closed yet
+            do {
+                // the buffer is exhausted: fill it again from the stream
+                if (this.bufferPosn >= this.bufferLength) {
+                    bufferPosn = 0;
+                    bufferLength = in.read(buffer);
+                    // end of stream: stop and return what was consumed so far
+                    if (bufferLength <= 0) {
+                        break;
+                    }
+                }
+                int startPosn = this.bufferPosn;
+                boolean forceSplit = false;
+                for (; bufferPosn < bufferLength && !forceSplit ; bufferPosn++) {
+                    // a field separator needs no further inspection
+                    if (buffer[bufferPosn] == separatorChar) {
+                        continue;
+                    }
+                    // an opening quote, or a quoted field left open when the previous buffer ran out
+                    if (buffer[bufferPosn] == quoteChar || !wholeQuote) {
+                        wholeQuote = false;
+                        // scan ahead (offset i) until the quoted field is closed or the buffer ends
+                        for (int i = 1; !wholeQuote; i++) {
+                            // buffer ran out inside the quoted field (NOTE(review): a closing quote on the very last byte cannot be confirmed here)
+                            if (bufferPosn + i >= bufferLength) {
+                                bufferPosn += i - 1; // step back one position
+                                break;
+                            }
+                            // escape character; must be tested before the quote checks, otherwise escape == quote would be misread
+                            if ( buffer[bufferPosn+i] == escapeChar) {
+                                if ( bufferPosn + i + 1 >= bufferLength) { // the escaped byte lies in the next buffer
+                                    forceSplit = true;
+                                    bufferPosn += i-1; // step back one position
+                                    break; // stop scanning: going on would push bufferPosn past bufferLength
+                                }
+                                i ++ ; // skip the escaped byte as well
+                                continue; // the loop's own i++ then lands on the byte after the escaped one
+                            }
+
+                            // quote followed by the field separator => the quoted field is closed (look-ahead stays inside the filled buffer)
+                            if (buffer[bufferPosn+i] == quoteChar && bufferPosn+i+1 < bufferLength && buffer[bufferPosn+i+1] == separatorChar) {
+                                wholeQuote = true;
+                                // reset sepPosn, which may be stale after a quoted field was cut by a buffer boundary
+                                sepPosn = 0;
+                                bufferPosn = bufferPosn + i;
+                                break;
+                            }
+
+                            // quote followed by the line separator => field closed and the record ends here
+                            // NOTE: one more byte must be an opening quote, so quote + newline inside a field is not taken for a record end
+                            if (buffer[bufferPosn+i] == quoteChar && bufferPosn+i+2 < bufferLength && buffer[bufferPosn+i+1] == lineSep && buffer[bufferPosn+i+2] == quoteChar) {
+                                wholeQuote = true;
+                                newline = true;
+                                // reset sepPosn, which may be stale after a quoted field was cut by a buffer boundary
+                                sepPosn = 0;
+                                bufferPosn = bufferPosn + i + 1; // move onto the line separator
+                                break;
+                            }
+
+                            // a line separator on the last filled byte ends the record and sets wholeQuote=true
+                            if (buffer[bufferPosn+i] == lineSep && (bufferPosn+i == bufferLength-1)) {
+                                wholeQuote = true;
+                                sepPosn = 0;
+                                bufferPosn = bufferPosn + i;
+                                break;
+                            }
+                        }
+                    }
+
+                    // first byte of the line separator (or a record end already detected above)
+                    if (buffer[bufferPosn] == lineSep || newline ) {
+                        bufferPosn++;
+                        sepPosn++;
+                        // a complete separator outside any open quoted field ends the record
+                        if (sepPosn == lineSepLen && wholeQuote) {
+                            newline = true;
+                            sepPosn = 0;
+                            break;
+                        }
+                    }
+                }
+                int readLength = this.bufferPosn - startPosn; // bytes taken from the buffer in this pass
+                bytesConsumed += readLength; // add to the running total
+                // never keep more than maxLineLength bytes of the record
+                if (readLength > maxLineLength - txtLength) {
+                    readLength = maxLineLength - txtLength;
+                }
+                if (readLength > 0) {
+                    // append the bytes taken in this pass, starting at startPosn, to record
+                    record.append(this.buffer, startPosn, readLength);
+                    txtLength += readLength;
+                    // Publish everything read so far on every pass, not only when the
+                    // separator is found: otherwise a final record with no trailing '\n'
+                    // comes back empty. The separator itself is dropped once it is seen.
+                    final int sepBytes = newline ? lineSepLen : 0;
+                    str.set(record.getBytes(), 0, record.getLength()
+                            - sepBytes);
+                }
+            } while (!newline && (bytesConsumed < maxBytesToConsume));
+            if (bytesConsumed > (long) Integer.MAX_VALUE) {
+                throw new IOException("Too many bytes before newline: "
+                        + bytesConsumed);
+            }
+
+            return (int) bytesConsumed;
+        }
+
+        public int readLine(Text str, int maxLineLength) throws IOException {
+            return readLine(str, maxLineLength, Integer.MAX_VALUE);
+        }
+
+        public int readLine(Text str) throws IOException {
+            return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index e6804f7..eaa1c3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
         engine
         repl
         external/ds-spark-excel
+        external/hive-exec