295. Find Median from Data Stream LeetCode Solution

Find the Median of a Data Stream with Python

I don’t usually do LeetCode problems, but this one comes up as a real life use case for me so I wanted to share. This problem is about data streaming and handling data in real time. It’s similar to what we call ETL or ELT in industry.

LeetCode 295. Find Median from Data Stream Problem Statement

The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value and the median is the mean of the two middle values.

Find Median from Data Stream LeetCode Solution Problem Statement
295. Find Median from Data Stream LeetCode Solution Problem Statement

You can find the actual LeetCode problem and submit your solution here.

What is a Data Stream?

Real time data streams are on their way to becoming a big data paradigm. A data stream is a system that provides continuous updates from a data source. Common business use cases for data streams revolve around the need for as close to real time as possible data analysis. An example of this would be Uber prices changing throughout the day.

Emulating a Data Stream in Python

Data streams come in two types of architectures, pull and push based. Pull based data streams rely on the ingestion tool to ping the data source for information. Push (or event) based data streams rely on the data source to push data up to the ingestion tool. Some push based systems push data up at regularly timed intervals, others base their events on the data in the system.

We can emulate both push and pull systems with Python. In this example, we build an event driven data stream. We emulate a data stream with a function that updates the ingested data every time we call it.

Find Median from Data Stream Solution

This is the code that we are given as a template:

class MedianFinder:
 
   def __init__(self):
      
 
   def addNum(self, num: int) -> None:
      
 
   def findMedian(self) -> float:
      
 
 
# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()

We are tasked with creating a class that finds the median from a data stream. There are two solutions to this. Due to the bounding of the values that we’re going to get, we can use counting sort. The other solution we can use for this is to create a max heap and a min heap. 

Finding the Median of a Data Stream using Counting Sort

Let’s take a look at how to use counting sort to find the median from a data stream. We break our function up into three functions (other than the init function). One function to add a number to the list of numbers, the “data stream”. One function to do counting sort, and one function to find the median of the data stream. Note that the counting sort function is added to the template above.

We don’t need any parameters for the init function. In the counting sort solution to finding the median of a data stream, we initialize an empty “data stream” and the list that contains the count.

class MedianFinder_Counting:
 
   def __init__(self):
       self.nums = []
       self.count = [0]*211

Adding a Number to the Data Stream

The first function that we create is the addNum function. This function requires one parameter, an integer. It does not return anything. First, we increment the index of the count list of the number + 105 by one. This is because -105 is the lowest possible number we will see. The only other thing we do in this function is add the number to the representation of the data stream.

   # add a number to the list
   # update the count list index num+105
   def addNum(self, num: int) -> None:
       self.count[num+105] += 1
       self.nums.append(num)

Counting Sort Function for Finding a Median from a Data Stream

Next, let’s create the counting sort function. Since we’re keeping track of the counts, we can’t just aggregate that list. The first thing we do in the counting sort function is create a copy of the counts list. It is important that you use the .copy() function, if you just set agg = self.count, it will aggregate the self.count object because Python variables are passed by alias.

After creating a copy of the count list, we aggregate that copy. Now let’s create the sorted list. We start by setting a variable, i, to the index of the last entry in the data stream. We also create a representation of the sorted list as a list of 0s.

While our index variable is above 0, we do three things. First, we set the sorted list’s index based on the aggregate list and the data stream equal to the ith index in the data stream. Second, decrement the value in the aggregate list that we just used as the index in the sorted list by one. Third, we decrement the index variable by 1.

Finally, we return the sorted list. See Counting Sort for a more in depth explanation.

   def counting_sort(self) -> list:
       # turn count into an aggregated count
       agg = self.count.copy()
       for i in range(1, 211):
           agg[i] += agg[i-1]
      
       # start creating sorted list
       i = len(self.nums)-1
       sorted_list = [0]*len(self.nums)
       # counting sort placement algorithm
       while i >= 0:
           sorted_list[agg[self.nums[i]+105]-1] = self.nums[i]
           agg[self.nums[i]+105] -= 1
           i -= 1
       return sorted_list

Finding the Median of the Data Stream

Everything is now in place to find the median from the data stream. This function does not need any parameters and returns a float, the median. The first thing we do is get a sorted list by calling our counting sort function and get the length of the data stream.

If the data stream has an even number of entries, we return the average of the middle two. Else, we return the middle entry.

   # call counting sort
   # find the median
   def findMedian(self) -> float:
       sorted_list = self.counting_sort()
       num = len(sorted_list)
       if num % 2 == 0:
           return (sorted_list[num//2-1] + sorted_list[num//2])/2
       else:
           return sorted_list[(num-1)//2]

Full Code for the Counting Sort Solution to Find Median from a Data Stream

Here’s the full code for the counting sort solution to LeetCode 295. Find Median From Data Stream:

class MedianFinder_Counting:
 
   def __init__(self):
       self.nums = []
       self.count = [0]*211
 
   # add a number to the list
   # update the count list index num+105
   def addNum(self, num: int) -> None:
       # print(f"pre adding {num}: {self.count[num+105]}")
       self.count[num+105] += 1
       self.nums.append(num)
       # print(f"post adding {num}: {self.count[num+105]}")
 
   def counting_sort(self) -> list:
       # turn count into an aggregated count
       agg = self.count.copy()
       for i in range(1, 211):
           agg[i] += agg[i-1]
      
       # start creating sorted list
       i = len(self.nums)-1
       sorted_list = [0]*len(self.nums)
       # counting sort placement algorithm
       while i >= 0:
           sorted_list[agg[self.nums[i]+105]-1] = self.nums[i]
           agg[self.nums[i]+105] -= 1
           i -= 1
       return sorted_list
 
   # call counting sort
   # find the median
   def findMedian(self) -> float:
       sorted_list = self.counting_sort()
       num = len(sorted_list)
       if num % 2 == 0:
           return (sorted_list[num//2-1] + sorted_list[num//2])/2
       else:
           return sorted_list[(num-1)//2]

Using a Max Heap/Min Heap to Find the Median from a Data Stream

Another solution to finding the median of a data stream is to use a min and max heap. Unlike the counting sort solution, this solution sorts the numbers as we add them. We need the heapq built-in Python library to create the min and max heaps. This library abstracts out placing numbers into the lists that represent the heaps.

The first thing we do in our class using these heaps, is create an init function that initializes two empty lists. These two empty lists serve as our max heap and min heap. The init function requires no parameters. Remember that heaps are automatically set to be min heaps. The first index of a heap is the minimum element.

The way we get the median from our heaps is by using them to split the values of the data stream in half. The max heap will keep the maximum value of the lower half of the data stream values as the first index. Meanwhile, the min heap keeps the minimum value of the higher half of the data stream values as its first index.

import heapq
 
class MedianFinder_MaxMin:
 
   def __init__(self):
       self.max_heap = []
       self.min_heap = []

Adding a Number to the Data Stream in the Max Min Heap Solution

The magic of the min max heap solution to the median from data stream problem is in this function. This function requires one parameter, an integer, and returns nothing. The first thing we do is check if the heaps have been populated yet. If they have not, then we insert into the min heap. 

When we insert the second element, the max heap has yet to be populated. The second thing we do in our function is handle inserting this element. If it’s greater than the one on the min heap, then we push the negative value of the first element onto the max heap and push the second element onto the min heap. Otherwise, we push the negative value of the second element onto the max heap.

Remember that the max heap has to have negative entries because heaps push the minimum entry to the first index. Once we are past the first two elements, we take a more generic approach.

If the lengths of the heaps are the same, we check if the number is greater than the max in the max heap, if it is, we push it onto the max heap, else we push it onto the min heap. If the length of the max heap is longer, then we need to compare the number being pushed into the data stream to the first indices on the max and min heaps. 

Next, we check if the number is greater than the first index of the max heap, we move the first entry in the max heap to the min heap. Then, we push the new entry onto the max heap. Otherwise, we just push the new data stream entry onto the min heap. If the min heap is longer than the max heap, we apply the opposite logic.

   # add a number to a constantly changing two heap structure
   def addNum(self, num: int) -> None:
       # first entry goes into min heap
       if not self.max_heap and not self.min_heap:
           heapq.heappush(self.min_heap, num)
           return
       # second entry - if this number is greater than the first
       # entry in min heap then we move it to the min heap and
       # move the negative of the current min heap head to the max heap
       # else, we push the negative value to the max heap
       if not self.max_heap:
           if num > self.min_heap[0]:
               heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
               heapq.heappush(self.min_heap, num)
           else:
               heapq.heappush(self.max_heap, -num)
           return
       # if the length of the heaps are the same
       if len(self.max_heap) == len(self.min_heap):
           # if the number is greater than the current
           # greatest number in the max heap, we put it on the
           # max heap, otherwise, we push it on the min heap
           if num < -self.max_heap[0]:
               heapq.heappush(self.max_heap, -num)
           else:
               heapq.heappush(self.min_heap, num)
       # if the length of the max heap is greater than the length
       # of the min heap
       elif len(self.max_heap) > len(self.min_heap):
           # if the number is less than the min of the max heap
           # push the min of the max heap onto the min heap and
           # push the negative of the number onto the max heap
           # otherwise, push the number onto the min heap
           if num < -self.max_heap[0]:
               heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
               heapq.heappush(self.max_heap, -num)
           else:
               heapq.heappush(self.min_heap, num)
       # finally if the min heap is greater than the length
       # of the max heap
       else:
           # if the number is greater than the max of the min heap
           # put the min heap max onto the max heap
           # push the number onto the min heap
           # otherwise push the negative of the number onto the max heap
           if num > self.min_heap[0]:
               heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
               heapq.heappush(self.min_heap, num)
           else:
               heapq.heappush(self.max_heap, -num)

Find the Median From Data Stream Function with Max Min Heap

Since the heaps handle the placement of the numbers in our data stream, finding the median is straightforward. We check if the length of the heaps are the same. If they are, then we return the average of the first indices in each heap (using the negative value of the max heap since everything in there was inverted when inserted).

If the heaps are different lengths, then we just use the first index of the larger heap.

   # find the median
   def findMedian(self) -> float:
       # if the length of the heaps are the same
       if len(self.max_heap) == len(self.min_heap):
           # return the average of the min of the max heap
           # and the max of the min heap
           return (-self.max_heap[0] + self.min_heap[0])/2
       # if the length of the max heap is greater than
       # the length of the min heap
       elif len(self.max_heap) > len(self.min_heap):
           # return the min of the max heap
           return -self.max_heap[0]
       # if the length of the min heap is greater
       # than the length of the max heap
       else:
           # return the max of the min heap
           return self.min_heap[0]

Full Code to the Min Max Heap Solution to Finding the Median of a Data Stream

Here’s the full code the min max heap solution to finding the median of a data stream:

import heapq
 
class MedianFinder_MaxMin:
 
   def __init__(self):
       self.max_heap = []
       self.min_heap = []
 
   # add a number to a constantly changing two heap structure
   def addNum(self, num: int) -> None:
       # first entry goes into min heap
       if not self.max_heap and not self.min_heap:
           heapq.heappush(self.min_heap, num)
           return
       # second entry - if this number is greater than the first
       # entry in min heap then we move it to the min heap and
       # move the negative of the current min heap head to the max heap
       # else, we push the negative value to the max heap
       if not self.max_heap:
           if num > self.min_heap[0]:
               heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
               heapq.heappush(self.min_heap, num)
           else:
               heapq.heappush(self.max_heap, -num)
           return
       # if the length of the heaps are the same
       if len(self.max_heap) == len(self.min_heap):
           # if the number is greater than the current
           # greatest number in the max heap, we put it on the
           # max heap, otherwise, we push it on the min heap
           if num < -self.max_heap[0]:
               heapq.heappush(self.max_heap, -num)
           else:
               heapq.heappush(self.min_heap, num)
       # if the length of the max heap is greater than the length
       # of the min heap
       elif len(self.max_heap) > len(self.min_heap):
           # if the number is less than the min of the max heap
           # push the min of the max heap onto the min heap and
           # push the negative of the number onto the max heap
           # otherwise, push the number onto the min heap
           if num < -self.max_heap[0]:
               heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
               heapq.heappush(self.max_heap, -num)
           else:
               heapq.heappush(self.min_heap, num)
       # finally if the min heap is greater than the length
       # of the max heap
       else:
           # if the number is greater than the max of the min heap
           # put the min heap max onto the max heap
           # push the number onto the min heap
           # otherwise push the negative of the number onto the max heap
           if num > self.min_heap[0]:
               heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))
               heapq.heappush(self.min_heap, num)
           else:
               heapq.heappush(self.max_heap, -num)
   # find the median
   def findMedian(self) -> float:
       # if the length of the heaps are the same
       if len(self.max_heap) == len(self.min_heap):
           # return the average of the min of the max heap
           # and the max of the min heap
           return (-self.max_heap[0] + self.min_heap[0])/2
       # if the length of the max heap is greater than
       # the length of the min heap
       elif len(self.max_heap) > len(self.min_heap):
           # return the min of the max heap
           return -self.max_heap[0]
       # if the length of the min heap is greater
       # than the length of the max heap
       else:
           # return the max of the min heap
           return self.min_heap[0]

Solutions to Follow Ups to 295. Find Median from Data Stream 

Let’s take a look at the follow up questions provided.

Follow up:

If all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?

If 99% of all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?

Out of the two solutions we covered above, the one that can be optimized best from these constraints is the counting sort solution. For the first one, we can optimize our solution by turning our bound from -105 to 105 to 0 to 100. We change the range (211) to 101, and no longer need to add 105 to our index when adding the number in.

If 99% of all the integers in the data stream are between 0 and 100, we do the same thing as above. In addition, we also throw out any number that is not between 0 and 100 that come into our data stream. If 99% of the numbers are between 0 and 100, having a number outside of this won’t affect the median enough to consider.

Summary of 295. Find Median from Data Stream LeetCode Solution

The lesson to take away from this is not that counting sort is an efficient way to find the median of a data stream. The lesson to take away from this is that it’s important to start by knowing your data.

We saw two solutions to the median from data stream LeetCode problem in this post. The first solution extends the basic idea of counting sort to apply to negative numbers. The second solution creates two heaps, a min heap and a max heap, and uses those to find the median.

Counting sort calls the counting sort function, which runs in linear time, O(n+m), each time we call the median. In this time complexity, n is the size of the data stream so far and m is the max size, 211. The nice part is that inserting numbers is constant time.

The min max heap solution sorts the numbers as we insert them. Both heappush and heappop require logarithmic runtimes, O(log(n)). In this case, n is the size of each heap. However, the find median function for the min max heap solution is constant run time, O(n).

So which one has a better time complexity? It depends how many times we call find median and how many numbers we insert. The more we call the find median function, the faster the heap solution is (relatively). The more numbers we insert, the faster the counting sort solution is (relatively).

Got any questions? Drop them below!

Further Reading

I run this site to help you and others like you find cool projects and practice software skills. If this is helpful for you and you enjoy your ad free site, please help fund this site by donating below! If you can’t donate right now, please think of us next time.

Yujian Tang

Leave a Reply

%d bloggers like this: