Skip to content

PARQUET-3459: Per column compression#3526

Open
mengna-lin wants to merge 10 commits into
apache:masterfrom
mengna-lin:per_column_compression
Open

PARQUET-3459: Per column compression#3526
mengna-lin wants to merge 10 commits into
apache:masterfrom
mengna-lin:per_column_compression

Conversation

@mengna-lin

Copy link
Copy Markdown

Rationale for this change

The Parquet format supports per-column compression at the spec level, but parquet-java has always forced a single codec across all columns —
this PR exposes that existing capability.

What changes are included in this PR?

  • CompressionCodecFactory — new default getCompressor(codec, level) interface method
  • CodecFactory — level-aware compressor instantiation and validation (ZSTD: 1–22, GZIP: 0–9 or -1, BROTLI: 0–11)
  • ParquetProperties — new builder methods withCompressionCodec(col, codec) / withCompressionLevel(col, level)
  • ColumnChunkPageWriteStore — resolves compressor per column at write time, falls back to job-level default
  • ParquetWriter — new builder methods withCompressionCodec(col, codec) / withCompressionLevel(col, level)
  • ParquetOutputFormat — wires per-column codec/level from Hadoop Configuration into ParquetProperties

Are these changes tested?

Yes.
Unit tests cover ParquetProperties getters/copy behavior (TestParquetProperties),
CodecFactory level-aware caching and invalid level rejection (TestDirectCodecFactory),
and ColumnChunkPageWriteStore codec resolution and invalid level rejection (TestColumnChunkPageWriteStore).
Integration tests cover end-to-end data round-trips and footer metadata verification
through both the ParquetWriter builder API and ParquetOutputFormat (TestParquetWriter).
Also test with spark job

/**
 * Local Spark job that writes a Parquet file with per-column compression:
 *   col_1 (string) -> ZSTD level 9
 *   col_2 (int)    -> UNCOMPRESSED
 *   col_3 (double) -> SNAPPY (default)
 *
 * Build:
 *   cd test-spark-job && mvn package -DskipTests
 *
 * Run:
 *   spark-submit \
 *     --master local[2] \
 *     target/parquet-per-column-compression-test-1.0-SNAPSHOT-job.jar
 *
 * Inspect output with parquet-cli:
 *   hadoop jar ../parquet-cli/target/parquet-cli-1.18.0-SNAPSHOT-runtime.jar \
 *     org.apache.parquet.cli.Main meta \
 *     /tmp/per_column_compression_test/part-*.parquet
 */
public class PerColumnCompressionJob {

  private static final String OUTPUT_PATH = "/tmp/per_column_compression_test";

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("PerColumnCompressionTest")
        // Default codec for all columns
        .config("spark.hadoop.parquet.compression", "SNAPPY")
        // col_1: ZSTD at level 9
        .config("spark.hadoop.parquet.compression#col_1", "ZSTD")
        .config("spark.hadoop.parquet.compression.level#col_1", "9")
        // col_2: no compression
        .config("spark.hadoop.parquet.compression#col_2", "UNCOMPRESSED")
        .getOrCreate();

    spark.sparkContext().setLogLevel("WARN");

    StructType schema = DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("col_1", DataTypes.StringType, false),
        DataTypes.createStructField("col_2", DataTypes.IntegerType, false),
        DataTypes.createStructField("col_3", DataTypes.DoubleType, false),
    });

    List<Row> rows = Arrays.asList(
        RowFactory.create("alice",   1, 1.1),
        RowFactory.create("bob",     2, 2.2),
        RowFactory.create("charlie", 3, 3.3),
        RowFactory.create("dave",    4, 4.4),
        RowFactory.create("eve",     5, 5.5)
    );

    Dataset<Row> df = spark.createDataFrame(rows, schema);
    System.out.println("\n=== Input DataFrame ===");
    df.printSchema();
    df.show();

    df.coalesce(1).write().mode("overwrite").parquet(OUTPUT_PATH);
    System.out.println("Wrote Parquet to: " + OUTPUT_PATH);

    System.out.println("\n=== Read Back ===");
    spark.read().parquet(OUTPUT_PATH).show();

    System.out.println("\nDone. Inspect compression with:");
    System.out.println("  hadoop jar ../parquet-cli/target/parquet-cli-1.18.0-SNAPSHOT-runtime.jar \\");
    System.out.println("    org.apache.parquet.cli.Main meta " + OUTPUT_PATH + "/part-*.parquet");

    spark.stop();
  }
}

Result

mengnalin@Mengnas-MacBook-Pro apache-parquet-java-mengna % hadoop jar parquet-cli/target/parquet-cli-1.18.0-SNAPSHOT-runtime.jar meta /tmp/per_column_compression_test/part-*.parquet

File path:  /tmp/per_column_compression_test/part-00000-017e9683-14be-4539-85ce-92f3904a744d-c000.snappy.parquet
Created by: *
Properties:
                   org.apache.spark.version: 4.1.1
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"col_1","type":"string","nullable":false,"metadata":{}},{"name":"col_2","type":"integer","nullable":false,"metadata":{}},{"name":"col_3","type":"double","nullable":false,"metadata":{}}]}
Schema:
message spark_schema {
  required binary col_1 (STRING);
  required int32 col_2;
  required double col_3;
}


Row group 0:  count: 5  35.20 B records  start: 4  total(compressed): 176 B total(uncompressed):171 B 
--------------------------------------------------------------------------------
       type      encodings count     avg size   nulls   min / max
col_1  BINARY    Z   _     5         15.40 B    0       "alice" / "eve"
col_2  INT32     _   _     5         8.60 B     0       "1" / "5"
col_3  DOUBLE    S   _     5         11.20 B    0       "1.1" / "5.5"

Are there any user-facing changes?

Two new APIs, fully backwards compatible:
- ParquetWriter.Builder.withCompressionCodec(col, codec)
- ParquetWriter.Builder.withCompressionLevel(col, level)
(Also accessible at the lower level via ParquetProperties.Builder.)

@wgtmac

wgtmac commented Apr 23, 2026

Copy link
Copy Markdown
Member

Does this duplicate #3396?

@github-actions

Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has had no activity for at least 2 months. If you are still working on this change or plan to move it forward, please leave a comment or push a new commit so we know to keep it open. Otherwise, this PR will be closed automatically in about one month. Thank you for your contribution to Apache Parquet!

@github-actions github-actions Bot added the stale label Jun 23, 2026
@mengna-lin mengna-lin force-pushed the per_column_compression branch from 4f31b0f to fa11bde Compare July 1, 2026 18:31
Thread a per-column compressor-provider function through the writer stack, replace the duplicate store constructors with a validating builder, and fix the copy constructor to preserve previously-dropped fields.
@mengna-lin mengna-lin force-pushed the per_column_compression branch from fa11bde to 5587e33 Compare July 1, 2026 18:35
@mengna-lin

Copy link
Copy Markdown
Author

Does this duplicate #3396?

Hi @wgtmac , thanks for your reviews on this feature — I really appreciate the guidance.
I understand the preference for #3396 since it was opened first. I just wanted to gently note it's been inactive ~2 months with several review comments still open. In the meantime, I've done my best to incorporate all of that feedback into this PR.
For context, this is currently a blocker for downstream work in Apache Iceberg apache/iceberg#16094.
Whenever you have a chance, I'd be grateful if you could take a review pass. Thank you!

@github-actions github-actions Bot removed the stale label Jul 2, 2026
@wgtmac

wgtmac commented Jul 2, 2026

Copy link
Copy Markdown
Member

@mengna-lin Thank you for the patience! I think now it makes sense to proceed with your PR. Let me take a look then.

}

@Override
public BytesCompressor getCompressor(CompressionCodecName codecName, int level) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path bypasses DirectCodecFactory#createCompressor(...). With a direct codec factory, per-column levels for ZSTD/SNAPPY can fall back to the heap/Hadoop compressor path; please add a direct-factory override or test.

(reviewed by Codex)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These imports are out of Spotless import order; the same pattern appears in a few added test imports too, so spotless:check will likely fail.

(reviewed by Codex)

…rt order

Route the level-aware getCompressor(codec, level) path through an overridable createCompressorAtLevel so DirectCodecFactory returns its direct SNAPPY/ZSTD compressors (with the level honored for ZSTD) instead of falling back to the heap/Hadoop path. Add a direct-factory test for this. Also fix Spotless import ordering flagged in review.
@mengna-lin

Copy link
Copy Markdown
Author

@wgtmac I addressed those comments. Please take another look when you get a chance. Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants