Does Spark Maintain Partition From Hive When Reading
Incoming data is usually in a format different from what we would like for long-term storage. The first step that we usually do is transform the data into a format such as Parquet that can easily be queried by Hive/Impala.
The use case we imagined is when we are ingesting data in Avro format. The users want easy access to the data with Hive or Spark. To have performant queries we need the historical data to be in Parquet format. We don't want to have two different tables: one for the historical data in Parquet format and one for the incoming data in Avro format. Our preference goes out to having one table which can handle all data, no matter the format.
This way we can run our conversion process (from Avro to Parquet), let's say every night, but the users would still get access to all data all the time.
In Hive you can achieve this with a partitioned table, where you can set the format of each partition. Spark unfortunately doesn't implement this. Since our users also use Spark, this was something we had to fix. This was also a nice challenge for a couple of GoDataDriven Fridays where we could learn more about the internals of Apache Spark.
Setting up a test environment
First we had to identify what we need to be able to reproduce the problem. We needed the following components:
- Hive with persistent Hive metastore
- Hadoop to be able to store and access the files
- Spark
We're using MacBook Pros, so we had to do the following steps:
Install Hadoop, Hive, Spark and create a local HDFS directory
$ brew install hadoop
$ brew install hive
$ brew install apache-spark
$ mkdir ${HOME}/localhdfs
Run the Hive Metastore in Docker
We want the Hive Metastore to use PostgreSQL to be able to access it from Hive and Spark simultaneously. We found a Docker image, but this wasn't the latest version, so we forked it and upgraded it to the latest version. You can find this Docker image on GitHub (source code is at link).
To run this image, use (note that we exposed port 5432 so we can use this for Hive):
$ docker pull krisgeus/docker-hive-metastore-postgresql:upgrade-2.3.0
$ docker run -p 5432:5432 krisgeus/docker-hive-metastore-postgresql:upgrade-2.3.0
Configuring Hive to use the Hive Metastore
- Download postgresql-42.2.4.jar from this link
- Add this jar to the Hive lib directory (in our case the Hive version was 2.3.1)
$ cp postgresql-42.2.4.jar /usr/local/Cellar/hive/2.3.1/libexec/lib/
- Create a working directory
$ mkdir ${HOME}/spark-hive-schema
$ cd ${HOME}/spark-hive-schema
- Create a configuration directory and copy the Hadoop and Hive base configurations
$ mkdir hadoop_conf
$ cp -R /usr/local/Cellar/hadoop/3.0.0/libexec/etc/hadoop/* ${HOME}/spark-hive-schema/hadoop_conf
$ cp -R /usr/local/Cellar/hive/2.3.1/libexec/conf/* ${HOME}/spark-hive-schema/hadoop_conf
$ cp conf/hive-default.xml.template ${HOME}/spark-hive-schema/hadoop_conf/hive-site.xml
- Change configurations in hive-site.xml so we actually use the Hive Metastore we just started
<property>
  <name>system:java.io.tmpdir</name>
  <value>/tmp/hive/java</value>
</property>
<property>
  <name>system:user.name</name>
  <value>${user.name}</value>
</property>
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>${user.home}/localhdfs/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>Username to use against metastore database</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:postgresql://localhost:5432/metastore</value>
  <description>JDBC connect string for a JDBC metastore. To use SSL to encrypt/authenticate the connection, provide a database-specific SSL flag in the connection URL. For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.</description>
</property>
<property>
  <name>datanucleus.connectionPoolingType</name>
  <value>NONE</value>
  <description>Expects one of [bonecp, dbcp, hikaricp, none]. Specify connection pool library for datanucleus</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>org.postgresql.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
- Make /tmp/hive writable
- In a terminal set the paths so we can start HiveServer2, where hadoop_version=3.0.0, hive_version=2.3.1
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/${hadoop_version}/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/${hive_version}/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ hiveserver2
- In another terminal set the same paths and start beeline, where hadoop_version=3.0.0, hive_version=2.3.1
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/${hadoop_version}/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/${hive_version}/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ beeline -u jdbc:hive2://localhost:10000/default -n hive -p hive
We're all set up... we can now create a table.
Creating a working example in Hive
- In beeline create a database and a table
CREATE DATABASE test;
USE test;
CREATE EXTERNAL TABLE IF NOT EXISTS events (eventtype STRING, city STRING)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;
- Add two Parquet partitions
ALTER TABLE events ADD PARTITION (dt = '2018-01-25') PARTITION (dt = '2018-01-26');
ALTER TABLE events PARTITION (dt = '2018-01-25') SET FILEFORMAT PARQUET;
ALTER TABLE events PARTITION (dt = '2018-01-26') SET FILEFORMAT PARQUET;
- Add a partition where we'll add Avro data
ALTER TABLE events ADD PARTITION (dt = '2018-01-27');
ALTER TABLE events PARTITION (dt = '2018-01-27') SET FILEFORMAT AVRO;
- Check the table
events.eventtype | events.city | events.dt |
---|---|---|
DESCRIBE FORMATTED events PARTITION (dt = "2018-01-26");
Column name | Data type |
---|---|
# Storage Information | |
SerDe Library: | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |
InputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat |
OutputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat |
DESCRIBE FORMATTED events PARTITION (dt = "2018-01-27");
Column name | Data type |
---|---|
# Storage Information | |
SerDe Library: | org.apache.hadoop.hive.serde2.avro.AvroSerDe |
InputFormat: | org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat |
OutputFormat: | org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat |
Create some test data
- Create test data directory
$ mkdir ${HOME}/spark-hive-schema/testdata
- Generate Avro data and add to table
$ cat ${HOME}/spark-hive-schema/testdata/data.json
{ "eventtype": "avro", "city": "Breukelen" }
{ "eventtype": "avro", "city": "Wilnis" }
{ "eventtype": "avro", "city": "Abcoude" }
{ "eventtype": "avro", "city": "Vinkeveen" }

$ cat ${HOME}/spark-hive-schema/testdata/data.avsc
{
  "type" : "record",
  "name" : "events",
  "namespace" : "com.godatadriven.events",
  "fields" : [
    { "name" : "eventtype", "type" : "string" },
    { "name" : "city", "type" : "string" }
  ]
}

$ brew install avro-tools
$ cd ${HOME}/spark-hive-schema/testdata
$ avro-tools fromjson --schema-file data.avsc data.json > \
    ${HOME}/localhdfs/user/hive/warehouse/test.db/events/dt\=2018-01-27/data.avro
- Generate Parquet data and add to table
$ cd ${HOME}/spark-hive-schema/testdata
$ spark-shell
> import org.apache.spark.sql.functions.{col, lit}
> spark.read.json("data.json").select(lit("parquet").alias("eventtype"), col("city")).write.parquet("data.pq")
> :quit
$ cp ./data.pq/part*.parquet ${HOME}/localhdfs/user/hive/warehouse/test.db/events/dt\=2018-01-26/
- Insert data into the last existing partition using beeline
INSERT INTO TABLE events PARTITION (dt = "2018-01-25") SELECT 'overwrite', 'Amsterdam';
- Check that we have data in beeline
events.eventtype | events.city | events.dt |
---|---|---|
overwrite | Amsterdam | 2018-01-25 |
parquet | Breukelen | 2018-01-26 |
parquet | Wilnis | 2018-01-26 |
parquet | Abcoude | 2018-01-26 |
parquet | Vinkeveen | 2018-01-26 |
avro | Breukelen | 2018-01-27 |
avro | Wilnis | 2018-01-27 |
avro | Abcoude | 2018-01-27 |
avro | Vinkeveen | 2018-01-27 |
Yuhee, we see all the data!!!
- Double check that the formats are correct
$ tree ${HOME}/localhdfs/user/hive/warehouse/test.db/events
.../localhdfs/user/hive/warehouse/test.db/events
├── dt=2018-01-25
│   └── 000000_0
├── dt=2018-01-26
│   └── part-00000-1846ef38-ec33-47ae-aa80-3f72ddb50c7d-c000.snappy.parquet
└── dt=2018-01-27
    └── data.avro
Creating a failing test in Spark
Connect to Spark and make sure we access the Hive Metastore we set up:
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/${hadoop_version}/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/${hive_version}/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ spark-shell --driver-class-path /usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
    --jars /usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
    --conf spark.executor.extraClassPath=/usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
    --conf spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:postgresql://localhost:5432/metastore \
    --conf spark.hadoop.javax.jdo.option.ConnectionUserName=hive \
    --conf spark.hadoop.javax.jdo.option.ConnectionPassword=hive \
    --conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver \
    --conf spark.hadoop.hive.metastore.schema.verification=true \
    --conf spark.hadoop.hive.metastore.schema.verification.record.version=true \
    --conf spark.sql.hive.metastore.version=2.1.0 \
    --conf spark.sql.hive.metastore.jars=maven

> spark.sql("select * from test.events").show()
...
java.lang.RuntimeException: file:...localhdfs/user/hive/warehouse/test.db/events/dt=2018-01-27/data.avro
is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [-126, 61, 76, 121]
...
So it doesn't work. Let's see if we can check out the Apache Spark code base and create a failing unit test.
First we forked the Apache Spark project, checked it out, and made sure we had sbt installed. We also figured out how to run a given unit test.
$ git clone https://github.com/krisgeus/spark.git
$ cd spark
$ ./build/sbt "hive/testOnly *HiveSQLViewSuite"
Writing a test
First we need to create a table and change the format of a given partition.
The final test can be found at: MultiFormatTableSuite.scala
We implemented the following steps (a rough sketch of this flow follows the list):
- create a table with partitions
- create a table based on Avro data which is actually located at a partition of the previously created table. Insert some data into this table.
- create a table based on Parquet data which is actually located at another partition of the previously created table. Insert some data into this table.
- try to read the data from the original table with partitions
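To give an idea of what the test exercises, here is a rough sketch of the flow in plain Spark SQL. This is only a sketch: the table name and location below are illustrative, the real test lives in MultiFormatTableSuite.scala.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("multi-format-partition-sketch")
  .enableHiveSupport()
  .getOrCreate()

// 1. A partitioned table that should end up containing both formats.
spark.sql(
  """CREATE EXTERNAL TABLE ext_multiformat_partition_table (key INT, value STRING)
    |PARTITIONED BY (dt STRING)
    |STORED AS PARQUET
    |LOCATION '/tmp/ext_multiformat_partition_table'""".stripMargin)
spark.sql(
  "ALTER TABLE ext_multiformat_partition_table " +
    "ADD PARTITION (dt='2018-01-26') PARTITION (dt='2018-01-27')")

// 2. This is the statement that fails on stock Spark with a ParseException.
spark.sql(
  "ALTER TABLE ext_multiformat_partition_table " +
    "PARTITION (dt='2018-01-27') SET FILEFORMAT AVRO")

// 3. After writing Avro and Parquet data into the partition directories
//    (e.g. through helper tables pointing at those locations), read everything back.
spark.sql("SELECT * FROM ext_multiformat_partition_table").show()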
Let's try to run the test:
$ ./build/sbt "hive/testOnly *MultiFormatTableSuite"
...
- create hive table with multi format partitions *** FAILED *** (4 seconds, 265 milliseconds)
[info]   org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: ALTER TABLE SET FILEFORMAT(line 2, pos 0)
[info]
[info] == SQL ==
[info]
[info] ALTER TABLE ext_multiformat_partition_table
[info] ^^^
[info] PARTITION (dt='2018-01-26') SET FILEFORMAT PARQUET
...
So Spark doesn't support changing the file format of a partition. Before we dive into fixing this problem, let's understand how execution plans work in Spark.
Understanding execution plans
The best explanation that we found was on the Databricks site, in the article Deep Dive into Spark SQL's Catalyst Optimizer.
Here is an extract in case you don't want to read the whole article:
At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala's pattern matching and quasiquotes) in a novel way to build an extensible query optimizer.
We use Catalyst's general tree transformation framework in four phases, as shown below: (1) analyzing a logical plan to resolve references, (2) logical plan optimization, (3) physical planning, and (4) code generation to compile parts of the query to Java bytecode. In the physical planning phase, Catalyst may generate multiple plans and compare them based on cost. All other phases are purely rule-based. Each phase uses different types of tree nodes; Catalyst includes libraries of nodes for expressions, data types, and logical and physical operators.
Spark SQL begins with a relation to be computed, either from an abstract syntax tree (AST) returned by a SQL parser, or from a DataFrame object constructed using the API.
Spark SQL uses Catalyst rules and a Catalog object that tracks the tables in all data sources to resolve these attributes. It starts by building an "unresolved logical plan" tree with unbound attributes and data types, then applies rules that do the following:
- Looking up relations by name from the catalog.
- Mapping named attributes, such as col, to the input provided given an operator's children.
- Determining which attributes refer to the same value to give them a unique ID (which later allows optimization of expressions such as col = col).
- Propagating and coercing types through expressions: for example, we cannot know the return type of 1 + col until we have resolved col and possibly cast its subexpressions to compatible types.
The logical optimization phase applies standard rule-based optimizations to the logical plan. These include constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, and other rules. In the physical planning phase, Spark SQL takes a logical plan and generates one or more physical plans, using physical operators that match the Spark execution engine. It then selects a plan using a cost model. At the moment, cost-based optimization is only used to select join algorithms: for relations that are known to be small, Spark SQL uses a broadcast join, using a peer-to-peer broadcast facility available in Spark. The framework supports broader use of cost-based optimization, however, as costs can be estimated recursively for a whole tree using a rule. We thus intend to implement richer cost-based optimization in the future.
The physical planner also performs rule-based physical optimizations, such as pipelining projections or filters into one Spark map operation. In addition, it can push operations from the logical plan into data sources that support predicate or projection pushdown. We will describe the API for these data sources in a later section. The final phase of query optimization involves generating Java bytecode to run on each machine.
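To see these phases for one of our own queries, we can inspect the QueryExecution of a DataFrame. A minimal sketch, assuming the test.events table from earlier is reachable through the metastore:

// Inspect the plans Catalyst produces for a query.
val df = spark.sql("SELECT * FROM test.events WHERE dt = '2018-01-26'")

println(df.queryExecution.logical)       // parsed logical plan
println(df.queryExecution.analyzed)      // analyzed logical plan (references resolved)
println(df.queryExecution.optimizedPlan) // after rule-based logical optimization
println(df.queryExecution.executedPlan)  // physical plan (FileSourceScanExec, HiveTableScanExec, ...)

// Or all of it at once:
df.explain(extended = true)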
Support setting the format for a partition in a Hive table with Spark
First we had to notice that Spark uses ANTLR to generate its SQL parser. ANTLR (ANother Tool for Language Recognition) can generate a grammar that can be built and walked.
The grammar for Spark is specified in SqlBase.g4.
Since we need to support FILEFORMAT in case a partition is specified, we had to add the following line to SqlBase.g4:
| ALTER TABLE tableIdentifier (partitionSpec)? SET FILEFORMAT fileFormat
This will not only add support for setting the file format of a partition but also on a table itself. We don't need this for our current case, but it might come in handy another time.
The AstBuilder in Spark SQL processes the ANTLR ParseTree to obtain a Logical Plan. Since we're working with Spark SQL, we had to alter SparkSqlParser, which creates a SparkSqlAstBuilder that extends AstBuilder.
In the SparkSqlAstBuilder we had to create a new function to be able to interpret the grammar and add the requested step to the logical plan.
/**
 * Create an [[AlterTableFormatCommand]] command.
 *
 * For example:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
 * }}}
 */
override def visitSetTableFormat(ctx: SetTableFormatContext): LogicalPlan = withOrigin(ctx) {
  val format = (ctx.fileFormat) match {
    // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
    case (c: TableFileFormatContext) => visitTableFileFormat(c)
    // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
    case (c: GenericFileFormatContext) => visitGenericFileFormat(c)
    case _ => throw new ParseException("Expected STORED AS ", ctx)
  }
  AlterTableFormatCommand(
    visitTableIdentifier(ctx.tableIdentifier),
    format,
    // TODO a partition spec is allowed to have optional values. This is currently violated.
    Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}
But we're still not done, because we also need a definition for the new command. These definitions are specified in ddl.scala, and they are based on the ones described in the Apache Hive Language Manual.
So what should this command do? Well, it should make sure that the serde properties are set properly on the partition level.
/**
 * A command that sets the format of a table/view/partition.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
 * }}}
 */
case class AlterTableFormatCommand(
    tableName: TableIdentifier,
    format: CatalogStorageFormat,
    partSpec: Option[TablePartitionSpec])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    // For datasource tables, disallow setting serde or specifying partition
    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
      throw new AnalysisException("Operation not allowed: ALTER TABLE SET FILEFORMAT " +
        "for a specific partition is not supported " +
        "for tables created with the datasource API")
    }
    if (partSpec.isEmpty) {
      val newTable = table.withNewStorage(
        serde = format.serde.orElse(table.storage.serde),
        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
        properties = table.storage.properties ++ format.properties)
      catalog.alterTable(newTable)
    } else {
      val spec = partSpec.get
      val part = catalog.getPartition(table.identifier, spec)
      val newPart = part.copy(storage = part.storage.copy(
        serde = format.serde.orElse(part.storage.serde),
        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
        properties = part.storage.properties ++ format.properties))
      catalog.alterPartitions(table.identifier, Seq(newPart))
    }
    Seq.empty[Row]
  }
}
Now we have a unit test which succeeds, in which we can set the file format for a partition.
Surprise: execution plan differences...
We were playing around and we accidentally changed the format of the partitioned table to Avro, so we had an Avro table with a Parquet partition in it... and IT WORKED!! We could read all the data... but wait, what?!!?
So an Avro table with a Parquet partition works, but a Parquet table with an Avro partition doesn't?
What's the difference? Let's look at the execution plans:
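One way to dump a physical plan in the JSON form shown below is via the toJSON/prettyJson methods that every Catalyst tree node provides. A minimal sketch, using the table names from our unit test:

// Dump the physical plans of the two tables as JSON for comparison.
val parquetTablePlan =
  spark.sql("SELECT * FROM ext_parquet_partition_table").queryExecution.executedPlan
val avroTablePlan =
  spark.sql("SELECT * FROM ext_avro_partition_table").queryExecution.executedPlan

println(parquetTablePlan.toJSON)   // compact JSON
println(avroTablePlan.prettyJson)  // pretty-printed JSON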
- execution plan for the Parquet table with Avro partitions
[ {
  "class" : "org.apache.spark.sql.execution.ProjectExec",
  "num-children" : 1,
  "projectList" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_parquet_partition_table"
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_parquet_partition_table"
  } ] ],
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.execution.FileSourceScanExec",
  "num-children" : 0,
  "relation" : null,
  "output" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ...
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ...
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ...
  } ] ],
  "requiredSchema" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "key",
      "type" : "integer",
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "value",
      "type" : "string",
      "nullable" : true,
      "metadata" : { }
    } ]
  },
  "partitionFilters" : [ ],
  "dataFilters" : [ ],
  "tableIdentifier" : {
    "product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
    "table" : "ext_parquet_partition_table",
    "database" : "default"
  }
} ]
- execution plan for the Avro table with Parquet partitions
[ {
  "class" : "org.apache.spark.sql.hive.execution.HiveTableScanExec",
  "num-children" : 0,
  "requestedAttributes" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_avro_partition_table"
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_avro_partition_table"
  } ] ],
  "relation" : [ {
    "class" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
    ...
    "tableMeta" : {
      "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogTable",
      "identifier" : {
        "product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
        "table" : "ext_avro_partition_table",
        "database" : "default"
      },
      "tableType" : {
        "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogTableType",
        "name" : "EXTERNAL"
      },
      "storage" : {
        "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat",
        "locationUri" : null,
        "inputFormat" : "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
        "outputFormat" : "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
        "serde" : "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        "compressed" : false,
        "properties" : null
      },
      "schema" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "key",
          "type" : "integer",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "value",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "dt",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "provider" : "hive",
      "partitionColumnNames" : "[dt]",
      "owner" : "...",
      "createTime" : 1532699365000,
      "lastAccessTime" : 0,
      "createVersion" : "2.4.0-SNAPSHOT",
      "properties" : null,
      "stats" : null,
      "unsupportedFeatures" : [ ],
      "tracksPartitionsInCatalog" : true,
      "schemaPreservesCase" : true,
      "ignoredProperties" : null,
      "hasMultiFormatPartitions" : false
    },
    "dataCols" : [ [ {
      "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
      "num-children" : 0,
      "name" : "key",
      "dataType" : "integer",
      "nullable" : true,
      "metadata" : { },
      "exprId" : {
        "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
        "id" : 25,
        "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464"
      }
    } ], [ {
      "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
      "num-children" : 0,
      "name" : "value",
      "dataType" : "string",
      "nullable" : true,
      "metadata" : { },
      "exprId" : {
        "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
        "id" : 26,
        "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464"
      }
    } ] ],
    "partitionCols" : [ [ {
      "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
      "num-children" : 0,
      "name" : "dt",
      "dataType" : "string",
      "nullable" : true,
      "metadata" : { },
      "exprId" : {
        "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
        "id" : 27,
        "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464"
      }
    } ] ]
  } ],
  "partitionPruningPred" : [ ],
  "sparkSession" : null
} ]
So how could we make the Parquet table not take the FileSourceScanExec route, but the HiveTableScanExec route? And thus make the Parquet execution plan similar to the Avro execution plan?
Finding the magic setting
We went digging in the code again and we discovered the following method in HiveStrategies.scala:
/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }
  ...
Looking at this code we decided to set HiveUtils.CONVERT_METASTORE_PARQUET.key to false, meaning that we won't optimize to data source relations in case we altered the partition file format.
We simulated this by adding the following line to our unit test:
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false")
With this setting, the test passed. We decided to implement an extra check to avoid optimising the execution when a partition has a different file format than the main table.
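As a side note, HiveUtils.CONVERT_METASTORE_PARQUET corresponds to the spark.sql.hive.convertMetastoreParquet setting, so the same workaround can be applied in a regular Spark session without touching the code. A sketch (note that this disables the optimization for all Parquet tables in the session):

// Make the Parquet table take the HiveTableScanExec route as well.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM test.events").show()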
Implement multi format partitions support without disabling optimizations manually
We decided to add a property, hasMultiFormatPartitions, to the CatalogTable which reflects whether we have a table with multiple different formats in its partitions. This had to be done in HiveClientImpl.scala.
The following line did the trick:
hasMultiFormatPartitions = shim.getAllPartitions(client, h).map(_.getInputFormatClass).distinct.size > 1
Of course we also had to add this to the catalog's interface.scala, and then we could use this in HiveStrategies.scala to change the previously mentioned method:
/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    val hasMultiFormatPartitions = relation.tableMeta.hasMultiFormatPartitions
    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) &&
      (!hasMultiFormatPartitions) ||
      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }
  ...
With these changes our tests also succeeded.
All this work has been provided back to the community in this Apache Spark pull request.
Based on the final comments on our pull request it doesn't look very promising that this will be merged. Nevertheless, we learned a lot about Apache Spark and its internals.
Further enhance your Apache Spark knowledge!
We offer an in-depth Data Science with Spark course that will make data science at scale a piece of cake for any data scientist, engineer, or analyst!
Source: https://godatadriven.com/blog/working-with-multiple-partition-formats-within-a-hive-table-with-spark/