At Allegro we use many open-source tools that support our work. Sometimes we cannot find what we need, and that is a perfect opportunity to fill the gap and share the result with the community. We are proud to announce Camus Compressor — a tool that merges files created by Camus on HDFS and saves them in a compressed format.

Background and motivation

Camus is used heavily at Allegro to dump more than 200 Kafka topics onto HDFS. It runs every 15 minutes and creates one file per Kafka partition, which results in about 76,800 small files per day. Most of these files do not exceed the Hadoop block size. This is a well-known Hadoop antipattern that leads to performance issues, for example an excessive number of mappers when executing SQL queries.
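The numbers above can be sanity-checked with simple arithmetic; the per-run partition count below is derived from the daily total and is our assumption, not a figure from the Camus configuration:

```python
# Back-of-the-envelope check of the small-file problem described above.
# Camus runs every 15 minutes and writes one file per Kafka partition.
runs_per_day = 24 * 60 // 15          # 96 runs per day
files_per_day = 76_800                # figure quoted in the post
partitions_per_run = files_per_day // runs_per_day
print(partitions_per_run)             # 800 Kafka partitions written per run
# Across ~200 topics that is only a handful of partitions per topic on
# average, yet it still yields tens of thousands of sub-block-size files
# on HDFS every single day.
```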

Camus Compressor solves this issue by merging files within each Hive partition and compressing them. It does not change the Camus directory structure and supports both daily and hourly partitioning. The tool runs on YARN as well as in local mode and is built on Spark.

You can find tools that perform similar processing, for example Hadoop filecrusher and Camus sweeper. Unfortunately, none of them meets our criteria. Filecrusher compresses only one directory per MapReduce job, which does not fit our scenario of processing a whole batch of directories — a scenario Camus Compressor supports. Camus sweeper is almost ideal, but it changes the partitioning scheme (for example, it compresses hourly-partitioned files into a single daily file) and mixes data locations, which causes problems when users read the data with their existing tools. Moreover, neither tool can replace input directories with compressed files in place, so the data schema (i.e. Hive Metastore) has to be modified. Camus Compressor supports the Camus directory structure, compresses many directories in a single job, and does not change data location.

We tested two compression formats in our environment: LZO and Snappy. At first we compressed the data with LZO, which is a splittable format (files bigger than the HDFS block size can be read in parallel) and offers good decompression speed. Unfortunately, we found that big-data analysis tools do not support LZO out of the box (the codec is shipped under the GPL license) and we did not want to force users to change their scripts. Snappy compression is well supported by plain HDFS commands, Hive and Spark, so this is the format we chose. It is not splittable, but we repartition the data into sets of files of about 2 × [HDFS block size] and compress each set into one .snappy file. According to our measurements, the output files in most cases do not exceed the block size.
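The sizing logic can be sketched as follows. This is a simplified illustration, not the tool's actual code; the 128 MB block size and the roughly 2:1 Snappy compression ratio are assumptions:

```python
import math

def num_output_files(input_bytes: int, block_size: int = 128 * 1024 * 1024) -> int:
    """Split the input into groups of roughly 2 * block_size of uncompressed
    data, so that each group, once Snappy-compressed (typically ~2x smaller
    on text-like data), usually fits within a single HDFS block.

    block_size defaults to an assumed 128 MB HDFS block.
    """
    target = 2 * block_size
    return max(1, math.ceil(input_bytes / target))

# e.g. 10 GB of uncompressed Camus output -> 40 groups of ~256 MB each,
# each written as one .snappy file of roughly one block after compression
print(num_output_files(10 * 1024**3))  # 40
```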

Usage

Camus Compressor is built on Spark (version 1.2.0 or newer is required). We provide a script that automates passing parameters to spark-submit and to the application: src/main/resources/compressor.sh.

Assuming that your Camus is configured to store data in /data/camus and partition it daily, you can:

  • Compress one day of topic my_topic by executing:

      compressor.sh -m unit -p /data/camus/my_topic/daily/2015/06/15
    
  • Compress the whole topic my_topic (all days) by:

      compressor.sh -m topic -p /data/camus/my_topic
    
  • Compress all topics created by Camus:

      compressor.sh -m all -p /data/camus
    
  • Compress all topics with concurrency (number of executors) increased to 30:

      compressor.sh -m all -p /data/camus -e 30
    
  • Compress all topics on YARN queue myqueue:

      compressor.sh -m all -p /data/camus -q myqueue
    
  • Compress all topics with LZO:

      compressor.sh -m all -p /data/camus -c lzo
    

Current status, plans and source code

In the future, we plan to add support for creating Hive partitions for tables that provide access to the compressed data. The source code is available in our GitHub repository, along with build instructions and usage samples.

Feel free to use this library and, even better, to contribute.