Streaming Audio: Apache Kafka® & Real-Time Data

Optimizing Apache Kafka's Internals with Its Co-Creator Jun Rao

April 28, 2022 · Confluent, original creators of Apache Kafka® · Season 1, Episode 211

You already know Apache Kafka® is a distributed event streaming system for setting your data in motion, but how does its internal architecture work? No one can explain Kafka’s internal architecture better than Jun Rao, one of its original creators and Co-Founder of Confluent. Jun has an in-depth understanding of Kafka that few others can claim—and he shares that with us in this episode, and in his new Kafka Internals course on Confluent Developer. 

One of Jun's goals in publishing the Kafka Internals course was to cover the evolution of Kafka since its initial launch. In line with that goal, he discusses the history of Kafka development, including the original thinking behind some of its design decisions, as well as how its features have been improved to better meet its key goals of durability, scalability, and real-time data. 

With respect to its initial design, Jun relates how Kafka was conceived from the ground up as a distributed system, with compute and storage always maintained as separate entities, so that they could scale independently. Additionally, he shares that Kafka was deliberately made for high throughput since many of the popular messaging systems at the time of its invention were single node, but his team needed to process large volumes of non-transactional data, such as application metrics, various logs, click streams, and IoT information.

As regards the evolution of its features, Jun explains these two topics, among others, at great length:

  • Consumer rebalancing protocol: The original "stop the world" approach to Kafka's consumer rebalancing, although revolutionary at the time of its launch, was eventually improved to take a more incremental approach.
  • Cluster metadata: Moving from the external ZooKeeper to the built-in KRaft protocol allows metadata to scale by a factor of ten, according to Jun, and it also means you only need to worry about running a single binary.

The Kafka Internals course consists of eleven concise modules, each dense with detail—covering Kafka fundamentals in technical depth. The course also pairs with four hands-on exercise modules led by Senior Developer Advocate Danica Fine. 

Kris Jenkins: (00:00)
How much do you know about how Kafka works under the hood? I mean, you can go a long way just knowing how it works logically, but it's like anything. If you want to get the best out of it, it does pay to peek inside and learn something about how it does what it does.

Kris Jenkins: (00:16)
Well, for this week's episode of Streaming Audio, we've brought in a real expert, Jun Rao. He's been working on Kafka since the very beginning. He's one of the co-founders of Confluent. And if you want to peer under the hood of Kafka? Well, this week we're getting a guided tour by the guy who helped design the car.

Kris Jenkins: (00:35)
He's joining us fresh from recording an in-depth series of tutorials about Kafka's internals, so it's at the front of his mind. And if you find this podcast whets your appetite for gory details, you can learn even more from Jun over at our educational site. That's developer.confluent.io.

Kris Jenkins: (00:53)
The course is up now. It's completely free, and it's split up into logical modules so that you can find the parts that are most interesting to you. Although having said that, I've been going through the whole thing and I've learned something from every module. So if you've got the time, it's really worth it.

Kris Jenkins: (01:08)
But for now, let's listen to the man himself as he shines a light on some of Kafka's internals. My guest today is Jun Rao, who is one of the original creators of Kafka, one of the original committers, a co-founder of Confluent, our company, and still an active developer on Kafka itself. I was stalking you on GitHub yesterday and you reviewed two pull requests.

Jun Rao: (01:44)
That's right.

Kris Jenkins: (01:45)
So I know you're currently active. Welcome back to the show, Jun.

Jun Rao: (01:50)
Thanks, Kris. Good to see you again.

Kris Jenkins: (01:53)
Likewise. To frame this: you have recently launched, on our developer education site, a course all about Kafka internals and how it actually works under the hood. I've watched most of it and it's a super-rich meal, full of information.

Kris Jenkins: (02:14)
I thought, whilst we can't hope to do the whole course in this podcast, maybe I can pull some juicy nuggets out of your brain in the next hour?

Jun Rao: (02:23)
Yeah.

Kris Jenkins: (02:26)
Let's start with this. Let's gradually work our way into the depth of Kafka. What is a Kafka broker? By that, I mean, what's it responsible for and what's it not responsible for? What's its boundary?

Jun Rao: (02:42)
First of all, thinking about the course: we started with the architecture of Kafka, but over the years we actually have been improving that architecture in various ways. We actually improved a lot of details to make Kafka perform in a more real-time way.

Jun Rao: (02:59)
There's actually ... A lot of work went into Kafka's internals. I thought it's probably a good idea just to summarize what we have done, so that most people would understand a bit of how the core things work. In particular, for the broker side. When we started with Kafka, we deliberately had this design of separating the compute from the storage.

Jun Rao: (03:26)
It's like a lot of other systems. Like a database, where you have someplace you need to store the data, but you also need to do some processing for the data that you are storing. In the case of Kafka, we store data in real-time as a continuous stream. It's like a log. That's essentially the storage part.

Jun Rao: (03:47)
The second part is, how do you leverage that storage? What do you do with that stream of data coming in? Well, that's the real-time processing part. There's a lot of ETL-style processing that's moving towards real-time. You want to do this continuously. This can be as simple as doing some filtering or projection, but it can be more complicated stuff like joining two streams together, computing some window-based aggregation, and so on and so forth.

Jun Rao: (04:22)
When we designed Kafka, we said, "Okay. The broker layer is really in some sense the storage layer." We want that to be the place where we store the data. And then, we want that to be good at delivering the data based on what a user needs. But it doesn't do too much of the processing. All the processing of the data is done essentially on the client side.

Jun Rao: (04:50)
That's the computation layer we have. This can be as simple as just a simple application based on our consumer API. But it can also be a more sophisticated application based on Kafka Streams, where you do some of those more complex computational processing that I mentioned earlier. Or it can be a ksqlDB layer.

Jun Rao: (05:15)
All the more advanced processing will be done in that layer. What the broker is doing is really two things. One is to be able to store the incoming data as a stream, in real time, as reliably as possible. And then, when the user needs to make a subscription to the data, it can deliver the changes to the user in an efficient and real-time way.
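
To make that split concrete, here is a minimal sketch using the standard Kafka Java clients. The broker address, topic name, and group ID are illustrative, not anything from the conversation:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class StorageVsCompute {
    public static void main(String[] args) {
        // Producer side: hand a record to the broker, which appends it to a log.
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // illustrative address
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("events", "user-42", "page_view"));
        }

        // Consumer side: the broker only serves the stream; any filtering,
        // joining, or aggregating happens here, in the client application.
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "demo-app"); // illustrative group
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(List.of("events"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> rec : records) {
                System.out.printf("%s -> %s%n", rec.key(), rec.value());
            }
        }
    }
}
```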

Kris Jenkins: (05:44)
We've got this ladder of abstraction climbing up for processing, but that's completely separate from the broker?

Jun Rao: (05:50)
Yeah. I think the idea for that is ... By having this separation, you can scale them out separately. Because Kafka is designed as a distributed system. Both the broker and the clients can run in a distributed way, so that you can scale out the resources as you need. By decoupling the broker from the processing layer, now you can scale them independently.

Jun Rao: (06:18)
If there is some storage need, you can scale out a broker. And if you want to scale out processing, then you can just scale out the application layer. This also actually provides a little bit better isolation. Because in some of the cases, when you do the processing, you may want to run a little bit of ad hoc logic that's specific to the user's logic.

Jun Rao: (06:44)
By running this outside the broker, it just protects the different parts better. Because if you have some issues with your application-level code, it's only impacting your application, but not really impacting the broker. What runs on the broker is really some pre-defined, limited capability to retrieve that data.

Kris Jenkins: (07:07)
Right. Let's dive a bit into the broker side of the responsibility. You said it's worrying about being real-time, durable, and scalable. What are the principal techniques you employ for those three guarantees or aims?

Jun Rao: (07:33)
Let's talk about ... The first thing we really wanted to have in Kafka is to really solve that high throughput issue, because a lot of the traditional messaging systems are really designed as single-node systems. They weren't designed to handle the high volume of event streams that's common in some of the modern architectures.

Kris Jenkins: (07:59)
You're there in the early days of LinkedIn, facing exactly that problem.

Jun Rao: (08:04)
That's right. Because I think there you are dealing with not just the traditional, transactional type of data that's stored in the database. You are dealing with orders of magnitude more data, which is non-transactional in nature.

Jun Rao: (08:18)
These are like clickstreams. These are application metrics, various logs, IoT information. This is information that's as useful as transactional information for all the data analytics you want to do. But volume wise, it's just far bigger.

Kris Jenkins: (08:39)
Yep.

Jun Rao: (08:41)
The way this is achieved in Kafka is just ... Fundamentally, this is actually designed from the ground up as a distributed system. By running multiple of those brokers, typically in a cluster, you can distribute the load among those brokers.

Jun Rao: (09:00)
Then, you need to have a unit to distribute your entities or resources. And that's the concept of topic and partitions. That's the first thing.

Jun Rao: (09:12)
By having these things together, you still have an architecture that you can scale out as you need on the broker side. That is probably one of the fundamental reasons for achieving this high throughput.

Kris Jenkins: (09:29)
Is that why, from the early days, you had this key-value basic unit where we're going to shard based on key?

Jun Rao: (09:37)
That's part of the reason. One is, if you have keys, then you can do a little bit of partitioning based on semantics. This actually is useful for a few cases. It can be used for co-locating records with the same key together. This actually can be useful for some of the processing.

Jun Rao: (10:05)
If you want to do things like aggregating a bunch of values against a particular key, having all the records with the same key being landed in the same partition and consumed by a single consumer is definitely convenient for doing this kind of computation.

Jun Rao: (10:23)
It's also useful for ordering guarantees, which is also important. Because a lot of applications, they don't necessarily want a global ordering. But they do want some ordering within a subdomain.

Jun Rao: (10:38)
Often, it's per key. Think of a key. Maybe per customer. Or maybe it can be a particular user or a particular session. Having the guarantee that everything within that key happens strictly in order is useful for building some of those applications as well.
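
As a sketch of what that looks like with the Java producer (topic and key names are illustrative; for keyed records the default partitioner takes a murmur2 hash of the key modulo the partition count, so equal keys always land together):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KeyedOrdering {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // illustrative
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            // Same key => same partition, so these three events stay together
            // and keep their order for whichever single consumer owns it.
            producer.send(new ProducerRecord<>("clicks", "customer-123", "opened_homepage"));
            producer.send(new ProducerRecord<>("clicks", "customer-123", "added_to_cart"));
            producer.send(new ProducerRecord<>("clicks", "customer-123", "checked_out"));
        }
    }
}
```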

Kris Jenkins: (11:05)
That makes sense. You've got my clickstream and my instant messages stream keyed by my LinkedIn username. All that data is going to be ordered for me and on the same node for me.

Jun Rao: (11:23)
Exactly. For example, let's say we want to understand a user's watching behavior at, let's say, Comcast. Having a particular Comcast user's watching session coming in order makes your understanding of that behavior probably easier. Because you know this is actually the particular order in which the user has been seeing the content.

Kris Jenkins: (11:55)
Makes it far more processable. But then, I see that you've immediately got a problem. Because once you've split something out, you've got to coordinate.

Kris Jenkins: (12:08)
Once you've sharded your database onto lots of different machines, you then have a coordination partitioning problem, which I think you go into in the Internals course. Solving that quite cleverly.

Jun Rao: (12:23)
I think that's one of the things for ... When a producer first sends the data, you certainly need to know which partition it goes to. And then, this can be done based on the key, if the key is provided, which will guarantee some of the semantics I mentioned earlier. Either it's co-locating or ordering.

Jun Rao: (12:50)
If you don't care, we have the freedom just to spread data more evenly. That's on the path when the data is coming in. Now, on the way out ... When you consume the data into the consumer applications, the same thing is happening. I think one thing with the concept of partitioning is it actually allows a lot of parallelism on the consumer application side, independent of the number of brokers.

Jun Rao: (13:26)
Because the degree of parallelism you have is really the number of partitions that you are consuming, which can be far more than the number of brokers on the server side. What we see is ... In a lot of applications, when they process those messages or records, their bottleneck is typically not on the server. It's really on the application.

Jun Rao: (13:48)
For each of the records they are consuming, sometimes they have to do some expensive logic. One reason could be that they have a legacy system they have to interact with that has high latency. The easiest way to get around that is maybe to have more parallelism, so you can hide that latency.

Jun Rao: (14:09)
For things like that, the consumer application often requires a lot of flexibility in terms of having more degrees of parallelism. And then, having partitions is a great way of achieving this parallelism for those consumer applications.
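
A sketch of that pattern with the Java consumer, assuming an illustrative topic, group name, and a stand-in for the slow legacy call:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelConsumers {
    public static void main(String[] args) {
        // With, say, 12 partitions, up to 12 of these instances can consume in
        // parallel; the group protocol gives each one a disjoint subset.
        int instances = 4; // illustrative; bounded above by the partition count
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        for (int i = 0; i < instances; i++) {
            pool.submit(() -> {
                Properties c = new Properties();
                c.put("bootstrap.servers", "localhost:9092"); // illustrative
                c.put("group.id", "slow-legacy-writer");      // same group => shared load
                c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
                    consumer.subscribe(List.of("clicks"));
                    while (true) {
                        for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                            callSlowLegacySystem(rec.value()); // the real bottleneck lives here
                        }
                    }
                }
            });
        }
    }

    static void callSlowLegacySystem(String value) {
        // stand-in for the high-latency interaction Jun describes
    }
}
```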

Kris Jenkins: (14:30)
You've got this deep relationship between the number of partitions and the way consumers handle load balancing?

Jun Rao: (14:38)
That's right.

Kris Jenkins: (14:42)
Maybe we should go a bit into that protocol? Because it gets us into the world of how you deal with recovery and downtime. This protocol of load balancing a consumer group.

Jun Rao: (15:03)
Well, in terms of recovery and load balancing. The first thing. On the broker side, a big part of the requirement is to have this high availability guarantee. Because Kafka is designed as a real-time system. People want that to be available all the time. And then, the fact that it's designed as a distributed system just means there are a lot of servers you have to deal with.

Jun Rao: (15:36)
At any particular point of time, there could be a broker that's down. Maybe you are taking it down for maintenance. Maybe there's some hardware issue. Even in those cases, we need the ability to continue to provide this high availability of our service. A big part of how we achieve that is the redundancy capability we added on the broker side.

Jun Rao: (16:04)
On the broker side, we have the ability for you to have each of those topic partitions replicated. If you enable that, which I think pretty much everybody enables that now, you will have the same record be redundantly stored on multiple of those brokers.

Jun Rao: (16:24)
Then, if one of those brokers goes down, you can be assured that the service will still be available on another copy of the same data. That's one of the things to make our service truly highly available on the server side.

Jun Rao: (16:46)
Of course, if there's a real failure of a particular broker, we also have this recovery logic. To make sure, once the failed broker comes back, it can be completely in sync with the rest of the brokers again.
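
On the configuration side, replication is set per topic. A sketch using the Java Admin client; the topic name and sizing are illustrative, and min.insync.replicas pairs with acks=all on the producer:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(p)) {
            // Each of the 6 partitions is stored on 3 brokers; with
            // min.insync.replicas=2 (and acks=all on the producer), a write
            // succeeds only once at least 2 copies have it, so losing one
            // broker loses no acknowledged data.
            NewTopic orders = new NewTopic("orders", 6, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(List.of(orders)).all().get();
        }
    }
}
```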

Kris Jenkins: (17:03)
Yep.

Jun Rao: (17:04)
Then, there's the same high availability guarantee with the consumer application. The same thing happens there. Because typically, when you run the consumer application, as I mentioned earlier, you will run multiple instances of it for better parallelism.

Jun Rao: (17:29)
And then, we have this nice protocol, which is actually ... When it was first invented, it was really a revolutionary thing compared with the state of the art around that time. We can magically distribute the load of all the topic partitions this particular application is interested in subscribing to evenly among those instances.

Jun Rao: (17:59)
And if one of those instances goes down, we can actually magically reshuffle the load among the surviving instances. The same thing. If you have a new instance added, again, we'll just magically redistribute the data among those instances. Everything happens dynamically, automatically for the user.

Jun Rao: (18:22)
The user actually doesn't have to do anything when they change the number of instances. This is actually a pretty big deal for our users. It's a pretty convenient but effective way of consuming that data in a distributed way.

Kris Jenkins: (18:36)
But even that ... You say that was revolutionary at the time, but I know you go into, in your course, how the rebalancing protocol itself has evolved quite a lot in the last decade.

Jun Rao: (18:52)
Initially, we designed this as a protocol, so that all those independent instances in the consumer application can coordinate among themselves. Now, how do they coordinate? Well, to make some decisions, you need a coordinator. Right? To coordinate this distributed effort.

Jun Rao: (19:16)
That's the consumer group coordinator capability we added on the broker side. And its responsibility is to understand, "How many instances are there in that particular consumer group?" And then, "What are the things they are interested in?" Then, the coordinator is responsible for coordinating the dividing of the load among those instances.

Jun Rao: (19:40)
And then, after that, it's also responsible for keeping track of whether those consumer instances are still alive or any new instances have joined. That's a lot of logic that's been added into the coordinator, which is pretty useful. Over time, what we found is ... There are just a couple of things, for some of the more advanced use cases, where the initial design was a bit lacking.
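
From an application's point of view, the coordinator's work is visible through rebalance callbacks. A minimal sketch with the Java consumer (addresses and names illustrative):

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

public class WatchRebalances {
    public static void main(String[] args) {
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092"); // illustrative
        c.put("group.id", "demo-app");                // illustrative
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            // The group coordinator (a broker) drives these callbacks whenever
            // instances join or leave and partitions get handed around.
            consumer.subscribe(List.of("clicks"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Giving up: " + partitions); // e.g., flush per-partition state
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Now own: " + partitions);   // e.g., (re)load per-partition state
                }
            });
            consumer.poll(Duration.ofSeconds(1)); // joining the group happens inside poll()
        }
    }
}
```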

Jun Rao: (20:17)
The first thing is, every time you have to do another rebalance (this could be either because an existing instance is dead or maybe a new instance is added), you have to shuffle the work a little bit among those instances. As part of that ... The naive way, which the old protocol was doing, is to stop everybody from what they're doing now. And then, clear their state.

Jun Rao: (20:54)
Because for some of the applications, as part of processing, you need to maintain a little bit of state associated with the data that you are processing. So if you own a particular partition, sometimes you may need to maintain the corresponding state for that partition.

Jun Rao: (21:09)
Sometimes it would even just say, "Okay. Since we know we have to redistribute work, and we don't know what that work will be, we'll just clear up everything upfront."

Kris Jenkins: (21:21)
Stop the world [crosstalk 00:21:21]. Pretend you never did it and start again.

Jun Rao: (21:25)
That's right. And then, you let the rebalance complete. And then, you get this new distribution of work. Then, you start building that state again. The first problem, of course, is ... Well, sometimes in some of the cases, you get back essentially the same work as you had before.

Jun Rao: (21:48)
Or at least maybe some of the partial work that you had before. In those cases, having to clear that state and then rebuild it after the rebalance can be expensive. Especially if that state is large.

Kris Jenkins: (22:02)
Mm-hmm (affirmative).

Jun Rao: (22:03)
The second issue is what we call this "stop the world" rebalance. Because you are first stopping the world for everyone when there's a rebalance happening. Even though, in some of the cases, with the new assignment you got, you realize you actually will be continuing with the same work, or the partitions that you've been doing the work for before.

Jun Rao: (22:28)
That's the second part that's a little bit inconvenient. The evolution of the group protocol we have been doing tries to address both issues. We try to be a little bit smart in terms of what work we truly need to stop and then rebuild the state for, so we can improve a little bit on both fronts.

Jun Rao: (22:49)
We try to avoid unnecessary cleanup and rebuild of the state associated with those partitions. But we also try to continue the processing for some of the data that in the end may not need to be redistributed.
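
On the client side, this incremental behavior can be opted into via the assignor; a sketch assuming Kafka 2.4 or later, returning properties to merge into the usual consumer configuration:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import java.util.Properties;

public class CooperativeRebalancing {
    // Opt in to incremental ("cooperative") rebalancing: only the partitions
    // that actually move are revoked, and each instance keeps its previous
    // partitions where possible ("sticky"), instead of a full stop-the-world
    // revocation of everything.
    static Properties cooperativeProps() {
        Properties c = new Properties();
        c.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        return c; // merge with bootstrap.servers, group.id, deserializers, etc.
    }
}
```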

Kris Jenkins: (23:12)
You've gone from the original setup, which is, "A node going down, a new one being added, needs to stop the system." To, "A node going down, a new node being added, doesn't need to stop some of the other people who are processing right now."

Jun Rao: (23:27)
Yeah. We're trying to make that a little bit more incremental. That's the improvement we have been making. Another thing is, I think one of the most common reasons why you need to do a rebalance is really that you are deploying new software. You need to restart each of the application instances, because you want to upgrade to a new version. You want to make some code changes.

Jun Rao: (23:58)
In those cases, each of the instances will go down, but they actually will come back very quickly. Sometimes it's just a few seconds. You bring it down and bring it up. In those cases, we also added another optimization. Where if you can tolerate a bit of latency, you can just say, "I know I'm bringing down an application, but I know it's going to be brought up pretty soon again."

Jun Rao: (24:27)
Because in those cases, maybe it's cheaper just not to do those rebalances at all. Because in the end, the same set of instances will come back. Just wait a little bit. If all those instances come back within a reasonably short period of time, then you can just continue with the assignment you had before. In those cases, you can actually completely avoid the rebalance altogether.

Kris Jenkins: (24:53)
You go into that in your course. I know it's one of these tunable parameters where you can say, "This is the amount of time. You shouldn't worry about us being down. We're coming back up soon."

Jun Rao: (25:05)
That's right. To achieve that, one thing is, you have to set a static member ID, so we can determine and at least know which instance you are, no matter how many times you are restarted.

Jun Rao: (25:18)
The second thing is you can tune that session timeout. So that if you can come back within that period of time, you are still considered alive.
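
In the Java consumer, those two settings look roughly like this; the instance ID and timeout value are illustrative, and static membership is available from Kafka 2.3:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

public class StaticMembership {
    // Static membership: a stable per-instance ID plus a generous session
    // timeout lets an instance restart (say, during a rolling deploy) without
    // triggering a rebalance, as long as it returns within the timeout.
    static Properties staticMemberProps(String instanceId) {
        Properties c = new Properties();
        c.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId); // e.g. "payment-processor-1"
        c.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");   // 60s grace to come back
        return c; // merge with the usual consumer configuration
    }
}
```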

Kris Jenkins: (25:35)
That moves me on to another section, which is spiritually very similar, if I can say that, and which has really evolved: unplanned downtime, which brings us to the move away from ZooKeeper.

Jun Rao: (25:52)
That's another big evolution we've been doing. When we started with Kafka, I think we deliberately created this separation between our control plane and the data plane. The data plane is what we discussed earlier about most of the things the broker's been doing. It's responsible for storing the data, for delivering the data, for making sure the data is redundant.

Jun Rao: (26:22)
But there's also some metadata that we need to manage at the whole cluster level. These are the things like ... What are the topics and partitions out there? Where are they located on the broker? Who is the current leader? Which replicas are fully caught up in sync?

Jun Rao: (26:41)
This kind of stuff. We need a place to store that information. That essentially is part of the control plane. Initially, that control plane, just for convenience, was done on ZooKeeper. Because it's a replicated consensus service.

Jun Rao: (27:01)
It's perfect for storing this metadata because we don't have to build something ourselves. And then, it sort of served our need when we started. It actually allowed us to build a distributed system much quicker than before.

Kris Jenkins: (27:17)
I remember the state of play back around that time. ZooKeeper was just your go-to distributed consensus tool. Right?

Jun Rao: (27:25)
Exactly. It definitely served its need around that time. But what we realized over time, which essentially led to this effort of replacing ZooKeeper by building KRaft, is a few things.

Jun Rao: (27:41)
One is we realized that, for a lot of the users, managing one type of distributed system is actually much easier than two types.

Kris Jenkins: (27:54)
Yep.

Jun Rao: (27:55)
For a lot of places, people are okay with running a distributed system. Because it's the norm now. But if you can say, "There's only one type of binary. You just deploy this type of binary to however many instances you want and then just set it up." That's actually a much easier thing to manage and understand.

Jun Rao: (28:21)
If you have, say, two different types of binaries, and each needs to have its own membership and distribution, then there's more work in terms of both deployment and, of course, the operational part. Because you have to set up the monitoring system for each. And then, they probably are a little bit different. And then, you have to, of course, collect the logging and other things.

Kris Jenkins: (28:48)
Managing a group of nodes is hard enough, but then you're managing stuff within those nodes. Not being able to treat them as just units.

Jun Rao: (28:58)
That's right. That's the first thing. I think if we have something that's built-in, we can eliminate the dependency of a separate distributed system. The second thing is really a lot on the scalability and performance.

Jun Rao: (29:15)
In the data plane, because it's distributed, we actually have achieved a lot of scalability for better throughput and performance. But the control plane is really single-noded. It's really handled by a single node. And then, ZooKeeper, if you look at it, is a replicated service, not a scaled-out service.

Kris Jenkins: (29:45)
Right.

Jun Rao: (29:46)
As the usage of Kafka grows, what we realize is ... A lot of places, people want to have more of those topic partitions. Over time, more and more business data are integrated and consolidated in Kafka. The second thing is, because partition is the easiest way to achieve parallelism, often people just want to have more partitions over time to achieve the scalability.

Jun Rao: (30:18)
You always have a bit of this pressure over time. People just want to use more of that metadata. What we realized is ... Partly because ZooKeeper is an external system, with a separate API, it's hard for us to achieve this scalability in terms of the amount of metadata that we can manage.

Jun Rao: (30:48)
By switching to KRaft, which essentially is a built-in implementation of a consensus service based on Kafka's internal log, we actually can achieve a factor of 10 in scalability, in terms of how many topic partitions we can handle in a single Kafka cluster.

Jun Rao: (31:11)
This would be a lot harder to do if we had tried to stay with ZooKeeper. That's the second key benefit we got from this exercise.
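
For reference, a KRaft-mode node is configured with a handful of server properties; this sketch uses illustrative node IDs and host names:

```properties
# KRaft mode (illustrative values): one kind of binary, no ZooKeeper.
# This node acts as both broker and controller; dedicated roles also work.
process.roles=broker,controller
node.id=1
# The Raft quorum that replicates the cluster metadata log.
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
listeners=PLAINTEXT://host1:9092,CONTROLLER://host1:9093
controller.listener.names=CONTROLLER
```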

Kris Jenkins: (31:24)
I remember first learning that and thinking, "Okay. This is a good thing." The moment they were looking at scaling out a data storage problem, they actually used Kafka to solve it. It's very much a dogfooding thing.

Jun Rao: (31:40)
That's another thing. Because we built that specifically for Kafka, we can leverage some of the capability and functionality within Kafka. We can also optimize it a lot more, because it's custom-designed for Kafka.

Kris Jenkins: (32:01)
Which you know inside-out.

Jun Rao: (32:03)
Right. And then, as a side effect of that, by having this metadata service built in at the KRaft layer, we actually automatically get a hot standby. Earlier, a big part of the problem was ... Because all the metadata is stored in ZooKeeper, we only have a single controller at any given point in time, which caches this metadata stored in ZooKeeper.

Jun Rao: (32:30)
But the issue is, if that controller goes down, the new controller doesn't actually have any data. It really has to bootstrap its state by reading all that cluster-level metadata from ZooKeeper. It really depends on how many topic partitions you have out there.

Jun Rao: (32:50)
Sometimes you have a large cluster. This can take minutes just to reload. With KRaft, the benefit is now all that metadata is replicated and then cached in memory in the KRaft replicas. One of those KRaft replicas, which happens to be the leader of the Raft quorum, would be the active controller.

Jun Rao: (33:21)
But if that controller goes down, now we can switch to any of the replicas of that KRaft quorum, which has a hot in-memory copy of all the metadata already. It can take over as a new controller much faster than before, because it has the state ready to go.

Jun Rao: (33:45)
In that case, I think that's another big advantage for a large cluster with lots of those metadata. I think our controller failover story is much better than before.

Kris Jenkins: (33:59)
And it ends up making ...

Jun Rao: (34:01)
Which indirectly allows us to have more of those topic partitions in the same cluster.

Kris Jenkins: (34:08)
And that presumably speeds up the process a lot?

Jun Rao: (34:12)
Yeah. I think it just means ... Because of that, now your system is much more highly available. Because you can always act on things that need a controller. This could include a leader election.

Jun Rao: (34:31)
Earlier, if you have a hard failure on the broker which happens to run the controller, you can't elect the new leader for those data partitions until the new controller has bootstrapped its state, which can be minutes.

Jun Rao: (34:48)
But now, if the same thing happens, I think the new controller can take over and then elect a new leader almost instantaneously. In terms of availability, it's much better than before.

Kris Jenkins: (35:06)
We had Liz Fong-Jones on the show a while back. Honeycomb. She's doing two million messages a second. At that scale, downtime of a few seconds is colossal.

Jun Rao: (35:22)
Exactly.

Kris Jenkins: (35:24)
These are problems I'm assuming you didn't actually have to face on day one?

Jun Rao: (35:29)
Once you have this system, over time, different users and applications are always ... Especially some of the leaders in the tech industry, they're always pushing the envelope, which is good for us.

Jun Rao: (35:47)
We want to make sure we evolve this platform, so that we can continue the innovation to serve stronger or better needs for the users.

Kris Jenkins: (36:00)
Success comes with its own problems. I was just wondering. I'll tell you something that slightly surprised me about the whole KRaft project.

Kris Jenkins: (36:13)
I didn't realize ... I assumed the metadata that you get for each node was just one big blob. Just like, "This is the layout of the cluster." But it's actually sharded too. Right? It's partitioned.

Jun Rao: (36:28)
The data is definitely partitioned, because we have different topics. We have partitions. These are distributed to all those brokers. But for the metadata, it's actually backed by a single log. You can think of it as a single topic and partition.

Jun Rao: (36:47)
All the metadata for the whole cluster today is stored in a single topic partition. I think a lot of that is just for convenience. Because think of ZooKeeper. That's essentially a single log. A single topic partition. We carried that over, but we just make that single topic partition much more efficient than an external system like ZooKeeper.

Kris Jenkins: (37:16)
Okay.

Jun Rao: (37:16)
In the future, I think it is possible to consider even sharding the metadata, if we have even more metadata. That's also a possibility. But right now, I think we are just starting with the first version of KRaft. For simplicity, we just want that to be a single partition.

Kris Jenkins: (37:41)
In that case, I've misunderstood something. Perhaps you can clear it up for me?

Jun Rao: (37:44)
Mm-hmm (affirmative).

Kris Jenkins: (37:45)
Which is if you've got a single log for the metadata, and it's just one blob that keeps changing every time the metadata state changes ... Where does snapshotting come into that?

Kris Jenkins: (37:59)
Because I know you snapshot periodically for the metadata. Why is it not just the latest record?

Jun Rao: (38:05)
That's the thing. If you look at what's stored in this metadata log, it's a bunch of changes to those resources related to the metadata. Think of changing the leader for a partition. We need to record that change. But a lot of the other aspects don't necessarily change.

Jun Rao: (38:29)
The assignment may not change. Maybe only the leader is changing. For things like that ... For a bunch of resources, whether it's a topic, a partition, maybe some of the configurations, we want them to be updated over time. Either based on requests from the user or based on decisions the controller has made from observing the health of the whole cluster.

Jun Rao: (38:57)
All those changes are stored in the log. Now, the issue is, if you don't do something on the log, this log will just keep growing. Because you can't easily trim the data. Because you don't know which record in the log still carries the latest information for a particular resource.

Jun Rao: (39:23)
Because it is true that we only care about the latest value for a particular resource, but they don't get updated at the same time. Some of the resources haven't changed their value for a long time. You can't just easily say, "Okay. We'll just truncate data that's older than seven days."

Jun Rao: (39:44)
Because that data may still include the latest information for some of the resources. That's where the snapshot is useful. What the snapshot is doing is essentially to periodically take a prefix of the log. And then, collect the most recent value for each of those resources, which is designated by a key.

Jun Rao: (40:13)
We want to keep track of the latest value for each separate key, because that's the only thing we care about. That's essentially the snapshot we're generating. Once we have generated a snapshot, essentially we don't need that prefix of the log anymore.

Jun Rao: (40:29)
Because everything useful in that log is fully captured in that snapshot. That's essentially how we can control the size of this log. We can bound its size, as long as we periodically generate those snapshots.
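
As a conceptual sketch (not Kafka's actual implementation), snapshotting a keyed change log boils down to keeping the last value per key:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetadataSnapshot {
    // A metadata change, designated by a key (e.g., which partition's leader).
    record LogRecord(String key, String value) {}

    // A snapshot of a log prefix keeps only the latest value per key; once it
    // exists, that prefix of the log can be discarded.
    static Map<String, String> snapshot(List<LogRecord> logPrefix) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (LogRecord rec : logPrefix) {
            latest.put(rec.key(), rec.value()); // later entries overwrite earlier ones
        }
        return latest; // bounded by the number of distinct keys, not log length
    }

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
                new LogRecord("partition-0.leader", "broker-1"),
                new LogRecord("partition-1.leader", "broker-2"),
                new LogRecord("partition-0.leader", "broker-3")); // leader changed
        // Prints {partition-0.leader=broker-3, partition-1.leader=broker-2}
        System.out.println(snapshot(log));
    }
}
```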

Kris Jenkins: (40:46)
Okay. That's the thing I've misunderstood. I thought you had the metadata for the cluster, and every time it changed, you saved the whole thing to your internal topic.

Kris Jenkins: (40:55)
But actually, you are doing event sourcing on that metadata. You record, "We updated this thing." You've got this log of changes to the metadata, which you can then snapshot to get a complete picture periodically.

Jun Rao: (41:10)
That's right. That's right.

Kris Jenkins: (41:11)
Got it.

Jun Rao: (41:11)
The log is the one that keeps track of all the incremental changes. And then, a snapshot is an internal way for us to do essentially two things. One is to bound the space of the log. We don't want it to keep growing forever.

Jun Rao: (41:28)
The second thing is for a process to rebuild the metadata state in a more efficient way. Because if the log grows too long, you can still rebuild your metadata state from the log, but it can take a long time.

Jun Rao: (41:46)
Because maybe a lot of things have been updated that you have to replay. But this periodic snapshot allows you to rebuild that state a lot faster. Because the snapshot essentially is a cleaned-up portion of the log, which is much more efficient for loading.

Kris Jenkins: (42:06)
That makes sense. I feel like we could go on forever. Because the course you've produced is full of ... You could almost do a whole podcast on every single module. But let's not get too deep into the weeds.

Jun Rao: (42:18)
No.

Kris Jenkins: (42:19)
Let me try and pull it back out a bit. You've got modules that cover geo-replication and focus on the producer protocol and the consumer protocol and load balancing. All these things.

Kris Jenkins: (42:34)
Without saying, "You should just watch them in order." What do you think is the most important or your favorite one of those modules in the course?

Jun Rao: (42:45)
Well, it's hard to say. Because I think a lot of those capabilities were added over time to solve a particular need. And then, I think a lot of those are pretty interesting needs for building various types of applications.

Jun Rao: (43:02)
But if you're relatively new to Kafka, I think probably you want to start with the fundamentals. Understand a little bit just the basic things. How Kafka stores the data in a distributed way. How the data plane works with the publisher and with the subscriber. How that interacts a little bit with the control plane.

Jun Rao: (43:35)
Another thing is, I think a lot of the applications, they really want these redundancy capabilities. They want data to be, of course, replicated for both high availability as well as durability.

Jun Rao: (43:54)
Over time, they want that not only for a single data center, but potentially for multiple data centers. For all those different environments they want to have. This could be between on-prem environment and the cloud. It could also be in the cloud, but across multiple clouds.

Jun Rao: (44:18)
That's a sequence of capabilities we covered in the Internals course. We started with, "Within a single Kafka cluster, how would you provide redundancy through the internal Kafka replication in the data plane?"

Jun Rao: (44:36)
But we also have a more advanced module for going beyond even a single data center. We talk about, "If you have a multi-data center environment, how would you provide a similar high availability and durability guarantee even across those environments?"

Jun Rao: (44:57)
There are quite a few different options. Depending on, for example, how close those data centers are. And then, what do you want to do in this environment? Do you want to switch the applications seamlessly from one cluster to another? Or do you want maybe lower latency?

Jun Rao: (45:22)
We have different options even for a geo-replicated environment as well. I think that, for a lot of places where they really want to put mission-critical applications on Kafka, this sequence of high availability and durability capabilities is probably relevant to them.

Kris Jenkins: (45:46)
That's one of the ones where ... You're probably going to know, if you're growing into that space, that's something you must hit soon.

Jun Rao: (45:55)
That's right.

Kris Jenkins: (45:56)
But you might ... I've watched all of them except for geo-replication now. Maybe one or two. I am going to watch it anyway. Because it's just interesting, how much detail you manage to get into in a 10-minute video.

Jun Rao: (46:14)
I think it's an area that a lot of companies, as they grow over time, often are looking into.

Kris Jenkins: (46:31)
I think on that ... Like I say, we could do an hour-long podcast on every one of the videos, but let's not do that. Jun, I'm going to let you go. I'm going to say thank you very much for letting us pick apart some of your brain.

Kris Jenkins: (46:44)
And if anyone wants to catch more, they can see the course on Confluent Developer. Thank you for your time, Jun.

Jun Rao: (46:51)
That's good. Thanks a lot, Kris.

Kris Jenkins: (46:54)
Cheers. That was Jun Rao. I have to let you into a little secret. After we stopped recording, we got chatting about those soft social issues when you're building a tech company. A sort of Agile-ish view about how big traditional enterprise companies have a completely different feedback loop to modern cloud providers.

Kris Jenkins: (47:18)
And it changes the way they build things and what they decide they need to build. I just wish we'd kept the tape rolling because it would've been fascinating. But instead, I think we'll have to have Jun back on the show soon.

Kris Jenkins: (47:31)
Long before that happens, I hope you'll check out his tutorial videos. Link in the show notes. They're quite short individually, but they are absolutely jam-packed with knowledge. So if you took one every lunchtime, in just seven days, we could make you a manual.

Kris Jenkins: (47:48)
That course also has exercises led by our very own Danica Fine. When you reach those, you'll want a Kafka cluster to play with. You can easily get one started at Confluent Cloud. And if you sign up with the code PODCAST100, we'll give you $100 of extra free credit.

Kris Jenkins: (48:05)
Meanwhile, as ever, if you have thoughts, questions about today's episode, please get in touch. My contact details are always in the show notes. Or you could just leave us a comment or a like. A thumbs up, a review, five stars. Let us know you enjoyed it.

Kris Jenkins: (48:20)
With that, it remains for me to thank Jun Rao for joining us and you for listening. I've been your host, Kris Jenkins. And I'll catch you next time.

Intro
Kafka Internals course
What is a Kafka broker?
Achieving high throughput
High availability guarantee
Consumer Group Protocol
"Stop the world" rebalance
Control Plane and Data Plane
Continue innovation to serve stronger and better user needs
Cluster metadata
Jun's favorite module(s) in the course
It's a wrap