Adding Data-At-Rest compression to EC pools

The introduction of Erasure Coding (EC) pools to Ceph brings storage space savings at the expense of additional CPU utilization. Compression is quite a similar feature from this point of view, and the two can easily be coupled to gain even more space savings, transparently to users. To do that, compression support can be implemented at the EC pool layer.
Compression modules should be pluggable, similar to the Erasure Coding ones. This allows the use of different compression methods as well as hardware-optimized implementations. Ceph administrators should be able to enable/disable compression on a per-pool basis using the regular CLI.

Igor Fedotov (Mirantis)
Alyona Kiseleva (Mirantis)

Interested Parties
If you are interested in contributing to this blueprint, or want to be a "speaker" during the Summit session, list your name here.
Name (Affiliation)
Name (Affiliation)

Current Status
Ceph currently has no Data-At-Rest compression support.

Detailed Description
Currently EC pools handle data append and random read requests only. Such requests can be issued by the Ceph cache tier or by Ceph users directly. At the same time, the EC pool implementation prohibits random write requests, which is good news from a compression support perspective: such requests bring tons of complexity to compression handling.
Thus the suggested approach is to compress appended data blocks at the EC pool level, before applying erasure coding. Decompression, obviously, takes place after erasure decoding.
Schematically this looks like:

<data block to append> -> [Compressor] -> <Compressed data block> -> [Erasure Coder] -> <Erasure Coded shards> -> [File Stores]
<read data block> <- [Decompressor] <- <Compressed data block> <- [Erasure Decoder] <- <Erasure Coded shards> <- [File Stores]

Please note that the "data block to append" offset and size tend to be aligned with the stripe width (4K by default for an EC pool). The exception is the last block, which can have arbitrary size; it is padded by the EC pool though.
At the same time, the "read data block" offset and size can be absolutely arbitrary.
A property of any compression algorithm is that it operates on data blocks rather than specific bytes, i.e. one needs to decompress a block completely to access specific bytes stored within it.
Evidently this causes some overhead when the data to read isn't aligned with the originally compressed blocks. There is probably no way to eliminate this issue completely - the only complete solution is to prohibit clients from reading data in an unaligned manner, which is probably inappropriate. We can, however, reduce such overhead by limiting the size of the data blocks processed by the compressor, i.e. by splitting/assembling incoming data into blocks of limited size.
The most straightforward solution is to use the stripe width for that, but this may degrade the compression ratio in some cases. At the same time, it's inappropriate to keep unprocessed data between two append requests in order to assemble a larger block, as that considerably complicates the handling and brings consistency/reliability issues. Thus the suggestion is to take the data block provided by the user (which is stripe-aligned) and split it into smaller chunks whose size is a multiple of the stripe width. Chunk offsets and sizes therefore remain stripe-aligned, but chunk sizes aren't fixed. Each chunk is then compressed and erasure coded independently.

Another thing to mention is the need to map the <offset, length> pair from a read request to the corresponding chunk offsets <start_chunk_original_offset, start_chunk_compressed_offset, ending_chunk_compressed_offset> in the compressed data set. The compressed offsets are required to retrieve the data (EC shards) from the File Store entities.
In fact one needs some function MapFn(offset, next_chunk_flag) -> <chunk_offset, chunk_coffset>, where 'offset' is the original data offset, 'next_chunk_flag' specifies whether information for the corresponding or the next chunk is to be returned, 'chunk_offset' is the original data offset that the corresponding (or next) chunk contains, and 'chunk_coffset' is the chunk's offset in the compressed data set. The mentioned <offset, length> pair can easily be converted to the <start_chunk_original_offset, start_chunk_compressed_offset, ending_chunk_compressed_offset> tuple using the following calls: start_chunk_original_offset, start_chunk_compressed_offset = MapFn(offset, false); ending_chunk_compressed_offset = MapFn(offset + length, true).chunk_coffset.
Once mapped, one needs to retrieve the chunks in the <start_chunk_compressed_offset, ending_chunk_compressed_offset> range from the File Stores, decompress them and extract the desired data subset, i.e. <offset - start_chunk_original_offset, length>.
To provide the desired mapping we can use a subset of <key, value> pairs (name that Compression Info) from the object xattrs and update them on each append. Read request handling uses this Compression Info to perform the mapping.
Compression Info can have the following structure:

Multiple <chunk_key, chunk_value> records, one per chunk:
chunk_key = CompressionInfo prefix + Original Chunk Offset
chunk_value = Compression Method indicator + Compressed Chunk Offset
A single <key, value> record (let's name it the Master Record) to track the total original/compressed data sizes and any other compression information (e.g. a preferred compression method or a disable-compression flag):
key = CompressionInfo prefix
value = OriginalDataSize_current + CompressedDataSize_current + flags

Here is a CompressionInfo sample: a single 1MB append followed by additional 128K and 1K ones, chunk size = 256K (probably too large for real life), each chunk from the first append was compressed to 1K, the others were incompressible, CompressionInfo prefix = "@CI"

@CI_0 = zlib / 0                       //original length = 256K
@CI_262144 = zlib / 1024               //original length = 256K
@CI_524288 = zlib / 2048               //original length = 256K
@CI_786432 = zlib / 3072               //original length = 256K
@CI_1048576 = none / 4096              //original length = 128K
@CI_1179648 = none / 135168            //original length = 1K, compressed offset = 4K + 128K
@CI = 1180672 / 136192                 // =1179648 + 1024 / = 135168 + 1024

MapFn results will be:
MapFn(0, false) = <0, 0>
MapFn(0, true) = <262144, 1024>

MapFn(1, false) = <0, 0>
MapFn(4096, true) = <262144, 1024>

MapFn(262144, false) = <262144, 1024>
MapFn(262144, true) = <524288, 2048>

MapFn(262145, false) = <262144, 1024>
MapFn(262145, true) = <524288, 2048>

MapFn(1179649, false) = <1179648, 135168>
MapFn(1179649, true) = <1180672, 136192>
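The MapFn lookup over the sample above can be sketched in Python. The dictionary below is a hypothetical in-memory form of the chunk records (original chunk offset -> compressed offset); the master record totals serve as the "one past the end" sentinel:

```python
import bisect

# CompressionInfo chunk records from the sample: original offset -> compressed offset
chunk_map = {0: 0, 262144: 1024, 524288: 2048, 786432: 3072,
             1048576: 4096, 1179648: 135168}
# Master Record totals: total original / compressed data sizes
total_original, total_compressed = 1180672, 136192

def map_fn(offset, next_chunk):
    """Return <chunk_offset, chunk_coffset> for the chunk containing
    'offset', or for the following chunk when next_chunk is True."""
    offsets = sorted(chunk_map)
    i = bisect.bisect_right(offsets, offset) - 1  # chunk containing offset
    if next_chunk:
        i += 1
        if i == len(offsets):  # past the last chunk: use the master record
            return (total_original, total_compressed)
    return (offsets[i], chunk_map[offsets[i]])
```

The sketch reproduces the sample results, e.g. map_fn(262145, False) returns (262144, 1024) and map_fn(1179649, True) returns (1180672, 136192).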

Additionally we might need to introduce another helper function that uses CompressionInfo to return chunk sizes (both original and compressed):
ChunkSizesFn( offset ) -> <original chunk size, compressed chunk size>

where 'offset' specifies the original offset of the data that the desired chunk keeps, 'original chunk size' is the amount of uncompressed (original) data the chunk contains, and 'compressed chunk size' is the amount of compressed data the chunk contains.
For the sample above the ChunkSizesFn() results are:
ChunkSizesFn(0) = <262144, 1024>
ChunkSizesFn(1) = <262144, 1024>

ChunkSizesFn(262144) = <262144, 1024>
ChunkSizesFn(262145) = <262144, 1024>

ChunkSizesFn(1179647) = <131072, 131072>
ChunkSizesFn(1179649) = <1024, 1024>
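ChunkSizesFn can be derived from the same chunk records: each chunk's sizes are the gaps to the next record, with the master record totals closing the final chunk. A minimal sketch over the sample (names hypothetical):

```python
import bisect

# Sample CompressionInfo: original chunk offset -> compressed offset,
# plus the Master Record totals.
chunk_map = {0: 0, 262144: 1024, 524288: 2048, 786432: 3072,
             1048576: 4096, 1179648: 135168}
total_original, total_compressed = 1180672, 136192

def chunk_sizes_fn(offset):
    """Return <original chunk size, compressed chunk size> for the chunk
    holding original-data 'offset'."""
    offsets = sorted(chunk_map)
    i = bisect.bisect_right(offsets, offset) - 1
    # the next record (or the master record totals) bounds this chunk
    next_orig = offsets[i + 1] if i + 1 < len(offsets) else total_original
    next_comp = chunk_map[offsets[i + 1]] if i + 1 < len(offsets) else total_compressed
    return (next_orig - offsets[i], next_comp - chunk_map[offsets[i]])
```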

Schematically, append request handling can be done as follows (everything is performed at the EC pool layer):

1) The append operation handler at the EC pool receives an <object_id, data offset, data length, data octets> tuple that specifies the destination object and the appended data block.
2) The handler retrieves the object xattrs related to compression info (actually the Master Record is enough). Retrieved xattrs are temporarily preserved to allow rollback if needed. This is exactly analogous to the Hash Info handling performed by the EC pool now.
3) The handler virtually splits the incoming data block into chunks and compresses each chunk independently.
4) Each chunk is padded to the stripe width. If there is no space saving for a specific chunk, it's marked as uncompressed.
5) Each chunk is passed to the Erasure Coding function, which produces several shards.
6) For each chunk:
a) The handler adds a new CompressionInfo chunk record to the xattrs (to some internal in-memory container, not the actual storage, at this point). The key contains the chunk data offset; the value contains the applied compression method (including the uncompressed option) and the offset in the compressed data set (the CompressedDataSize_current value).
b) The handler updates the <OriginalDataSize_current, CompressedDataSize_current> values in the Master Record: OriginalDataSize_current += chunk data length; CompressedDataSize_current += compressed chunk data size. This takes place in memory at this point; the actual storage update happens along with shard saving.
7) All shards, along with the appended/updated xattrs, are saved to the respective file stores. This is the existing shard/hash info saving machinery; the only thing we need to add is provision of the additional xattrs.
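Steps 3-4 of the append path, the per-chunk compress-or-keep decision and the stripe padding, can be sketched as follows. zlib stands in for whatever pluggable compressor is configured; the function names are hypothetical:

```python
import os
import zlib

STRIPE_WIDTH = 4096  # default EC stripe width

def compress_chunk(chunk):
    """Try to compress one chunk; if compression saves no space,
    keep the chunk as-is and mark it uncompressed."""
    compressed = zlib.compress(chunk)
    if len(compressed) >= len(chunk):
        return ("none", chunk)
    return ("zlib", compressed)

def pad_to_stripe(blob):
    """Pad the (possibly compressed) chunk up to the stripe width."""
    return blob + b"\0" * (-len(blob) % STRIPE_WIDTH)

method, blob = compress_chunk(b"\0" * 262144)        # highly compressible
incompressible, _ = compress_chunk(os.urandom(4096))  # random data doesn't shrink
```

The recorded compression method ("none" vs "zlib") is what later goes into the CompressionInfo chunk record.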

Read request handling can be performed as follows:
1) The read operation handler receives an <object_id, data offset, data length> tuple that specifies the source object and the data block to read.
2) The handler retrieves the object xattrs related to compression info. The Master Record and a subset of chunk records (covering the [data offset, data offset + data length] range) should be preserved for subsequent use.
3) The handler obtains the offsets in the compressed data set for the desired chunks:
start_chunk_compressed_offset = MapFn(data offset, false).chunk_coffset
ending_chunk_compressed_offset = MapFn(data offset + data length, true).chunk_coffset
4) The handler retrieves the specific data subset designated by <start_chunk_compressed_offset, ending_chunk_compressed_offset> from the file stores and decodes it with the Erasure Decoder. This step is already present in the EC pool; we just provide the calculated data range instead of the user-provided one.
5) The handler iterates over the retrieved chunk data ('compressed_data' below) and decompresses each chunk independently. Chunk bounds can be calculated using ChunkSizesFn:

   uncompressed_data = {}
   decompress_pos = 0
   starting_chunk_original_offset = chunk_offs = MapFn(data offset, false).chunk_offset
   end_offs = MapFn(data offset + data length, true).chunk_offset
   while( chunk_offs < end_offs ) {
     <chunk_size, chunk_csize> = ChunkSizesFn( chunk_offs );
     uncompressed_data += doDecompress( compressed_data, decompress_pos, chunk_csize )
     decompress_pos += chunk_csize
     chunk_offs += chunk_size
   }

6) The handler extracts the desired data subset from the collected uncompressed data block:
data_to_return = uncompressed_data.substr( data offset - starting_chunk_original_offset, data length )
7) The handler returns the data to the client.
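The whole read path (decompress every chunk overlapping the requested range, then cut out the requested bytes) can be demonstrated end to end with a toy Python sketch. Tiny 8-byte chunks and zlib stand in for real stripe-sized chunks and the pluggable compressor; all names are illustrative:

```python
import zlib

CHUNK = 8  # toy chunk size (real chunks are stripe-width multiples)

# Build a compressed data set of three chunks, as the append path would.
original = b"AAAAAAAABBBBBBBBCCCCCCCC"
compressed_set = b""
chunk_map = {}  # original chunk offset -> compressed offset
for off in range(0, len(original), CHUNK):
    chunk_map[off] = len(compressed_set)
    compressed_set += zlib.compress(original[off:off + CHUNK])

def read(offset, length):
    """Decompress every chunk overlapping [offset, offset + length)
    and extract the requested byte range from the result."""
    first = (offset // CHUNK) * CHUNK                 # first overlapping chunk
    last = ((offset + length - 1) // CHUNK) * CHUNK   # last overlapping chunk
    data = b""
    for off in range(first, last + CHUNK, CHUNK):
        start = chunk_map[off]
        end = chunk_map.get(off + CHUNK, len(compressed_set))
        data += zlib.decompress(compressed_set[start:end])
    return data[offset - first:offset - first + length]

# an unaligned read spanning a chunk boundary decompresses two chunks
result = read(6, 4)
```

Note how the unaligned read(6, 4) forces decompression of two whole chunks even though only four bytes are returned; this is exactly the overhead the chunk-size limit is meant to bound.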

Please note that the MapFn & ChunkSizesFn usage above is primarily for descriptive purposes. In fact all the required information can be retrieved by a single consolidated function call that performs a couple of lookups and iterates over the map only once.

Admittedly the suggested approach has limited applicability - compression takes place for Erasure Coding pools only, so replicated pool users don't benefit from it. But it looks like the only approach of reasonable implementation complexity: any other approach would need specific means for handling random write access, which requires much more redesign and development effort. At the same time, compression and Erasure Coding seem to merge easily and naturally.

Work items
Add compression support to EC pool
Refactor existing message compression infrastructure to support pluggable compression modules.

Coding tasks
Task 1
Task 2
Task 3

Build / release tasks
Task 1
Task 2
Task 3

Documentation tasks
Task 1
Task 2
Task 3

Deprecation tasks
Task 1
Task 2
Task 3