Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ object ScriptConstants {

val IMPL_CLASS = "implClass"

val FILE_FORMAT = "fileFormat"

val PARTITION_BY_COL = "partitionByCol"

def PATH_SEPARATOR = File.pathSeparator
Expand Down
5 changes: 5 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@
<artifactId>ds-spark-excel-${spark.big.version}_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.ides</groupId>
<artifactId>hive-exec-${spark.big.version}_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package tech.ides.datasource.impl

import org.apache.hadoop.hive.custom.inputformat.MultiLineCSVInputFormat
import org.apache.hadoop.hive.custom.serde.OpenCSVSerde
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}
import tech.ides.datasource._
import tech.ides.datasource.DataSource.Method.{SINK, SOURCE}
import tech.ides.constants.ScriptConstants.{IMPL_CLASS, PARTITION_BY_COL}
import tech.ides.constants.ScriptConstants.{FILE_FORMAT, IMPL_CLASS, PARTITION_BY_COL}
import tech.ides.exception.IdesException
import tech.sqlclub.common.utils.StringEscapeUtil

/**
* Hive 数据源
Expand All @@ -15,22 +21,58 @@ import tech.ides.constants.ScriptConstants.{IMPL_CLASS, PARTITION_BY_COL}
sinceVersion = "1.0.0"
)
class HiveDataSource extends DataReader with DataWriter{

override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
  // An explicit "implClass" option overrides the default format for this source.
  val effectiveFormat = config.config.getOrElse(IMPL_CLASS, fullFormat)
  reader
    .options(config.config)
    .format(effectiveFormat)
    .table(config.path)
}

override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Unit = {

val fileFormat = config.config.getOrElse("fileFormat", "parquet")
val fileFormat = config.config.getOrElse(FILE_FORMAT, "parquet")

var options = config.config - "fileFormat" - IMPL_CLASS
var options = config.config - FILE_FORMAT - IMPL_CLASS

// 只支持'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' 'avro' 6种类型
// 参考:https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
options += ("fileFormat" -> ( if ("csv" == fileFormat) "textfile" else fileFormat ))
options += (FILE_FORMAT -> ( if ("csv" == fileFormat) "textfile" else fileFormat )) // csv转成textfile格式

val containsDelimiters = HiveOptions.delimiterOptions.keys.exists(options.contains)
if (containsDelimiters && "textfile" != options(FILE_FORMAT) ) {
throw new IdesException(s"""These options:[${HiveOptions.delimiterOptions.keys.mkString(", ")}] can only be used with "textfile/csv" fileFormat. They define how to read delimited files into rows.""")
}

if (containsDelimiters) {
// fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
HiveOptions.delimiterOptions.keys.foreach {
// 将delimiter options都进行反向转义
k =>
options.get(k).foreach(v => options = options.updated(k, StringEscapeUtil.unescapeJava(v)))
}
}

if ( "csv" == fileFormat && options.getOrElse("multiline", "false").toBoolean ) {
// 先判断所有字段是否是String类型
val table = config.df.get
val notStringCols = table.schema.filterNot(_.dataType == StringType)
if (notStringCols.nonEmpty) throw new IdesException("All fields must be String type when stored as csv type!")
// 解决csv中存在换行问题
options = options.updated("serde", classOf[OpenCSVSerde].getName) // 使用自定义serde
// 使用自定义FileInputFormat
options = options.updated("inputFormat", classOf[MultiLineCSVInputFormat].getName)
options = options.updated("outputFormat", classOf[HiveIgnoreKeyTextOutputFormat[_,_]].getName)

options = options - FILE_FORMAT // 不能同时指定 fileFormat 和 inputFormat/outputFormat
}

//todo 这里有点问题 \t 变成了\\t
val containsSerdeOptions = HiveOptions.openCSVSerdeOptions.exists(options.contains)
if ( containsSerdeOptions ) {
// 将serde options 进行转义
HiveOptions.openCSVSerdeOptions.foreach {
k =>
options.get(k).foreach(v => options = options.updated(k, StringEscapeUtil.unescapeJava(v)))
}
}

// 如果需要分区 进行partitionBy
options.get(PARTITION_BY_COL).map(partitionColumn => partitionColumn.split(",").filterNot(_.isEmpty))
Expand All @@ -44,3 +86,21 @@ class HiveDataSource extends DataReader with DataWriter{

override def shortFormat: String = fullFormat
}

object HiveOptions {

  /**
   * Maps the user-facing delimiter option keys to the Hive table-property
   * names they translate to.
   */
  val delimiterOptions: Map[String, String] = Map(
    "fieldDelim"      -> "field.delim",
    "escapeDelim"     -> "escape.delim",
    // "colelction" is intentionally misspelled: the property name is
    // inherited, typo and all, from Hive itself.
    "collectionDelim" -> "colelction.delim",
    "mapkeyDelim"     -> "mapkey.delim",
    "lineDelim"       -> "line.delim"
  )

  /** Option keys recognized by [[OpenCSVSerde]] (separator/quote/escape chars). */
  val openCSVSerdeOptions: Set[String] = Set(
    OpenCSVSerde.SEPARATORCHAR,
    OpenCSVSerde.QUOTECHAR,
    OpenCSVSerde.ESCAPECHAR
  )

}
23 changes: 23 additions & 0 deletions external/hive-exec/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>DataLinked</artifactId>
<groupId>tech.ides</groupId>
<version>1.0.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hive-exec-${spark.big.version}_${scala.binary.version}</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.apache.hadoop.hive.custom.inputformat;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.custom.serde.OpenCSVSerde;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.*;
import tech.sqlclub.common.utils.StringEscapeUtil;

import java.io.IOException;
import java.util.Locale;

/**
 * An {@link InputFormat} for plain-text CSV files whose fields may contain
 * embedded line breaks. Keys are the byte position in the file and values are
 * the text of one CSV record; the record boundaries are determined by the
 * associated record reader rather than by raw linefeed/carriage-return
 * splitting.
 *
 * @see org.apache.hadoop.mapred.TextInputFormat
 * Created by songgr on 2020/12/01.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultiLineCSVInputFormat extends FileInputFormat<LongWritable, Text>
        implements JobConfigurable {

    private CompressionCodecFactory codecFactory = null;

    @Override
    public void configure(JobConf conf) {
        codecFactory = new CompressionCodecFactory(conf);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        // Uncompressed files are always splittable; compressed files only
        // when the codec itself supports splitting.
        final CompressionCodec codec = codecFactory.getCodec(file);
        return codec == null || codec instanceof SplittableCompressionCodec;
    }

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());

        // The serde char options are stored under lower-cased keys in the job
        // conf; unescape them so e.g. "\\t" becomes an actual tab character.
        // NOTE(review): job.get(...) may return null for an unset key — assumes
        // StringEscapeUtil.unescapeJava tolerates null; confirm.
        final String separator = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.SEPARATORCHAR.toLowerCase(Locale.ROOT)));
        final String quote = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.QUOTECHAR.toLowerCase(Locale.ROOT)));
        final String escape = StringEscapeUtil.unescapeJava(job.get(OpenCSVSerde.ESCAPECHAR.toLowerCase(Locale.ROOT)));

        // The record reader performs the actual multi-line-aware splitting.
        return new MultiLineCSVRecordReader(job, (FileSplit) split, separator, quote, escape);
    }
}
Loading