Apache Kafka CCDAK Exam Notes


Hi Readers,

If you are planning or preparing for an Apache Kafka certification, then this is the right place for you. There are many Apache Kafka certifications available in the market, but CCDAK (Confluent Certified Developer for Apache Kafka) is the best-known certification, as Kafka is now maintained by Confluent.

CCDAK vs CCOAK

Confluent has recently introduced the CCOAK certification. CCOAK is mainly for DevOps engineers, focusing on building and managing Kafka clusters. CCDAK is mainly for developers and solution architects, focusing on design, producers, and consumers. If you are still not sure, I recommend going for CCDAK, as it is a more comprehensive exam than CCOAK. These exam notes are very helpful for both the CCDAK and CCOAK certifications.

From here onward, we will talk about how to prepare for CCDAK.

Frequently Asked Questions


  • Prepare well for the exam as it verifies your theoretical as well as practical understanding of Kafka.
  • At least 40-50 hours of preparation is required.
  • You can register online and schedule the exam on the Examity site. I suggest setting a goal of 1 to 2 months for exam preparation and registering accordingly.
  • The Confluent Kafka certification costs $150 per attempt. You need to pay the fee again in order to retake the exam, after a gap of at least 14 days.
  • You need to answer 60 multiple-choice questions in 90 minutes from your laptop (with a webcam) under the supervision of an online proctor.
  • There is no negative scoring so try to answer as many questions as possible.
  • Confluent does not publish how many questions must be answered correctly in order to pass. The result (PASS or FAIL) is shown immediately at the end of the exam; no score or percentage is provided.
  • You will receive a certificate similar to my CCDAK certificate after passing the exam. What an achievement !!!
  • The certification expires after two years but you can still brag about it ;)
  • Don’t waste time searching for CCDAK Certification Dumps. There ain’t any.
  • Confluent has launched a FREE Fundamentals Accreditation exam, which you can sign up for at no cost to get a better feel for the CCDAK exam.

Exam Preparation


I prepared for CCDAK using the following resources:

  1. Apache Kafka Documentation
  2. Confluent Kafka Documentation
  3. Confluent Kafka Definitive Guide PDF
  4. Udemy Apache Kafka Series - Learn Apache Kafka for Beginners v2
  5. Udemy CCDAK 150 Practice Exam Questions

You should prepare well for the following topics. I recommend studying them in the same sequence.

Kafka Architecture
Read Confluent Kafka Definitive Guide PDF and Apache Kafka Documentation. Once you read both, revise using Kafka Architecture section in this post.

Kafka CLI
Read Confluent Kafka Definitive Guide PDF and revise using KAFKA CLI section of this post.

Kafka Streams
Read Confluent Kafka Definitive Guide PDF and revise using Kafka Streams section of this post.

Kafka Security
Read Apache Kafka Documentation Security Section

Kafka APIs
Read Apache Kafka Documentation API section and revise using KAFKA API section of this post.

Kafka Monitoring (Metrics)
Read Confluent Kafka Definitive Guide PDF and Apache Kafka Documentation for important metrics. Read Confluent Kafka Documentation as well.

Confluent Schema Registry
Read Confluent Kafka Documentation and revise using Confluent Schema Registry section of this post.

Confluent KSQL
Read Confluent Kafka Documentation KSQL and Kafka Streams section

Confluent REST Proxy
Read Confluent Kafka Documentation Rest Proxy Section

Sample Exam Questions


Please note that these are not actual questions from the CCDAK exam, but questions like these are likely to be asked in the exam.

1. Kafka Theory
  • Kafka is a …. ?
    pub-sub system
  • Kafka is mostly written in which language?
    Scala
  • Which errors are retriable from Kafka Producer?
    LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION, UNKNOWN_TOPIC_OR_PARTITION
  • What is a generic unique ID that can be used for a message received by a consumer?
    Topic + Partition + Offset

Read Kafka Architecture section of this post for more questions and answers

2. Kafka Streams
  • To transform data from one Kafka topic to another, you should use?
    Kafka Streams
  • Which Kafka Streams operators are stateful?
  • Which Kafka Streams operators are stateless?
  • Which window type has no gaps?
  • Which Kafka Streams join doesn't require co-partitioning of data?
    KStream-to-GlobalKTable
  • Which Kafka Streams join is always a windowed join?
    KStream-to-KStream
  • What is the output of KStream-to-KTable join?
    KStream

Read Kafka Streams section of this post for answers

3. Confluent Schema Registry
  • Which of the following is not a primitive type of Avro?
  • Which of the following is not a complex type of Avro?
  • Which of the following is not a required field in an Avro schema?
  • Deleting a field without a default value in an Avro schema is a …… schema evolution?
    backward
  • Adding a field to a record without a default value is a …… schema evolution?
    forward
  • In Avro, removing or adding a field that has a default value is a …… schema evolution?
    full
  • What client protocols are supported for the schema registry?
    HTTP, HTTPS
  • Where are Avro schema stored in Confluent Schema Registry?
    _schemas Kafka topic

Read Confluent Schema Registry section of this post for answers

4. Confluent KSQL
  • Is KSQL ANSI SQL compliant?
    No
  • What Java library is KSQL based on?
    Kafka Streams
5. Kafka Security
  • What are the valid authentication mechanisms in Kafka?
    SSL
    SASL/GSSAPI (Kerberos)
    SASL/PLAIN
    SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512
    SASL/OAUTHBEARER

Kafka Architecture


☛ I have spent a lot of time preparing these notes. You can expect most of the exam questions related to Kafka architecture to be covered by these notes.
☛ Moreover, if you are preparing for interviews, you can also expect most Kafka interview questions to be covered here.

Cluster

  • A cluster typically consists of multiple Kafka brokers.
  • All Kafka brokers within a Cluster are managed and coordinated by Zookeeper.

Rack

  • A rack is a logical grouping of Kafka brokers within a cluster.
  • A cluster can have multiple racks, and a rack can have multiple brokers.
  • A broker belongs to a rack when the property broker.rack=<rack-id> is specified at the broker level. This enables the rack awareness feature, which spreads replicas of the same partition across different racks.
  • Let's say you have 6 brokers and 2 racks: brokers 1, 2, 3 are on rack_1, and brokers 4, 5, 6 are on rack_2.
    Now when you create a topic with 6 partitions, instead of assigning brokers to partitions in the order 1, 2, 3, 4, 5, 6, the partitions are assigned by alternating between the racks, i.e. to brokers 1, 4, 2, 5, 3, 6.
    ┌――――――――――――┐                  ┌――――――――――――┐
    |  rack_1    |                  |  rack_2    | 
    |――――――――――――|                  |――――――――――――|
    |  Broker 1  |◁― Partition 1    |  Broker 4  |◁― Partition 2
    |  Broker 2  |◁― Partition 3    |  Broker 5  |◁― Partition 4
    |  Broker 3  |◁― Partition 5    |  Broker 6  |◁― Partition 6
    └――――――――――――┘                  └――――――――――――┘
    

Broker

  • Every broker in Kafka is a bootstrap server that knows about all brokers, topics, and partitions (metadata). That means a Kafka client (e.g. producer, consumer) only needs to connect to one broker in order to connect to the entire cluster.
  • At all times, only one broker should be the controller, and the cluster must always have exactly one controller.

Topic

  • Kafka takes bytes as input without even loading them into memory (that’s called zero copy)
  • Brokers have defaults for all the topic configuration parameters

Partition

  • A topic can have one or more partitions.
  • Each partition can have one or more replicas, decided by the replication factor. Replication factor = 3 means one leader replica and two follower replicas per partition.
  • It is not possible to delete a partition of a topic once it is created.
  • Order is guaranteed within a partition, and once data is written into a partition, it is immutable!
  • If a producer writes at 1 GB/sec and a single consumer consumes at 250 MB/sec, then you need at least 4 partitions (so that 4 consumers can keep up)!

Segment

  • Partitions are made of segments (*.log files).
  • At any time, only one segment is active in a partition.
  • A segment stores the actual Kafka messages along with offset, timestamp, compression information, etc.
  • log.segment.bytes = 1 GB (default): max size in bytes after which the segment is closed and rolled over to a new segment.
  • log.segment.ms = 1 week (default): max time in ms to wait before closing the segment, even if it is not full or has not reached the max size.
  • Every segment also has two indexes (files):
    1. An offset-to-position index (*.index file) - allows Kafka to find where in the segment to read in order to locate a message by offset.
    2. A timestamp-to-offset index (*.timeindex file) - allows Kafka to find a message by timestamp.

Log Retention and Cleanup policies

  • Log cleanup happens on partition segments (*.log files). Smaller or more segments means log cleanup will happen more often!
  • Old segments are cleaned up based on time-based and size-based log retention policies, whichever is reached first.
  • Time-based retention: segments older than the configured retention time are eligible for cleanup. You can use any of the three configurations log.retention.ms, log.retention.minutes, or log.retention.hours to specify the retention period in ms, minutes, or hours. If ms is not set, minutes is used; if minutes is not set, hours is used. The default retention time is 1 week.
  • Size-based retention: older segments are cleaned up when the max configured size of a topic partition (including all segments) is reached. You can use log.retention.bytes (default is -1, i.e. infinite) to configure the max size in bytes.
  • The cleanup process checks whether any logs need cleanup every log.cleaner.backoff.ms = 15 seconds (default).
  • Cleanup policy: logs are deleted, compacted, or both, based on log.cleanup.policy. The delete policy discards old segments when their retention time or size limit is reached. The compact policy deletes old messages per key and keeps only the latest value for each key.
  • Deleted records can still be seen by consumers for a period of delete.retention.ms = 24 hours (default).

Offset

  • Each topic partition has its own offset sequence, starting from 0.

Topic Replication

  • Replication factor = 3 and partitions = 2 means there will be 6 partition replicas in total distributed across the Kafka cluster. Each partition will have 1 leader and 2 ISRs (in-sync replicas).
  • The broker holding the leader replica of a partition is called the leader for that partition, and only the leader can receive and serve data for that partition.
  • The replication factor cannot be greater than the number of brokers in the Kafka cluster. If a topic has a replication factor of 3, then each partition will live on 3 different brokers.

Producer

  • The Kafka producer automatically recovers from the following retriable errors:
    LEADER_NOT_AVAILABLE
    NOT_LEADER_FOR_PARTITION
  • The Kafka producer throws an error for the following non-retriable errors:
    OFFSET_OUT_OF_RANGE
    BROKER_NOT_AVAILABLE
    MESSAGE_TOO_LARGE
    INVALID_TOPIC_EXCEPTION
  • If you send a 3 MB message to a topic with the default message size configuration, the producer will throw a MessageSizeTooLarge exception immediately, since it is not a retriable exception.
  • When you produce to a topic which doesn't exist and auto.create.topics.enable=true, Kafka creates the topic automatically with the broker/topic settings num.partitions and default.replication.factor.

Producer Acknowledgment

acks      | producer behaviour                          | data loss
acks=0    | does not wait for acknowledgement           | possible data loss
acks=1    | waits for leader acknowledgement            | limited data loss
acks=all  | waits for leader + replica acknowledgements | no data loss

acks=all must be used in conjunction with min.insync.replicas which can be set at broker or topic level.
(The points below assume that the replicas are distributed across 3 brokers.)

  • min.insync.replicas only matters if acks=all
  • acks=all, min.insync.replicas=2 implies that at least 2 brokers that are ISR (including the leader) must acknowledge
  • acks=all, min.insync.replicas=1 implies that at least 1 broker that is an ISR (including the leader) must acknowledge
  • A Kafka topic with replication.factor=3, acks=all, min.insync.replicas=2 can only tolerate 1 broker going down; otherwise the producer will receive a NOT_ENOUGH_REPLICAS exception on send.
  • A Kafka topic with replication.factor=3, acks=all, min.insync.replicas=1 can tolerate a maximum of 2 brokers going down, so that a producer can still produce to the topic.

Producer Configuration

  • The mandatory properties to configure a Kafka producer are as follows (a minimal sketch follows the list):
    bootstrap.servers
    key.serializer
    value.serializer
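
Below is a minimal sketch of a producer wired up with only these mandatory properties, assuming a local broker at localhost:9092 and the topic my-first-topic used elsewhere in this post:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The three mandatory producer properties
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key is optional; a null key would be distributed round-robin (see Message Key below)
            producer.send(new ProducerRecord<>("my-first-topic", "key-1", "hello kafka"));
            producer.flush();
        }
    }
}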

Safe Producer Configuration

  • min.insync.replicas=2 (set at the broker or topic level)
  • retries=MAX_INT: number of retries by the producer in case of a transient failure/exception (default is 0)
  • max.in.flight.requests.per.connection=5: number of producer requests that can be made in parallel per connection (default is 5)
  • acks=all
  • enable.idempotence=true: the producer sends a producer id with each message so that Kafka can detect duplicates. When Kafka receives a duplicate message with the same producer id and sequence number that it has already committed, it does not commit it again but still sends an ack to the producer (default is false)

High Throughput Producer using compression and batching

  • compression.type=snappy: value can be none (default), gzip, lz4, or snappy. Compression is enabled at the producer level and doesn't require any config change on the broker or consumer. Compression is more effective on bigger batches of messages sent to Kafka.
  • linger.ms=20: number of milliseconds a producer is willing to wait before sending a batch out (default 0). Increasing the linger.ms value increases the chance of batching.
  • batch.size=32KB or 64KB: maximum number of bytes that will be included in a batch (default 16KB). Any message bigger than the batch size will not be batched. A combined sketch of the safe-producer and high-throughput settings follows below.
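
Putting the safe-producer and high-throughput settings above together, a hedged configuration sketch (the exact values are illustrative tuning choices, not requirements) could look like this:

import java.util.Properties;

public class TunedProducerConfig {
    static Properties tunedProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Safe producer settings (min.insync.replicas=2 is set on the broker or topic, not here)
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        props.put("max.in.flight.requests.per.connection", "5");

        // High-throughput settings: compress and batch
        props.put("compression.type", "snappy");
        props.put("linger.ms", "20");
        props.put("batch.size", Integer.toString(32 * 1024)); // 32 KB

        return props;
    }
}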

Message Key

  • A producer can choose to send a message with a key.
  • If key=null, data is sent in a round-robin fashion across partitions.
  • If the key is not null, then all messages for that key always go to the same partition. This can be used to order the messages for a specific key, since order is guaranteed within a partition.
  • Adding a partition to the topic breaks the guarantee that the same key goes to the same partition.
  • Keys are hashed using the murmur2 algorithm by default, as sketched below.
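
As an illustration, the default partitioner maps a non-null key to a partition by hashing the serialized key with murmur2 and taking the result modulo the partition count. The sketch below mirrors that idea using Kafka's own Utils class; the real partitioner also handles null keys and other cases:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KeyPartitionSketch {
    // Maps a string key to a partition the way the default murmur2-based hashing does
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // what StringSerializer produces
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition, as long as the partition count never changes
        System.out.println(partitionFor("customer-42", 6));
        System.out.println(partitionFor("customer-42", 6)); // prints the same partition number
    }
}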

Message Size

  • The default max message size is 1MB.
  • If you try to send a message larger than 1MB, a MessageSizeTooLargeException is thrown.
  • Suppose you want to send a 15MB message from producer to broker to consumer successfully; then you need to configure:
    1. message.max.bytes=15728640 (broker/topic config) - the max size of a message that the broker can receive from a producer.
    2. replica.fetch.max.bytes=15728640 (broker config) - the max size of a message that can be replicated across brokers.
    3. fetch.message.max.bytes=15728640 (consumer config) - the max size of a message that a consumer can fetch.

Consumer

  • One consumer per thread is the rule. A consumer must not be shared across multiple threads.
  • Within the same consumer group, each consumer is assigned a different set of partitions.
    • If there are 5 consumers in the same consumer group consuming from a topic with 10 partitions, then 2 partitions will be assigned to each consumer and no consumer will remain idle.
    • If there are 10 consumers in the same consumer group consuming from a topic with 5 partitions, then 5 partitions will be assigned to 5 of the consumers and the remaining 5 consumers will be idle.
  • records-lag-max (monitoring metric): the maximum lag in terms of number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.

Consumer Group

  • If two applications want to independently process all the messages from a Kafka topic with 4 partitions, then you should create 2 consumer groups with 4 consumers in each group for optimal performance.

Consumer Offset

When a consumer in a group has processed the data received from Kafka, it commits the offsets to a Kafka topic named __consumer_offsets. This is used when a consumer dies, so that it can read back from where it left off.

Delivery Semantics

  • At most once: offsets are committed as soon as the message batch is received. If the processing goes wrong, the messages will be lost (they won't be read again).
  • At least once (default): offsets are committed after the messages are processed. If the processing goes wrong, the messages will be read again. This can result in duplicate processing of messages.
    • Make sure your processing is idempotent (i.e. re-processing a message won't impact your systems). For most applications, we use this and ensure processing is idempotent.
  • Exactly once: can only be achieved for Kafka=>Kafka workflows using the Kafka Streams API. For Kafka=>Sink workflows, use an idempotent consumer.

Consumer Offset commit strategy

  • enable.auto.commit=true & synchronous processing of batches: with auto commit, offsets are committed automatically for you at a regular interval (auto.commit.interval.ms=5000 by default) every time you call .poll(). If you don't use synchronous processing, you will be in "at most once" behaviour because offsets will be committed before your data is processed.
  • enable.auto.commit=false & manual commit of offsets (recommended; a sketch follows below)
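
A minimal sketch of the recommended manual-commit pattern, assuming a local broker and the topic and group names used in the CLI examples later in this post:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-first-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");   // take control of offset commits
        props.put("auto.offset.reset", "earliest"); // where to start if no committed offset exists

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-first-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record; processing should be idempotent for at-least-once semantics
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync(); // commit only after the whole batch has been processed
            }
        }
    }
}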

Consumer Offset reset behavior

  • auto.offset.reset=latest: will read from the end of the log
  • auto.offset.reset=earliest: will read from the start of the log
  • auto.offset.reset=none: will throw an exception if no offset is found
  • Consumer offsets can be lost if the consumer hasn't read new data in 7 days. This is controlled by the broker setting offsets.retention.minutes.

Consumer Poll Behavior

  • fetch.min.bytes = 1 (default): controls the minimum amount of data you want to receive on each request. Helps improve throughput and decrease the number of requests, at the cost of latency.
  • max.poll.records = 500 (default): controls how many records to receive per poll request. Increase it if your messages are very small and you have a lot of available RAM.
  • max.partition.fetch.bytes = 1MB (default): maximum data returned by the broker per partition. If you read from 100 partitions, you will need a lot of memory (RAM).
  • fetch.max.bytes = 50MB (default): maximum data returned for each fetch request (covers multiple partitions). The consumer performs multiple fetches in parallel.

Consumer Heartbeat Thread

  • The heartbeat mechanism is used to detect whether the consumer application is dead.
  • session.timeout.ms=10s (default): if no heartbeat is sent during this period, the consumer is considered dead. Set a lower value for faster consumer rebalances.
  • heartbeat.interval.ms=3s (default): a heartbeat is sent every 3 seconds. Usually set to 1/3rd of session.timeout.ms.

Consumer Poll Thread

  • The poll mechanism is also used to detect whether the consumer application is dead.
  • max.poll.interval.ms = 5min (default): max amount of time between two .poll() calls before declaring the consumer dead. If processing of a message batch generally takes longer in your application, you should increase this interval.

Kafka Guarantees

  • Messages are appended to a topic-partition in the order they are sent.
  • Consumers read messages in the order they are stored in a topic-partition.
  • With a replication factor of N, producers and consumers can tolerate up to N-1 brokers being down.
  • As long as the number of partitions remains constant for a topic (no new partitions), the same key will always go to the same partition.

Client Bi-Directional Compatibility

  • an Older client (1.1) can talk to Newer broker (2.0)
  • a Newer client (2.0) can talk to Older broker (1.1)

Kafka Connect

  • Source connector: gets data from a common data source into Kafka, e.g. imports data from an external database into Kafka.
  • Sink connector: publishes data from Kafka to a common data store, e.g. exports data from Kafka to an external database.

Zookeeper

  • ZooKeeper servers will be deployed on multiple nodes. This is called an ensemble. An ensemble is a set of 2n + 1 ZooKeeper servers where n is any number greater than 0. The odd number of servers allows ZooKeeper to perform majority elections for leadership. At any given time, there can be up to n failed servers in an ensemble and the ZooKeeper cluster will keep quorum. If at any time, quorum is lost, the ZooKeeper cluster will go down.

  • For a Zookeeper cluster to withstand the loss of 2 servers, it requires a total of 2*2+1 = 5 servers.

  • In a Zookeeper multi-node configuration, initLimit and syncLimit are used to govern how long following ZooKeeper servers can take to initialize with the current leader and how long they can be out of sync with the leader. If tickTime=2000, initLimit=5, and syncLimit=2, then a follower can take (tickTime * initLimit) = 10000 ms to initialize and may be out of sync for up to (tickTime * syncLimit) = 4000 ms.

  • In a Zookeeper multi-node configuration, the server.* properties set the ensemble membership. The format is

    server.<myid>=<hostname>:<leaderport>:<electionport>, where:

    • myid is the server identification number. In this example, there are three servers, so each one will have a different myid with values 1, 2, and 3 respectively. The myid is set by creating a file named myid in the dataDir that contains a single integer in human readable ASCII text. This value must match one of the myid values from the configuration file. If another ensemble member has already been started with a conflicting myid value, an error will be thrown upon startup.
    • leaderport is used by followers to connect to the active leader. This port should be open between all ZooKeeper ensemble members.
    • electionport is used to perform leader elections between ensemble members. This port should be open between all ZooKeeper ensemble members.

KAFKA CLI


  1. kafka-topics.sh allows you to create, modify, delete, and list information about topics in the cluster.
  2. kafka-console-producer.sh allows you to write messages into a Kafka topic in your cluster.
  3. kafka-console-consumer.sh allows you to consume messages out of one or more topics in your Kafka cluster.
  4. kafka-consumer-groups.sh allows you to list consumer groups, describe specific groups, delete consumer groups or specific group info, or reset consumer group offset information.
  5. kafka-configs.sh allows you to change configuration dynamically for --entity-type topics, brokers, users, and clients.

You are required to provide the cluster connection string and port through the --bootstrap-server option. For older versions of Kafka, cluster information was provided through the --zookeeper option, which is now deprecated.

Following are a few examples of Kafka command-line usage:

① Start a zookeeper at default port 2181

$bin/zookeeper-server-start.sh config/zookeeper.properties

Please note that Zookeeper is no longer required to manage an Apache Kafka cluster in the latest versions. You still need it for older versions.

② Start a kafka server at default port 9092

$bin/kafka-server-start.sh config/server.properties

③ Create a kafka topic ‘my-first-topic’ with 3 partitions and 3 replicas

$bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --create \
    --replication-factor 3 \
    --partitions 3

④ List all kafka topics in a cluster

$bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --list

⑤ Modify kafka topic ‘my-first-topic’

$bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --partitions 9 \
    --alter

This will add 6 more partitions to the topic, which already has 3 partitions.

⑥ Delete kafka topic ‘my-first-topic’

$bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --delete

This will have no impact if delete.topic.enable is not set to true

⑦ Describe kafka topic ‘my-first-topic’ details

$bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --describe

Find out all the partitions without a leader

$bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --describe --unavailable-partitions

⑧ Produce messages to Kafka topic ‘my-first-topic’

$bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --producer-property acks=all 
> message 1  
> message 2  
> ^C

⑨ Start Consuming messages from kafka topic ‘my-first-topic’

$bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --from-beginning
> message 1  
> message 2

⑩ Start Consuming messages in a consumer group from kafka topic ‘my-first-topic’

$bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-first-topic \
    --group my-first-consumer-group \
    --from-beginning

⑪ List all consumer groups

$bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list

⑫ Describe consumer group

$bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-first-consumer-group \
    --describe 

⑬ Reset offset of consumer group to replay all messages

$bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-first-consumer-group \
    --reset-offsets --to-earliest \
    --topic my-first-topic \
    --execute

⑭ Shift offsets by 2 (forward) of a consumer group

$bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-first-consumer-group \
    --reset-offsets --shift-by 2 \
    --topic my-first-topic \
    --execute

You can shift offsets by 2 (backward) using --shift-by -2

⑮ Setting the retention for the topic named ‘my-first-topic’ to 1 hour (3,600,000 ms):

$bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name my-first-topic \
    --add-config retention.ms=3600000 \
    --alter

Kafka Streams


Kafka Streams is used for building streaming applications that transform data from one Kafka topic and feed it into another Kafka topic.
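
A minimal sketch of a Kafka Streams application that reads one topic, applies a stateless transformation, and writes the result to another topic (the topic names and application id are made up for illustration):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseStreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app"); // also used as the consumer group id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("my-first-topic");
        // mapValues is a stateless operator; the transformed stream is written to another topic
        input.mapValues(value -> value.toUpperCase()).to("my-first-topic-uppercase");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}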

1. Stateless Operators

branch
filter
inverseFilter
flatMap
flatMapValues
foreach
groupByKey
groupBy
map
mapValues

2. Stateful Operators

join
aggregate
count
reduce
windowing

3. Window

1) Tumbling window
  • Time based, Fixed Size, Non overlapping, Gap less windows
  • For e.g. if window-size=5min and advance-interval =5min then it looks like [0-5min] [5min-10min] [10min-15min]…..
2) Hopping window
  • Time based, Fixed Size, Overlapping windows
  • For e.g. if window-size=5min and advance-interval=3min then it looks like [0-5min] [3min-8min] [6min-11min]……
3) Sliding window
  • Fixed size overlapping window that works on the difference between record timestamp
  • Used only for join operation
4) Session window
  • Session based, Dynamically sized, Non overlapping, Data driven window.
  • Used to aggregate key based events into session.

For more information, refer to the Apache Kafka documentation on windowing.
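
To make the tumbling vs. hopping distinction concrete, here is a hedged sketch of two windowed counts (TimeWindows.ofSizeWithNoGrace is the API in recent Kafka Streams releases; older versions use TimeWindows.of):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCounts {
    // Builds a topology with one tumbling-window count and one hopping-window count
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("events");

        // Tumbling window: size == advance interval, so windows neither overlap nor leave gaps
        events.groupByKey()
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
              .count();

        // Hopping window: size 5 min advancing every 3 min, so consecutive windows overlap
        events.groupByKey()
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                                     .advanceBy(Duration.ofMinutes(3)))
              .count();

        return builder.build();
    }
}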

4. SerDes data types

Kafka Streams operations require SerDes (serializer/deserializer) to handle the data types of record keys and values.

byte[]
ByteBuffer
Double
Integer
Long
String

5. Streams DSL

1) KStream
  • Abstraction of record stream from subset of partitions of topic
  • In database table analogy, interpreted as INSERT statement
  • In an e-commerce application, any type of transactions like purchase, payment should be modeled as KStream
2) KTable
  • Abstraction of changelog stream from subset of partitions of topic
  • In database table analogy, interpreted as UPDATE statement
  • In an e-commerce application, mostly static data like inventory list, customer list and aggregated data like total sales should be modeled as KTable
3) GlobalKTable
  • Abstraction of changelog stream from all partitions of topic
  • In database table analogy, interpreted as UPDATE statement

For more information, refer to the Apache Kafka documentation on the Streams DSL.

6. Join Operands

Join Operands            | Output  | Type         | Co-partition required | Join Type
KStream-to-KStream       | KStream | Windowed     | Yes                   | key and window based
KTable-to-KTable         | KTable  | Non-windowed | Yes                   | key or foreign-key based
KStream-to-KTable        | KStream | Non-windowed | Yes                   | key based
KStream-to-GlobalKTable  | KStream | Non-windowed | No                    | key or foreign-key based
co-partition

co-partition means both the left and right join operand topics must have same number of partitions.

A join between topic A (5 partitions) and topic B (3 partitions) is possible using a KStream-to-GlobalKTable join, since it does not require co-partitioning; see the sketch below.
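
For example, a KStream-to-GlobalKTable join takes a key selector that derives the table's key from each stream record, which is why the two topics do not need the same partition count (the topic names and value format below are made up for illustration):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableJoinSketch {
    static void build(StreamsBuilder builder) {
        // orders keyed by orderId (e.g. 5 partitions), customers keyed by customerId (e.g. 3 partitions)
        KStream<String, String> orders = builder.stream("orders");              // value: "customerId|amount"
        GlobalKTable<String, String> customers = builder.globalTable("customers");

        // The key selector extracts the GlobalKTable key from each stream record,
        // so co-partitioning of the two topics is not required.
        KStream<String, String> enriched = orders.join(
                customers,
                (orderId, orderValue) -> orderValue.split("\\|")[0],            // customerId
                (orderValue, customerValue) -> orderValue + " / " + customerValue);

        enriched.to("orders-enriched");                                         // the output is a KStream
    }
}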

KAFKA API


  • Click here to find out how we can create a safe and high-throughput Kafka producer using Java.
  • Click here to find out how we can create a Kafka consumer using Java with auto commit disabled (manual commit).

Confluent Schema Registry


1. Avro

Primitive Types
1. null
2. boolean
3. int (32 bit)
4. long (64 bit)
5. float (32 bit)
6. double (64 bit)
7. bytes (sequence of 8-bit bytes)
8. string (char sequence)
Complex Types
1. record
2. enum
3. array
4. map
5. union
6. fixed
Avro Schema Definition
namespace (optional)
type (required) => record, enum, array, map, union, fixed
name (required)
doc (optional)
aliases (optional)
fields (required) {
    name (required)
    type (required)
    doc (optional)
    default (optional)
    order (optional)
    aliases (optional)
}
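
To tie these fields together, here is a hedged sketch that builds an equivalent record schema programmatically with Avro's SchemaBuilder (the record and field names are made up for illustration):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CustomerSchemaSketch {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder
                .record("Customer")                           // name
                .namespace("com.example.kafka")               // namespace
                .doc("A customer record")                     // doc (optional)
                .fields()
                .requiredString("name")                       // field without a default value
                .name("age").type().intType().intDefault(0)   // field with a default value
                .endRecord();

        System.out.println(schema.toString(true));            // prints the schema as pretty JSON
    }
}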

2. Confluent Schema Notes

  • The Schema Registry stores all schemas in a Kafka topic defined by kafkastore.topic=_schemas (default), which is a single-partition, log-compacted topic.
  • The default response media types application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, and application/json are used in the response header.
  • The HTTP and HTTPS client protocols are supported by the Schema Registry.
  • The prefix applied to metric names for the default JMX reporter is kafka.schema.registry.
  • The default listener port is 8081.
  • Confluent supports the primitive types null, Boolean, Integer, Long, Float, Double, String, byte[], and the complex type IndexedRecord. Sending data of other types to KafkaAvroSerializer will cause a SerializationException.
3. Confluent Schema Compatibility Types

BACKWARD
  • A consumer using schema X can process data produced with schema X or X-1. In the case of BACKWARD_TRANSITIVE, a consumer using schema X can process data produced with all previous schemas X, X-1, X-2, and so on.
  • Deleting a field without a default value (a required field) is allowed. In this case, the consumer ignores this field.
  • Adding a field with a default value (an optional field) is allowed. In this case, the consumer assigns the default value.
  • BACKWARD is the default compatibility type in the Confluent Schema Registry.
  • There is no assurance that consumers using the older schema can read data produced using the new schema. Therefore, upgrade all consumers before you start producing new events.
FORWARD
  • Data produced using schema X can be read by consumers with schema X or X-1. In the case of FORWARD_TRANSITIVE, data produced using schema X can be read by consumers with all previous schemas X, X-1, X-2, and so on.
  • Adding a field without a default value (a required field) is allowed. In this case, consumers on the older schema ignore this field.
  • Deleting a field with a default value (an optional field) is allowed. In this case, consumers assign the default value.
  • There is no assurance that consumers using the new schema can read data produced using the older schema. Therefore, first upgrade all producers to the new schema and make sure the data already produced using the older schema is not available to consumers, then upgrade the consumers.
FULL
  • Backward and forward compatible between schemas X and X-1. In the case of FULL_TRANSITIVE, backward and forward compatible between all previous schemas X, X-1, X-2, and so on.
  • Modifying a field that has a default value (an optional field) is allowed.
  • There are assurances that consumers using older schemas can read data produced using the new schema, and that consumers using the new schema can read data produced using older schemas. Therefore, you can upgrade the producers and consumers independently.
NONE
  • This compatibility type means schema compatibility checks are disabled.
  • Whether to upgrade consumers or producers first depends on the change. For example, when modifying a field type from Number to String, you will either need to upgrade all producers and consumers to the new schema version at the same time, or start over with a brand-new topic.

Default Ports


  • Zookeeper Client Port: 2181
  • Zookeeper Peer Port (followers connect to the leader): 2888
  • Zookeeper Leader Election Port: 3888
  • Broker: 9092
  • REST Proxy: 8082
  • Schema Registry: 8081
  • KSQL: 8088