Working With Multiple Partition Formats Within a Hive Table With Spark

Incoming data is usually in a format different from what we would like for long-term storage. The first step we usually take is to transform the data into a format such as Parquet that can easily be queried by Hive/Impala.

The use case we imagined is ingesting data in Avro format. The users want easy access to the data with Hive or Spark. For performant queries we need the historical data to be in Parquet format. We don't want to have two different tables: one for the historical data in Parquet format and one for the incoming data in Avro format. Our preference goes out to having one table which can handle all data, no matter the format.
This way we can run our conversion process (from Avro to Parquet), say, every night, but the users would still have access to all data all the time.

In Hive you can achieve this with a partitioned table, where you can set the format of each partition. Spark unfortunately doesn't implement this. Since our users also use Spark, this was something we had to fix. It was also a nice challenge for a couple of GoDataDriven Fridays, where we could learn more about the internals of Apache Spark.


Setting up a test environment

First we had to identify what we needed to be able to reproduce the problem. We needed the following components:

  • Hive with persistent Hive metastore
  • Hadoop to be able to store and access the files
  • Spark

We're using MacBook Pros and we had to do the following steps:

Install Hadoop, Hive, Spark and create a local HDFS directory

$ brew install hadoop
$ brew install hive
$ brew install apache-spark
$ mkdir ${HOME}/localhdfs

Run the Hive Metastore in Docker

We want the Hive Metastore to use PostgreSQL so we can access it from Hive and Spark simultaneously. We found a Docker image, but it wasn't the latest version, so we forked it and upgraded it to the latest version. You can find this Docker image on GitHub (source code is at link).
To run this image, use the following (note that we exposed port 5432 so we can use it for Hive):

$ docker pull krisgeus/docker-hive-metastore-postgresql:upgrade-2.3.0
$ docker run -p 5432:5432 krisgeus/docker-hive-metastore-postgresql:upgrade-2.3.0

Configuring Hive to use the Hive Metastore

  • Download postgresql-42.2.4.jar from this link
  • Add this jar to the Hive lib directory (in our case the Hive version was 2.3.1)
$ cp postgresql-42.2.4.jar /usr/local/Cellar/hive/2.3.1/libexec/lib/
  • Create a working directory
$ mkdir ${HOME}/spark-hive-schema
$ cd ${HOME}/spark-hive-schema
  • Create a configuration directory and copy the Hadoop and Hive base configurations
$ mkdir hadoop_conf
$ cp -R /usr/local/Cellar/hadoop/3.0.0/libexec/etc/hadoop/* ${HOME}/spark-hive-schema/hadoop_conf
$ cp -R /usr/local/Cellar/hive/2.3.1/libexec/conf/* ${HOME}/spark-hive-schema/hadoop_conf
$ cp conf/hive-default.xml.template ${HOME}/spark-hive-schema/hadoop_conf/hive-site.xml
  • Change the configuration in hive-site.xml so we actually use the Hive Metastore we just started
<property>
  <name>system:java.io.tmpdir</name>
  <value>/tmp/hive/java</value>
</property>
<property>
  <name>system:user.name</name>
  <value>${user.name}</value>
</property>
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>${user.home}/localhdfs/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>Username to use against metastore database</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:postgresql://localhost:5432/metastore</value>
  <description>
    JDBC connect string for a JDBC metastore.
    To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.
    For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.
  </description>
</property>
<property>
  <name>datanucleus.connectionPoolingType</name>
  <value>NONE</value>
  <description>
    Expects one of [bonecp, dbcp, hikaricp, none].
    Specify connection pool library for datanucleus
  </description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>org.postgresql.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
  • Make /tmp/hive writable, for example with the commands below
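One way to do this on a local development setup (loosen the permissions only as far as you are comfortable with):

$ mkdir -p /tmp/hive
$ chmod -R 777 /tmp/hive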
  • In a terminal, set the paths so we can start HiveServer2, where hadoop_version=3.0.0 and hive_version=2.3.1
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/${hadoop_version}/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/${hive_version}/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ hiveserver2
  • In another terminal, set the same paths and start Beeline, where hadoop_version=3.0.0 and hive_version=2.3.1
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/${hadoop_version}/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/${hive_version}/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ beeline -u jdbc:hive2://localhost:10000/default -n hive -p hive

We're all set up... we can now create a table.

Creating a working example in Hive

  • In Beeline, create a database and a table
CREATE DATABASE test;
USE test;
CREATE EXTERNAL TABLE IF NOT EXISTS events (
  eventType STRING,
  city STRING
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;
  • Add two parquet partitions
ALTER TABLE events ADD PARTITION (dt='2018-01-25') PARTITION (dt='2018-01-26');
ALTER TABLE events PARTITION (dt='2018-01-25') SET FILEFORMAT PARQUET;
ALTER TABLE events PARTITION (dt='2018-01-26') SET FILEFORMAT PARQUET;
  • Add a partition where we'll add Avro data
ALTER TABLE events ADD PARTITION (dt='2018-01-27');
ALTER TABLE events PARTITION (dt='2018-01-27') SET FILEFORMAT AVRO;
  • Check the table
events.eventtype  events.city  events.dt

DESCRIBE FORMATTED events PARTITION (dt="2018-01-26");

Column name     Data type
...
# Storage Information
SerDe Library:  org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:    org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:   org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat

DESCRIBE FORMATTED events PARTITION (dt="2018-01-27");

Column name     Data type
...
# Storage Information
SerDe Library:  org.apache.hadoop.hive.serde2.avro.AvroSerDe
InputFormat:    org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat
OutputFormat:   org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat

Create some test data

  • Create test data directory
$ mkdir ${HOME}/spark-hive-schema/testdata
  • Generate Avro data and add it to the table
$ cat ${HOME}/spark-hive-schema/testdata/data.json
{"eventtype": "avro", "city": "Breukelen"}
{"eventtype": "avro", "city": "Wilnis"}
{"eventtype": "avro", "city": "Abcoude"}
{"eventtype": "avro", "city": "Vinkeveen"}

$ cat ${HOME}/spark-hive-schema/testdata/data.avsc
{
  "type" : "record",
  "name" : "events",
  "namespace" : "com.godatadriven.events",
  "fields" : [
    {"name": "eventtype", "type": "string"},
    {"name": "city", "type": "string"}
  ]
}

$ brew install avro-tools
$ cd ${HOME}/spark-hive-schema/testdata
$ avro-tools fromjson --schema-file data.avsc data.json > \
    ${HOME}/localhdfs/user/hive/warehouse/test.db/events/dt\=2018-01-27/data.avro
  • Generate Parquet data and add it to the table
$ cd ${HOME}/spark-hive-schema/testdata
$ spark-shell
> import org.apache.spark.sql.functions.{col, lit}
> spark.read.json("data.json").select(lit("parquet").alias("eventtype"), col("city")).write.parquet("data.pq")
> :quit
$ cp ./data.pq/part*.parquet ${HOME}/localhdfs/user/hive/warehouse/test.db/events/dt\=2018-01-26/
  • Insert data into the last existing partition using Beeline
INSERT INTO TABLE events PARTITION (dt="2018-01-25") SELECT 'overwrite', 'Amsterdam';
  • Check that we have data in Beeline
events.eventtype events.city events.dt
overwrite Amsterdam 2018-01-25
parquet Breukelen 2018-01-26
parquet Wilnis 2018-01-26
parquet Abcoude 2018-01-26
parquet Vinkeveen 2018-01-26
avro Breukelen 2018-01-27
avro Wilnis 2018-01-27
avro Abcoude 2018-01-27
avro Vinkeveen 2018-01-27

Yuhee, we see all the data!!!

  • Double check that the formats are correct
$ tree ${HOME}/localhdfs/user/hive/warehouse/test.db/events
.../localhdfs/user/hive/warehouse/test.db/events
├── dt=2018-01-25
│   └── 000000_0
├── dt=2018-01-26
│   └── part-00000-1846ef38-ec33-47ae-aa80-3f72ddb50c7d-c000.snappy.parquet
└── dt=2018-01-27
    └── data.avro

Creating a failing test in Spark

Connect to Spark and make sure we access the Hive Metastore we set up:

$ export HADOOP_HOME=/usr/local/Cellar/hadoop/${hadoop_version}/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/${hive_version}/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf

$ spark-shell --driver-class-path /usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
    --jars /usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
    --conf spark.executor.extraClassPath=/usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
    --conf spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:postgresql://localhost:5432/metastore \
    --conf spark.hadoop.javax.jdo.option.ConnectionUserName=hive \
    --conf spark.hadoop.javax.jdo.option.ConnectionPassword=hive \
    --conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver \
    --conf spark.hadoop.hive.metastore.schema.verification=true \
    --conf spark.hadoop.hive.metastore.schema.verification.record.version=true \
    --conf spark.sql.hive.metastore.version=2.1.0 \
    --conf spark.sql.hive.metastore.jars=maven

> spark.sql("select * from test.events").show()
...
java.lang.RuntimeException:
    file:...localhdfs/user/hive/warehouse/test.db/events/dt=2018-01-27/data.avro
    is not a Parquet file. expected magic number at tail [80, 65, 82, 49]
    but found [-126, 61, 76, 121]
...

So it doesn't work. Let's see if we can check out the Apache Spark code base and create a failing unit test.

First we forked the Apache Spark project, checked it out, and made sure we had sbt installed. We also figured out how to run a given unit test.

$ git clone https://github.com/krisgeus/spark.git
$ cd spark
$ ./build/sbt "hive/testOnly *HiveSQLViewSuite"

Writing a test

First we need to create a table and change the format of a given partition.
The final test can be found at: MultiFormatTableSuite.scala
We implemented the following steps (a sketch of the corresponding statements follows the list):

  • create a table with partitions
  • create a table based on Avro data which is actually located at a partition of the previously created table, and insert some data into it
  • create a table based on Parquet data which is actually located at another partition of the previously created table, and insert some data into it
  • try to read the data from the original table with partitions
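To give an idea, the statements the test exercises look roughly like the sketch below. The table and column names are taken from the test output and execution plans later in this post; everything else is illustrative and not the literal code of the suite:

// Rough sketch of what the test does; not the literal code from MultiFormatTableSuite.scala.
spark.sql("""
  CREATE EXTERNAL TABLE ext_multiformat_partition_table (key INT, value STRING)
  PARTITIONED BY (dt STRING)
  STORED AS PARQUET
  LOCATION '/tmp/ext_multiformat_partition_table'
""")
spark.sql("ALTER TABLE ext_multiformat_partition_table ADD PARTITION (dt='2018-01-26') PARTITION (dt='2018-01-27')")
// These are the statements an unpatched Spark cannot parse (see the failure below):
spark.sql("ALTER TABLE ext_multiformat_partition_table PARTITION (dt='2018-01-26') SET FILEFORMAT PARQUET")
spark.sql("ALTER TABLE ext_multiformat_partition_table PARTITION (dt='2018-01-27') SET FILEFORMAT AVRO")
// After writing Parquet and Avro data into the respective partition directories:
spark.sql("SELECT * FROM ext_multiformat_partition_table").show()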

Let's try to run the test:

$ ./build/sbt "hive/testOnly *MultiFormatTableSuite"
...
- create hive table with multi format partitions *** FAILED *** (4 seconds, 265 milliseconds)
[info]   org.apache.spark.sql.catalyst.parser.ParseException:
[info]   Operation not allowed: ALTER TABLE SET FILEFORMAT(line 2, pos 0)
[info]
[info] == SQL ==
[info]
[info] ALTER TABLE ext_multiformat_partition_table
[info] ^^^
[info] PARTITION (dt='2018-01-26') SET FILEFORMAT PARQUET
...

So Spark doesn't support changing the file format of a partition. Before we dive into fixing this problem, let's understand how execution plans work in Spark.

Understanding execution plans

The best explanation that we found was on the Databricks site, in the article Deep Dive into Spark SQL's Catalyst Optimizer.
Here is an extract in case you don't want to read the whole article:

At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala's pattern matching and quasiquotes) in a novel way to build an extensible query optimizer.

We use Catalyst's general tree transformation framework in four phases, as shown below: (1) analyzing a logical plan to resolve references, (2) logical plan optimization, (3) physical planning, and (4) code generation to compile parts of the query to Java bytecode. In the physical planning phase, Catalyst may generate multiple plans and compare them based on cost. All other phases are purely rule-based. Each phase uses different types of tree nodes; Catalyst includes libraries of nodes for expressions, data types, and logical and physical operators.

(Figure: Spark SQL query planning phases)

Spark SQL begins with a relation to be computed, either from an abstract syntax tree (AST) returned by a SQL parser, or from a DataFrame object constructed using the API.
Spark SQL uses Catalyst rules and a Catalog object that tracks the tables in all data sources to resolve these attributes. It starts by building an "unresolved logical plan" tree with unbound attributes and data types, then applies rules that do the following:

Looking up relations by name from the catalog.
Mapping named attributes, such as col, to the input provided given the operator's children.
Determining which attributes refer to the same value to give them a unique ID (which later allows optimization of expressions such as col = col).
Propagating and coercing types through expressions: for example, we cannot know the return type of 1 + col until we have resolved col and possibly cast its subexpressions to compatible types.

The logical optimization stage applies standard rule-based optimizations to the logical plan.
These include constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, and other rules.

In the physical planning phase, Spark SQL takes a logical plan and generates one or more physical plans, using physical operators that match the Spark execution engine. It then selects a plan using a cost model. At the moment, cost-based optimization is only used to select join algorithms: for relations that are known to be small, Spark SQL uses a broadcast join, using a peer-to-peer broadcast facility available in Spark. The framework supports broader use of cost-based optimization, however, as costs can be estimated recursively for a whole tree using a rule. We thus intend to implement richer cost-based optimization in the future.
The physical planner also performs rule-based physical optimizations, such as pipelining projections or filters into one Spark map operation. In addition, it can push operations from the logical plan into data sources that support predicate or projection pushdown. We will describe the API for these data sources in a later section.

The final stage of query optimization involves generating Java bytecode to run on each machine.
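You can look at these phases for the tables from our test environment by asking Spark to print its plans; a minimal sketch, assuming the test.events table created earlier:

// explain(true) prints the parsed logical plan, the analyzed logical plan,
// the optimized logical plan and the physical plan for the query.
spark.sql("SELECT city FROM test.events WHERE dt = '2018-01-26'").explain(true)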

Support setting the format for a partition in a Hive table with Spark

First we had to notice that Spark uses ANTLR to generate its SQL parser. ANTLR (ANother Tool for Language Recognition) generates a parser from a grammar, which can then be built and walked.
The grammar for Spark is specified in SqlBase.g4.

So we need to support SET FILEFORMAT when a partition is specified; thus we had to add the following line to SqlBase.g4:

| ALTER TABLE tableIdentifier (partitionSpec)?   SET FILEFORMAT fileFormat          

This will not only add support for setting the file format of a partition, but also on the table itself. We don't need this for our current case, but it might come in handy another time. Both forms are sketched below.
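For illustration, with the grammar change in place both statement forms parse (only on a Spark build that includes this change; the events table is the one from the Hive example above):

// Table-level form: changes the default format of the table itself.
spark.sql("ALTER TABLE test.events SET FILEFORMAT AVRO")
// Partition-level form: the one we actually need for multi-format tables.
spark.sql("ALTER TABLE test.events PARTITION (dt='2018-01-27') SET FILEFORMAT AVRO")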

The AstBuilder in Spark SQL processes the ANTLR ParseTree to obtain a logical plan. Since we're working with Spark SQL, we had to alter SparkSqlParser, which creates a SparkSqlAstBuilder that extends AstBuilder.
In the SparkSqlAstBuilder we had to create a new function to be able to interpret the grammar and add the requested step to the logical plan.

/**
 * Create an [[AlterTableFormatCommand]] command.
 *
 * For example:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
 * }}}
 */
override def visitSetTableFormat(ctx: SetTableFormatContext): LogicalPlan = withOrigin(ctx) {
  val format = (ctx.fileFormat) match {
    // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
    case (c: TableFileFormatContext) => visitTableFileFormat(c)
    // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
    case (c: GenericFileFormatContext) => visitGenericFileFormat(c)
    case _ => throw new ParseException("Expected STORED AS ", ctx)
  }
  AlterTableFormatCommand(
    visitTableIdentifier(ctx.tableIdentifier),
    format,
    // TODO a partition spec is allowed to have optional values. This is currently violated.
    Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

But we're still not done, because we also need a definition for the new command. These definitions are specified in ddl.scala,
and the definitions are based on the ones described in the Apache Hive Language Manual.

So what should this command do? Well, it should make sure that the serde properties are set properly at the partition level.

/**
 * A command that sets the format of a table/view/partition.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
 * }}}
 */
case class AlterTableFormatCommand(
    tableName: TableIdentifier,
    format: CatalogStorageFormat,
    partSpec: Option[TablePartitionSpec])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    // For datasource tables, disallow setting serde or specifying partition
    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
      throw new AnalysisException("Operation not allowed: ALTER TABLE SET FILEFORMAT " +
        "for a specific partition is not supported " +
        "for tables created with the datasource API")
    }
    if (partSpec.isEmpty) {
      val newTable = table.withNewStorage(
        serde = format.serde.orElse(table.storage.serde),
        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
        properties = table.storage.properties ++ format.properties)
      catalog.alterTable(newTable)
    } else {
      val spec = partSpec.get
      val part = catalog.getPartition(table.identifier, spec)
      val newPart = part.copy(storage = part.storage.copy(
        serde = format.serde.orElse(part.storage.serde),
        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
        properties = part.storage.properties ++ format.properties))
      catalog.alterPartitions(table.identifier, Seq(newPart))
    }
    Seq.empty[Row]
  }
}

Now we have a unit test which succeeds, in which we can set the file format for a partition.

Surprise: execution plan differences...

We were playing around and we accidentally changed the format of the partitioned table to Avro, so we had an Avro table with a Parquet partition in it... and IT WORKED!! We could read all the data... but wait, what?!!?
So an Avro table with a Parquet partition works, but a Parquet table with an Avro partition doesn't?

What's the difference? Let's look at the execution plans:

  • execution plan for the Parquet table with Avro partitions
[ {
  "class" : "org.apache.spark.sql.execution.ProjectExec",
  "num-children" : 1,
  "projectList" : [
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... "qualifier" : "ext_parquet_partition_table" } ],
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... "qualifier" : "ext_parquet_partition_table" } ]
  ],
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.execution.FileSourceScanExec",
  "num-children" : 0,
  "relation" : null,
  "output" : [
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... } ],
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... } ],
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... } ]
  ],
  "requiredSchema" : {
    "type" : "struct",
    "fields" : [
      { "name" : "key", "type" : "integer", "nullable" : true, "metadata" : { } },
      { "name" : "value", "type" : "string", "nullable" : true, "metadata" : { } }
    ]
  },
  "partitionFilters" : [ ],
  "dataFilters" : [ ],
  "tableIdentifier" : {
    "product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
    "table" : "ext_parquet_partition_table",
    "database" : "default"
  }
} ]
  • execution plan for the Avro table with Parquet partitions
[ {
  "class" : "org.apache.spark.sql.hive.execution.HiveTableScanExec",
  "num-children" : 0,
  "requestedAttributes" : [
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... "qualifier" : "ext_avro_partition_table" } ],
    [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", ... "qualifier" : "ext_avro_partition_table" } ]
  ],
  "relation" : [ {
    "class" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
    ...
    "tableMeta" : {
      "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogTable",
      "identifier" : {
        "product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
        "table" : "ext_avro_partition_table",
        "database" : "default"
      },
      "tableType" : {
        "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogTableType",
        "name" : "EXTERNAL"
      },
      "storage" : {
        "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat",
        "locationUri" : null,
        "inputFormat" : "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
        "outputFormat" : "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
        "serde" : "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        "compressed" : false,
        "properties" : null
      },
      "schema" : {
        "type" : "struct",
        "fields" : [
          { "name" : "key", "type" : "integer", "nullable" : true, "metadata" : { } },
          { "name" : "value", "type" : "string", "nullable" : true, "metadata" : { } },
          { "name" : "dt", "type" : "string", "nullable" : true, "metadata" : { } }
        ]
      },
      "provider" : "hive",
      "partitionColumnNames" : "[dt]",
      "owner" : "...",
      "createTime" : 1532699365000,
      "lastAccessTime" : 0,
      "createVersion" : "2.4.0-SNAPSHOT",
      "properties" : null,
      "stats" : null,
      "unsupportedFeatures" : [ ],
      "tracksPartitionsInCatalog" : true,
      "schemaPreservesCase" : true,
      "ignoredProperties" : null,
      "hasMultiFormatPartitions" : false
    },
    "dataCols" : [
      [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", "num-children" : 0, "name" : "key", "dataType" : "integer", "nullable" : true, "metadata" : { }, "exprId" : { "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId", "id" : 25, "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464" } } ],
      [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", "num-children" : 0, "name" : "value", "dataType" : "string", "nullable" : true, "metadata" : { }, "exprId" : { "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId", "id" : 26, "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464" } } ]
    ],
    "partitionCols" : [
      [ { "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference", "num-children" : 0, "name" : "dt", "dataType" : "string", "nullable" : true, "metadata" : { }, "exprId" : { "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId", "id" : 27, "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464" } } ]
    ]
  } ],
  "partitionPruningPred" : [ ],
  "sparkSession" : null
} ]

So how could we make the Parquet table not take the FileSourceScanExec route, but the HiveTableScanExec route, and thus make the Parquet execution plan similar to the Avro execution plan?

Finding the magic setting

We went digging in the code again and we discovered the following method in HiveStrategies.scala:

/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * PreprocessTableCreation, PreprocessTableInsertion, DataSourceAnalysis and HiveAnalysis.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }
  ...

Looking at this code we decided to set HiveUtils.CONVERT_METASTORE_PARQUET.key to false, meaning that we won't optimize to data source relations in case we altered the partition file format.

We simulated this by adding the following line to our unit test:

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false")

With this setting, the test passed. We decided to implement an extra check to avoid optimising the execution when a partition has a different file format than the main table.
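As a side note, this also suggests a workaround on an unpatched Spark: disable the same conversion for the session (the SQL config key behind HiveUtils.CONVERT_METASTORE_PARQUET), so the table takes the HiveTableScanExec route and the per-partition serdes are honoured. A minimal sketch:

// Disable the metastore Parquet conversion for this session.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("SELECT * FROM test.events").show()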

Implement multi-format partition support without disabling optimizations manually

We decided to add a property, hasMultiFormatPartitions, to the CatalogTable which reflects whether we have a table with multiple different formats in its partitions. This had to be done in HiveClientImpl.scala.

The following line did the trick:

hasMultiFormatPartitions = shim.getAllPartitions(client, h).map(_.getInputFormatClass).distinct.size > 1

Of course we also had to add this to the catalog's interface.scala,
and then we could use it in HiveStrategies.scala to change the previously mentioned method:

/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * PreprocessTableCreation, PreprocessTableInsertion, DataSourceAnalysis and HiveAnalysis.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    val hasMultiFormatPartitions = relation.tableMeta.hasMultiFormatPartitions
    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) &&
      (!hasMultiFormatPartitions) ||
      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }
  ...

With these changes our tests also succeeded.

All this work has been contributed back to the community in this Apache Spark pull request.
Based on the latest comments on our pull request it doesn't look very promising that it will be merged. Nevertheless, we learned a lot about Apache Spark and its internals.

Further enhance your Apache Spark knowledge!

We offer an in-depth Data Science with Spark course that will make data science at scale a piece of cake for any data scientist, engineer, or analyst!


Source: https://godatadriven.com/blog/working-with-multiple-partition-formats-within-a-hive-table-with-spark/
