Kafka handling large files

Narayana Basetty
2 min read · Dec 13, 2020

In this post I describe how to pass a large file (more than 1 MB) through Kafka without data loss between producer and broker: split the large file into chunks, aggregate the chunks at the consumer, and finally process the reassembled file.

Kafka handling large messages: this is a challenging problem statement.

How does Kafka work?

Kafka is a log-append structured file system: each record is appended to the tail of a log file, and the log is built up as segments in a specified file system path.

http://kafka.apache.org/

Note:
Please refer to the Kafka documentation linked above for more details about the software.

Kafka High level Architecture

Kafka can be tuned to handle large messages so that producers, brokers, and consumers handle them efficiently (a fine-grained approach).

This can be done by configuring producer, broker, and consumer properties relating to maximum message and file sizes.

1. First, try to reduce the size of the messages by applying a compression format.
2. If the compressed message is still larger than 1 MB, we need to build an algorithm to solve the problem.
3. Split your file into small chunks of 1 MB (1,000,000 bytes, i.e. 1000 x 1000).
For example, a JSON/XML file of size 1435 KB
can be split into 2 chunks [chunk 1, chunk 2].

The producer sends each chunk to the Kafka broker on the same partition (a partition of my choosing); every chunk carries a message uuid, chunk type, chunk order, and chunk size, and the consumer aggregates the chunks to form the original file data (see the sketch after this list).
For example, if the original message is a text-based format (such as XML/JSON), in most cases the compressed message will be sufficiently small.
4. If the producer publishes a message larger than 1 MB (the broker's default limit), the broker rejects it and the data cannot be retrieved, which effectively means data loss.
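
The splitting step could look roughly like the sketch below. This is a minimal illustration only, not the exact implementation: it assumes the file has already been read into a byte[], a String-keyed producer with a byte[] value serializer, and an illustrative chunk size of 1,000,000 bytes; the key layout follows the uuid/type/chunk_no/max_chunks format used later in this post.

import java.util.Arrays;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ChunkedFileProducer {

    private static final int CHUNK_SIZE = 1_000_000; // 1 MB, below the broker's message.max.bytes

    // Splits the payload into chunks of at most CHUNK_SIZE bytes and sends every
    // chunk to the same partition so the consumer can reassemble them in order.
    static void sendFile(KafkaProducer<String, byte[]> producer,
                         String topic, int partition, byte[] fileBytes) {
        String uuid = UUID.randomUUID().toString();
        int maxChunks = (int) Math.ceil((double) fileBytes.length / CHUNK_SIZE);

        for (int i = 0; i < maxChunks; i++) {
            int from = i * CHUNK_SIZE;
            int to = Math.min(from + CHUNK_SIZE, fileBytes.length);
            byte[] chunk = Arrays.copyOfRange(fileBytes, from, to);

            // The key carries the metadata needed for reassembly:
            // uuid, type, chunk_no (1-based) and max_chunks.
            String key = String.format("uuid:%s,type:chunk,chunk_no:%d,max_chunks:%d",
                    uuid, i + 1, maxChunks);

            producer.send(new ProducerRecord<>(topic, partition, key, chunk));
        }
        producer.flush();
    }
}

Sending every chunk to the same, explicitly chosen partition is what preserves chunk order for a single file.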

Producer Config:
linger.ms=100
# gzip, lz4, Snappy, and Zstandard
compression.type=snappy # if compression is not set, increase the consumer config max.partition.fetch.bytes accordingly
partitioner.class=<name of your partitioner class>
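
Wired into the Java client, those producer settings might look roughly like this sketch; the bootstrap server address is a placeholder, and the commented-out partitioner class name is only an example of where your own partitioner would go.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerFactory {

    // Builds a producer using the settings above; replace the broker address
    // and the partitioner class with your own values.
    static KafkaProducer<String, byte[]> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.ChunkPartitioner");
        return new KafkaProducer<>(props);
    }
}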

Server(Broker) Config:
# Default 1000012 bytes
message.max.bytes=1000000
replica.fetch.max.bytes=1048576 # this value should be greater than message.max.bytes
log.segment.bytes=1073741824

Consumer Config:

# if producer compression is applied, this value could be lower (1-2 MB)
max.partition.fetch.bytes=10485760 # without compression, 10MB or lower as per your app's needs
fetch.max.bytes=52428800 # 50MB
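
On the consumer side, the same fetch limits could be wired up roughly like this; the broker address and group id are illustrative placeholders.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerFactory {

    // Builds a consumer able to fetch the 1 MB chunks; the fetch limits mirror
    // the config block above, while brokers and group id are placeholders.
    static KafkaConsumer<String, byte[]> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "large-file-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760"); // 10 MB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");           // 50 MB
        return new KafkaConsumer<>(props);
    }
}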

Sample info:

Assumption: I have a running kafka engine.

I have a file which is larger than 1MB [1435 KB]
Topic name: topic_large_file
Partitions: 3
chunk size: 1000000
key header => uuid:<uuid>,type:<chunk/other>,chunk_no:<1/2/...>,max_chunks:<filesize/chunksize, rounded up>

Ex: Here 2 chunks
key : uuid:0557235b-1888-43a3-81c7-9c22674257fb,type:chunk,chunk_no:1,max_chunks:2, value size : 1000000, partition : 2
key : uuid:0557235b-1888-43a3-81c7-9c22674257fb,type:chunk,chunk_no:2,max_chunks:2, value size : 468893, partition : 2

On the consumer side I maintain a Java Map data structure: chunks are stored and updated in the in-memory map until one complete file can be processed, and the message's uuid is then removed from the map.
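
A minimal sketch of that reassembly map, assuming the key format shown above and a String key / byte[] value consumer; eviction of incomplete files and error handling are left out.

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ChunkAggregator {

    // uuid -> (chunk_no -> chunk bytes); entries stay here until all chunks arrive
    private final Map<String, TreeMap<Integer, byte[]>> pending = new HashMap<>();

    // Returns the reassembled file when the last chunk arrives, otherwise null.
    byte[] onRecord(ConsumerRecord<String, byte[]> record) {
        Map<String, String> key = parseKey(record.key());
        String uuid = key.get("uuid");
        int chunkNo = Integer.parseInt(key.get("chunk_no"));
        int maxChunks = Integer.parseInt(key.get("max_chunks"));

        TreeMap<Integer, byte[]> chunks =
                pending.computeIfAbsent(uuid, k -> new TreeMap<>());
        chunks.put(chunkNo, record.value());

        if (chunks.size() < maxChunks) {
            return null; // still waiting for the remaining chunks
        }

        // All chunks present: concatenate in chunk_no order, then drop the uuid
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks.values()) {
            out.write(chunk, 0, chunk.length);
        }
        pending.remove(uuid);
        return out.toByteArray();
    }

    // Parses "uuid:...,type:chunk,chunk_no:1,max_chunks:2" into a field map
    private static Map<String, String> parseKey(String recordKey) {
        Map<String, String> fields = new HashMap<>();
        for (String part : recordKey.split(",")) {
            String[] kv = part.split(":", 2);
            fields.put(kv[0].trim(), kv[1].trim());
        }
        return fields;
    }
}

In the poll loop, onRecord would be called for every record; whenever it returns a non-null byte[], the original file is complete and ready for processing.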

Tested successfully; this works as expected.

Happy Learning!
