diff --git a/common/src/main/java/tech/ides/constants/ScriptConstants.scala b/common/src/main/java/tech/ides/constants/ScriptConstants.scala
index 6e7e5df..c55f715 100644
--- a/common/src/main/java/tech/ides/constants/ScriptConstants.scala
+++ b/common/src/main/java/tech/ides/constants/ScriptConstants.scala
@@ -12,6 +12,8 @@ object ScriptConstants {
val IMPL_CLASS = "implClass"
+ val FILE_FORMAT = "fileFormat"
+
val PARTITION_BY_COL = "partitionByCol"
def PATH_SEPARATOR = File.pathSeparator
diff --git a/engine/pom.xml b/engine/pom.xml
index bd751a6..5046be8 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -77,6 +77,11 @@
ds-spark-excel-${spark.big.version}_${scala.binary.version}
${project.version}
+        <dependency>
+            <groupId>tech.ides</groupId>
+            <artifactId>hive-exec-${spark.big.version}_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
diff --git a/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala b/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala
index bd52415..4736c6d 100644
--- a/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala
+++ b/engine/src/main/java/tech/ides/datasource/impl/HiveDataSource.scala
@@ -1,9 +1,15 @@
package tech.ides.datasource.impl
+import org.apache.hadoop.hive.custom.inputformat.MultiLineCSVInputFormat
+import org.apache.hadoop.hive.custom.serde.OpenCSVSerde
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import tech.ides.datasource._
import tech.ides.datasource.DataSource.Method.{SINK, SOURCE}
-import tech.ides.constants.ScriptConstants.{IMPL_CLASS, PARTITION_BY_COL}
+import tech.ides.constants.ScriptConstants.{FILE_FORMAT, IMPL_CLASS, PARTITION_BY_COL}
+import tech.ides.exception.IdesException
+import tech.sqlclub.common.utils.StringEscapeUtil
/**
* Hive 数据源
@@ -15,6 +21,7 @@ import tech.ides.constants.ScriptConstants.{IMPL_CLASS, PARTITION_BY_COL}
sinceVersion = "1.0.0"
)
class HiveDataSource extends DataReader with DataWriter{
+
override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
val format = config.config.getOrElse(IMPL_CLASS, fullFormat)
reader.options(config.config).format(format).table(config.path)
@@ -22,15 +29,50 @@ class HiveDataSource extends DataReader with DataWriter{
override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {
- val fileFormat = config.config.getOrElse("fileFormat", "parquet")
+ val fileFormat = config.config.getOrElse(FILE_FORMAT, "parquet")
- var options = config.config - "fileFormat" - IMPL_CLASS
+ var options = config.config - FILE_FORMAT - IMPL_CLASS
// 只支持'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' 'avro' 6种类型
// 参考:https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
- options += ("fileFormat" -> ( if ("csv" == fileFormat) "textfile" else fileFormat ))
+ options += (FILE_FORMAT -> ( if ("csv" == fileFormat) "textfile" else fileFormat )) // csv转成textfile格式
+
+ val containsDelimiters = HiveOptions.delimiterOptions.keys.exists(options.contains)
+ if (containsDelimiters && "textfile" != options(FILE_FORMAT) ) {
+ throw new IdesException(s"""These options:[${HiveOptions.delimiterOptions.keys.mkString(", ")}] can only be used with "textfile/csv" fileFormat. They define how to read delimited files into rows.""")
+ }
+
+ if (containsDelimiters) {
+// fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
+ HiveOptions.delimiterOptions.keys.foreach {
+ // 将delimiter options都进行反向转义
+ k =>
+ options.get(k).foreach(v => options = options.updated(k, StringEscapeUtil.unescapeJava(v)))
+ }
+ }
+
+ if ( "csv" == fileFormat && options.getOrElse("multiline", "false").toBoolean ) {
+ // 先判断所有字段是否是String类型
+ val table = config.df.get
+ val notStringCols = table.schema.filterNot(_.dataType == StringType)
+ if (notStringCols.nonEmpty) throw new IdesException("All fields must be String type when stored as csv type!")
+ // 解决csv中存在换行问题
+ options = options.updated("serde", classOf[OpenCSVSerde].getName) // 使用自定义serde
+ // 使用自定义FileInputFormat
+ options = options.updated("inputFormat", classOf[MultiLineCSVInputFormat].getName)
+ options = options.updated("outputFormat", classOf[HiveIgnoreKeyTextOutputFormat[_,_]].getName)
+
+ options = options - FILE_FORMAT // 不能同时指定 fileFormat 和 inputFormat/outputFormat
+ }
- //todo 这里有点问题 \t 变成了\\t
+ val containsSerdeOptions = HiveOptions.openCSVSerdeOptions.exists(options.contains)
+ if ( containsSerdeOptions ) {
+ // 将serde options 进行转义
+ HiveOptions.openCSVSerdeOptions.foreach {
+ k =>
+ options.get(k).foreach(v => options = options.updated(k, StringEscapeUtil.unescapeJava(v)))
+ }
+ }
// 如果需要分区 进行partitionBy
options.get(PARTITION_BY_COL).map(partitionColumn => partitionColumn.split(",").filterNot(_.isEmpty))
@@ -44,3 +86,21 @@ class HiveDataSource extends DataReader with DataWriter{
override def shortFormat: String = fullFormat
}
+
+object HiveOptions {
+
+ val delimiterOptions = Map(
+ "fieldDelim" -> "field.delim",
+ "escapeDelim" -> "escape.delim",
+ // The following typo is inherited from Hive...
+ "collectionDelim" -> "colelction.delim",
+ "mapkeyDelim" -> "mapkey.delim",
+ "lineDelim" -> "line.delim")
+
+ val openCSVSerdeOptions = Set(
+ OpenCSVSerde.SEPARATORCHAR, // separatorChar
+ OpenCSVSerde.QUOTECHAR, // quoteChar
+ OpenCSVSerde.ESCAPECHAR // escapeChar
+ )
+
+}
\ No newline at end of file
diff --git a/external/hive-exec/pom.xml b/external/hive-exec/pom.xml
new file mode 100644
index 0000000..0f56a3a
--- /dev/null
+++ b/external/hive-exec/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>DataLinked</artifactId>
+        <groupId>tech.ides</groupId>
+        <version>1.0.0</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hive-exec-${spark.big.version}_${scala.binary.version}</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/inputformat/MultiLineCSVInputFormat.java b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/inputformat/MultiLineCSVInputFormat.java
new file mode 100644
index 0000000..b2ca36b
--- /dev/null
+++ b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/inputformat/MultiLineCSVInputFormat.java
@@ -0,0 +1,62 @@
+package org.apache.hadoop.hive.custom.inputformat;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.custom.serde.OpenCSVSerde;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.*;
+import tech.sqlclub.common.utils.StringEscapeUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+
+/**
+ * 处理csv字段带有换行符的InputFormat
+ *
+ * An {@link InputFormat} for plain text csv files. Files are broken into lines.
+ * Either linefeed or carriage-return are used to signal end of line. Keys are
+ * the position in the file, and values are the line of text..
+ *
+ * @see org.apache.hadoop.mapred.TextInputFormat
+ * Created by songgr on 2020/12/01.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class MultiLineCSVInputFormat extends FileInputFormat<LongWritable, Text>
+ implements JobConfigurable {
+
+ private CompressionCodecFactory compressionCodecs = null;
+
+ @Override
+ public void configure(JobConf conf) {
+ compressionCodecs = new CompressionCodecFactory(conf);
+ }
+
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+ if (null == codec) {
+ return true;
+ }
+ return codec instanceof SplittableCompressionCodec;
+ }
+
+ @Override
+ public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ reporter.setStatus(split.toString());
+
+ String separatorChar = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.SEPARATORCHAR.toLowerCase(Locale.ROOT)));
+ String quoteChar = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.QUOTECHAR.toLowerCase(Locale.ROOT)));
+ String escapeChar = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.ESCAPECHAR.toLowerCase(Locale.ROOT)));
+
+ // 核心是RecordReader 切分每一行
+ return new MultiLineCSVRecordReader(job, (FileSplit) split, separatorChar, quoteChar, escapeChar);
+ }
+}
diff --git a/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/serde/OpenCSVSerde.java b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/serde/OpenCSVSerde.java
new file mode 100644
index 0000000..4a320e4
--- /dev/null
+++ b/external/hive-exec/src/main/java/org/apache/hadoop/hive/custom/serde/OpenCSVSerde.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.custom.serde;
+
+import au.com.bytecode.opencsv.CSVReader;
+import au.com.bytecode.opencsv.CSVWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * OpenCSVSerde use opencsv to deserialize CSV format.
+ * Users can specify custom separator, quote or escape characters. And the default separator(,),
+ * quote("), and escape characters(\) are the same as the opencsv library.
+ *
+ * @see org.apache.hadoop.hive.serde2.OpenCSVSerde
+ */
+@SerDeSpec(schemaProps = {
+ serdeConstants.LIST_COLUMNS,
+ OpenCSVSerde.SEPARATORCHAR, OpenCSVSerde.QUOTECHAR, OpenCSVSerde.ESCAPECHAR})
+public final class OpenCSVSerde extends AbstractSerDe {
+
+ public static final Logger LOG = LoggerFactory.getLogger(OpenCSVSerde.class.getName());
+ private ObjectInspector inspector;
+ private String[] outputFields;
+ private int numCols;
+ private List<String> row;
+
+ private char separatorChar;
+ private char quoteChar;
+ private char escapeChar;
+ /**
+ * 字段分割符
+ */
+ public static final String SEPARATORCHAR = "separatorChar";
+ /**
+ * 字符引用符,字段值使用该字符包裹
+ */
+ public static final String QUOTECHAR = "quoteChar";
+ /**
+ * 字段逃逸字符,使用该字符对字段值进行转义
+ */
+ public static final String ESCAPECHAR = "escapeChar";
+
+ public static final char DEFAULT_SEPARATOR = ',';
+ public static final char DEFAULT_QUOTE_CHARACTER = '"';
+ public static final char DEFAULT_ESCAPE_CHARACTER = '\\';
+
+ @Override
+ public void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
+
+ final List<String> columnNames = Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS)
+ .split(","));
+
+ numCols = columnNames.size();
+
+ final List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(numCols);
+
+ for (int i = 0; i < numCols; i++) {
+ columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ }
+
+ inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+ outputFields = new String[numCols];
+ row = new ArrayList<String>(numCols);
+
+ for (int i = 0; i < numCols; i++) {
+ row.add(null);
+ }
+
+ // tbl都转化成了小写
+ separatorChar = getProperty(tbl, SEPARATORCHAR.toLowerCase(Locale.ROOT), DEFAULT_SEPARATOR);
+ quoteChar = getProperty(tbl, QUOTECHAR.toLowerCase(Locale.ROOT), DEFAULT_QUOTE_CHARACTER);
+ escapeChar = getProperty(tbl, ESCAPECHAR.toLowerCase(Locale.ROOT), DEFAULT_ESCAPE_CHARACTER);
+ }
+
+ private char getProperty(final Properties tbl, final String property, final char def) {
+ final String val = tbl.getProperty(property);
+
+ if (val != null) {
+ return val.charAt(0);
+ }
+
+ return def;
+ }
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector;
+ final List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs();
+
+ if (outputFieldRefs.size() != numCols) {
+ throw new SerDeException("Cannot serialize the object because there are "
+ + outputFieldRefs.size() + " fields but the table has " + numCols + " columns.");
+ }
+
+ // Get all data out.
+ for (int c = 0; c < numCols; c++) {
+ final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));
+ final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector();
+
+ // The data must be of type String
+ final StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI;
+
+ // Convert the field to Java class String, because objects of String type
+ // can be stored in String, Text, or some other classes.
+ outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field);
+ }
+
+ final StringWriter writer = new StringWriter();
+ final CSVWriter csv = newWriter(writer, separatorChar, quoteChar, escapeChar);
+
+ try {
+ csv.writeNext(outputFields);
+ csv.close();
+
+ return new Text(writer.toString());
+ } catch (final IOException ioe) {
+ throw new SerDeException(ioe);
+ }
+ }
+
+ @Override
+ public Object deserialize(final Writable blob) throws SerDeException {
+ Text rowText = (Text) blob;
+
+ CSVReader csv = null;
+ try {
+ csv = newReader(new CharArrayReader(rowText.toString().toCharArray()), separatorChar,
+ quoteChar, escapeChar);
+ final String[] read = csv.readNext();
+
+ for (int i = 0; i < numCols; i++) {
+ if (read != null && i < read.length) {
+ row.set(i, read[i]);
+ } else {
+ row.set(i, null);
+ }
+ }
+
+ return row;
+ } catch (final Exception e) {
+ throw new SerDeException(e);
+ } finally {
+ if (csv != null) {
+ try {
+ csv.close();
+ } catch (final Exception e) {
+ LOG.error("fail to close csv writer ", e);
+ }
+ }
+ }
+ }
+
+ private CSVReader newReader(final Reader reader, char separator, char quote, char escape) {
+ // CSVReader will throw an exception if any of separator, quote, or escape is the same, but
+ // the CSV format specifies that the escape character and quote char are the same... very weird
+ if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) {
+ return new CSVReader(reader, separator, quote);
+ } else {
+ return new CSVReader(reader, separator, quote, escape);
+ }
+ }
+
+ private CSVWriter newWriter(final Writer writer, char separator, char quote, char escape) {
+ if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) {
+ return new CSVWriter(writer, separator, quote, "");
+ } else {
+ return new CSVWriter(writer, separator, quote, escape, "");
+ }
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return inspector;
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return Text.class;
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return null;
+ }
+}
diff --git a/external/hive-exec/src/main/java/org/apache/hadoop/mapred/MultiLineCSVRecordReader.java b/external/hive-exec/src/main/java/org/apache/hadoop/mapred/MultiLineCSVRecordReader.java
new file mode 100644
index 0000000..26114d2
--- /dev/null
+++ b/external/hive-exec/src/main/java/org/apache/hadoop/mapred/MultiLineCSVRecordReader.java
@@ -0,0 +1,321 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.custom.serde.OpenCSVSerde;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * 处理csv字段带有换行符的RecordReader
+ *
+ * Treats keys as offset in file and value as line.
+ * Can correctly handle the value which contains one of CR, LF, or CRLF.
+ *
+ * @see org.apache.hadoop.mapred.LineRecordReader
+ * Created by songgr on 2020/12/10.
+ */
+public class MultiLineCSVRecordReader implements RecordReader<LongWritable, Text> {
+ private static final Log LOG = LogFactory.getLog(MultiLineCSVRecordReader.class.getName());
+
+ private CompressionCodecFactory compressionCodecs;
+ private CompressionCodec codec;
+ private long start;
+ private long pos;
+ private long end;
+ private NewLineReader in;
+ private FSDataInputStream fileIn;
+ /**
+ * 单行最大字节数
+ * 默认为 Integer.MAX_VALUE
+ */
+ private int maxLineLength;
+
+ private char separatorChar;
+ private char quoteChar;
+ private char escapeChar;
+ /**
+ * 默认行分割符为'\n'
+ */
+ private final char lineSep = '\n';
+ private final int lineSepLen = 1;
+
+ public MultiLineCSVRecordReader(Configuration job, FileSplit split,
+ String separatorChar,
+ String quoteChar,
+ String escapeChar) throws IOException {
+
+ this.separatorChar = separatorChar != null && separatorChar.length() >0 ?
+ separatorChar.charAt(0) : OpenCSVSerde.DEFAULT_SEPARATOR;
+ this.quoteChar = quoteChar != null && quoteChar.length() >0 ?
+ quoteChar.charAt(0) : OpenCSVSerde.DEFAULT_QUOTE_CHARACTER;
+ this.escapeChar = escapeChar != null && escapeChar.length() >0 ?
+ escapeChar.charAt(0) : OpenCSVSerde.DEFAULT_ESCAPE_CHARACTER;
+
+ this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength",
+ Integer.MAX_VALUE);
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+ compressionCodecs = new CompressionCodecFactory(job);
+ codec = compressionCodecs.getCodec(file);
+
+ // open the file and seek to the start of the split
+ final FileSystem fs = file.getFileSystem(job);
+ fileIn = fs.open(file);
+
+ boolean skipFirstLine = false;
+ if ( isCompressedInput() ) {
+ in = new NewLineReader(codec.createInputStream(fileIn), job);
+ end = Long.MAX_VALUE;
+ } else {
+ if (start != 0) {
+ skipFirstLine = true;
+ // - lineSep
+ this.start -= lineSepLen;
+ fileIn.seek(start);
+ }
+ in = new NewLineReader(fileIn, job);
+ }
+ if (skipFirstLine) {
+ // skip first line and re-establish "start".
+ start += in.readLine(new Text(), 0,
+ (int) Math.min((long) Integer.MAX_VALUE, end - start));
+ }
+ this.pos = start;
+ }
+
+ @Override
+ public boolean next(LongWritable key, Text value) throws IOException {
+ key.set(pos);
+ int newSize = 0;
+ while (pos < end) {
+ newSize = in.readLine(value, maxLineLength,
+ Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
+ maxLineLength));
+ if (newSize == 0) {
+ break;
+ }
+ pos += newSize;
+ if (newSize < maxLineLength) {
+ break;
+ }
+ LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
+ }
+ if (newSize == 0) {
+ key = null;
+ value = null;
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ @Override
+ public Text createValue() {
+ return new Text();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ if (fileIn != null) {
+ fileIn.close();
+ fileIn = null;
+ }
+ compressionCodecs = null;
+ codec = null;
+ LOG.info("MultiLineCSVRecordReader close all resources successfully.");
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (pos - start) / (float) (end - start));
+ }
+ }
+
+ private boolean isCompressedInput() {
+ return (codec != null);
+ }
+
+ public class NewLineReader {
+ // 默认buffer大小 64Kb
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private int bufferSize;
+ private InputStream in;
+ private byte[] buffer;
+ // 当前buffer长度
+ private int bufferLength = 0;
+ // buffer 位置
+ private int bufferPosn = 0;
+
+ public NewLineReader(InputStream in) {
+ this(in, DEFAULT_BUFFER_SIZE);
+ }
+
+ public NewLineReader(InputStream in, int bufferSize) {
+ this.in = in;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[this.bufferSize];
+ }
+
+ public NewLineReader(InputStream in, Configuration conf)
+ throws IOException {
+ this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+ }
+
+ public void close() throws IOException {
+ in.close();
+ }
+
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ str.clear();
+ Text record = new Text();
+ int txtLength = 0;
+ // 消耗的字节数
+ long bytesConsumed = 0L;
+ // 是否是新的一行
+ boolean newline = false;
+ // 行分割符 位置
+ int sepPosn = 0;
+ boolean wholeQuote = true; // 是否引用完整
+ do {
+ // 已经读到buffer的末尾了,读下一个buffer
+ if (this.bufferPosn >= this.bufferLength) {
+ bufferPosn = 0;
+ bufferLength = in.read(buffer);
+ // 读到文件末尾了,则跳出,进行下一个文件的读取
+ if (bufferLength <= 0) {
+ break;
+ }
+ }
+ int startPosn = this.bufferPosn;
+ boolean forceSplit = false;
+ for (; bufferPosn < bufferLength && !forceSplit ; bufferPosn++) {
+ // 如果是字段分割符直接跳过
+ if (buffer[bufferPosn] == separatorChar) {
+ continue;
+ }
+ // 遇到引用符的第一个字符 或者 上一次引号由于buffer满了被切分了
+ if (buffer[bufferPosn] == quoteChar || !wholeQuote) {
+ wholeQuote = false;
+ // 判断接下来的字符是否也是引号中的字符
+ for (int i = 1; !wholeQuote; i++) {
+ // buffer满了,且引号符被不幸地切成了两半
+ if (bufferPosn + i >= bufferLength) {
+ bufferPosn += i - 1; // 回到上一步
+ break;
+ }
+ // 遇到转义字符 注意⚠️:需要先判断转义,不然如果转义和引号符相同的时候就会有问题!
+ if ( buffer[bufferPosn+i] == escapeChar) {
+ // 判断buffer是否满 注意这边直接放了两个字符
+ if ( bufferPosn + i + 1 >= bufferLength) {
+ forceSplit = true;
+ bufferPosn += i-1; // 回到上一步
+ }
+ i ++ ; // 指针需要往后移一位
+ continue; // 注意⚠️:️ 只要往后一步 continue 直接到了转义字符的下一位
+ }
+
+ // 遇上引号符 && 下一个字符是字段分割符 => 引号已经完整
+ if (buffer[bufferPosn+i] == quoteChar && buffer[bufferPosn+i+1] == separatorChar) {
+ wholeQuote = true;
+ // sepPosn重置 避免由于上次引号被切分导致sepPosn不一致
+ sepPosn = 0;
+ bufferPosn = bufferPosn + i;
+ break;
+ }
+
+ // 遇上引号符 && 下一个字符是换行符 => 引号完整/直接是下一行
+ // 注意⚠️: 这里要多判断一位是不是引号符,不然字段里出现引号符加换行符就会有问题!
+ if (buffer[bufferPosn+i] == quoteChar && buffer[bufferPosn+i+1] == lineSep && buffer[bufferPosn+i+2] == quoteChar) {
+ wholeQuote = true;
+ newline = true;
+ // sepPosn重置 避免由于上次引号被切分导致sepPosn不一致
+ sepPosn = 0;
+ bufferPosn = bufferPosn + i + 1; // 移到换行符上
+ break;
+ }
+
+ // 如果遇到最后一个换行符 直接结束,并且设置wholeQuote=true
+ if (buffer[bufferPosn+i] == lineSep && (bufferPosn+i == bufferLength-1)) {
+ wholeQuote = true;
+ sepPosn = 0;
+ bufferPosn = bufferPosn + i;
+ break;
+ }
+ }
+ }
+
+ // 遇到行分隔符的第一个字符
+ if (buffer[bufferPosn] == lineSep || newline ) {
+ bufferPosn++;
+ sepPosn++;
+ // 的确遇到了行分隔符 ??
+ if (sepPosn == lineSepLen && wholeQuote) {
+ newline = true;
+ sepPosn = 0;
+ break;
+ }
+ }
+ }
+ int readLength = this.bufferPosn - startPosn; // 总共读到的长度
+ bytesConsumed += readLength; // 加上之前消耗的字节数
+ // 控制 maxLineLength 范围
+ if (readLength > maxLineLength - txtLength) {
+ readLength = maxLineLength - txtLength;
+ }
+ if (readLength > 0) {
+ // 从buffer的startPosn位置开始取出真实需要读取的长度 加入到record
+ record.append(this.buffer, startPosn, readLength);
+ txtLength += readLength;
+ // 去掉记录的分隔符
+ if (newline) {
+ // - lineSep
+ str.set(record.getBytes(), 0, record.getLength()
+ - lineSepLen);
+ }
+ }
+ } while (!newline && (bytesConsumed < maxBytesToConsume));
+ if (bytesConsumed > (long) Integer.MAX_VALUE) {
+ throw new IOException("Too many bytes before newline: "
+ + bytesConsumed);
+ }
+
+ return (int) bytesConsumed;
+ }
+
+ public int readLine(Text str, int maxLineLength) throws IOException {
+ return readLine(str, maxLineLength, Integer.MAX_VALUE);
+ }
+
+ public int readLine(Text str) throws IOException {
+ return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index e6804f7..eaa1c3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
engine
repl
external/ds-spark-excel
+ external/hive-exec