# 30. \(k^{th}\) largest in a stream

## Introduction

This chapter deals with a problem where the input data is not provided all at once but arrives as a continuous stream. Algorithms of this kind are common in real life and are becoming increasingly important in fields like medicine (where data from wearable devices is used to provide real-time insights into the patient’s health) and finance (where an enormous amount of data, usually provided by the stock exchanges, is used to perform high-frequency trading). We are going to study a coding interview question that has been popular during the last few years: design a data structure that processes a stream of integers and keeps track of the \(k^{th}\) largest element seen so far. We will present and discuss three solutions based on the same fundamental idea (discussed in Section 34.3) but built around three different data structures:

- a simple array (in Section 34.3.1),
- a self-balancing binary search tree (in Section 34.3.2) and, finally,
- a heap (in Section [kth_largest_in_stream:sec:heap]).

## Problem statement

Design a class whose task is to accept a stream of integers, one number at a time, and to return the \(k^{th}\) largest integer seen so far in the stream. The class has two public functions with the following signatures:

`void initialize(const std::vector<int>& I, size_t K)`

: the array \(I\) contains the first elements of the stream. This function will be called only once upon initialization.

`int add(int n)`

: this function accepts a new element \(n\) from the stream and returns the \(k^{th}\) largest number seen so far.

Note that you can assume that `initialize` is called exactly once, before any call to `add`.

Given \(K=4\) and the initial array \(I=\{1,2,3\}\), the function behaves as follows:

returns \(1\)

returns \(1\)

returns \(1\)

returns \(2\)

returns \(2\)

Given \(K=4\) and the initial array \(I=\{1,2,3,4,50,100,150,200\}\), the function behaves as follows:

returns \(50\)

returns \(100\)

returns \(100\)

returns \(110\)

returns \(150\)

returns \(180\)

[ex:kthlargest_in_stream:example2]

## Clarification Questions

What should `add` return if fewer than \(K\) elements have been seen so far?

*You can assume that the \(k^{th}\) largest element exists whenever `add` is called.*

Is there a limit to the size of the array \(I\)?

*No.*

## Discussion

There are two phases associated with this class:

- the initialization phase, where an initial input array \(I\) is provided to the class. Because the \(k^{th}\) largest value exists whenever `add` is called, we can deduce that the size of \(I\) is at least \(K-1\); otherwise the first call to `add` could not possibly return the correct value. This operation is guaranteed to happen exactly once, before any call to `add`.
- the stream processing phase, where the class is ready to accept a new number from the stream and return the answer.

The key to attacking this problem is to understand that, when the initialization array comes in, we are forced to remember the largest elements within it. In particular, if \(|I| \geq K\) we can throw away all the elements that are not among the \(K\) largest and keep the rest: the discarded elements can never be returned by `add`, as there are already \(K\) values at least as large as each of them. Otherwise we remember \(I\) as it is (and in this case we know that \(|I| = K-1\)). One might think that it isn’t necessary to remember all \(K\) largest numbers seen so far and that it is in fact sufficient to remember only the \(K^{th}\) largest element. We will use Example [ex:kthlargest_in_stream:example2] as a simple counterexample to demonstrate why this leads to incorrect results. First of all, after the initialization phase the \(K^{th}\) largest number (\(K=4\) in this example) is \(50\). After the first call to `add`, the \(K^{th}\) largest number has not changed and the function still returns \(50\). But after the second call to `add`, \(50\) is no longer among the \(K\) largest numbers, and it is at this point that remembering the other numbers larger than \(50\) becomes important. Without them we would not be able to return the correct value, i.e. \(100\).
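The counterexample can be checked with a short sketch. `kthLargestBruteForce` recomputes the answer from every value seen so far and serves as a reference, while `NaiveTracker` remembers only the current \(K^{th}\) largest value. Both names, and the stream values 10 and 120 used in the note after the code, are our own assumptions; they are not taken from the example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Reference answer: the K-th largest of all the values seen so far,
// recomputed from scratch (hypothetical helper, for illustration only).
int kthLargestBruteForce(std::vector<int> seen, const std::size_t K)
{
  assert(K >= 1 && K <= seen.size());
  const auto kth = seen.begin() + static_cast<std::ptrdiff_t>(K - 1);
  // Places the K-th largest value at index K-1 (descending order).
  std::nth_element(seen.begin(), kth, seen.end(), std::greater<>());
  return seen[K - 1];
}

// Flawed approach: remembers only the current K-th largest value.
struct NaiveTracker
{
  int kth;

  int add(const int n)
  {
    // When n <= kth the answer is unchanged, and that part is correct.
    // When n > kth the true answer lies somewhere between kth and n, but
    // the values needed to pin it down were thrown away, so all the
    // tracker can do is guess. Here it guesses n.
    if (n > kth)
      kth = n;
    return kth;
  }
};
```

With \(I=\{1,2,3,4,50,100,150,200\}\) and \(K=4\), both approaches answer \(50\) after \(10\) is added. After \(120\) is added the reference answers \(100\), whereas the naive tracker answers \(120\) because \(100\) was discarded during initialization.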

In short, in order to always be able to give an answer we need to store and keep track of all the \(K\) largest numbers seen so far. This naturally leads to the question of where and how we can actually do that. Let’s name the set of the largest \(K\) numbers seen so far \(L^K\). Moreover, let \(m\) be the smallest element in \(L^K\): \(m = \min(L^K)\). When a new number \(n\) arrives, we perform one of the following operations depending on its value:

- if \(|L^K| < K\), we simply insert \(n\) in \(L^K\) and return;
- otherwise, if \(n \leq m\), then \(n\) can be ignored as it cannot change the \(K^{th}\) largest element;
- otherwise (\(n > m\)), \(m\) can be safely removed from \(L^K\), as it would become the \((K+1)^{th}\) largest after the addition of \(n\). The new element \(n\) is then inserted in \(L^K\).

Note that the size of \(L^K\) never changes after it reaches \(K\). The way we decide to store \(L^K\) has a big influence on the cost of the operations listed above, namely:

- find the minimum element (\(m\)),
- remove the minimum element (\(m\)),
- insert a new element (\(n\)).

In the following sections we will investigate three data structures that can be used to hold the values of \(L^K\).
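Before committing to a particular container, the three-case rule can be sketched on a plain, unsorted `std::vector`. This is only an illustration under our own assumptions (the function name `updateLargestK` is hypothetical); it makes the three operations explicit and shows why their cost depends on how \(L^K\) is stored: here finding the minimum alone costs \(O(K)\).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Applies the three-case rule to LK, which holds at most K values in no
// particular order, and returns the smallest element of LK after the update.
// That element is the K-th largest seen so far once LK.size() == K.
int updateLargestK(std::vector<int>& LK, const std::size_t K, const int n)
{
  assert(K >= 1 && LK.size() <= K);
  if (LK.size() < K)
  {
    // Case 1: LK is not full yet, n is simply stored.
    LK.push_back(n);
  }
  else
  {
    // Find the minimum m: a linear scan on an unsorted array.
    const auto m = std::min_element(LK.begin(), LK.end());
    // Case 3: n > m, so m is removed and n inserted (done here by
    // overwriting m). Case 2 (n <= m) leaves LK untouched.
    if (n > *m)
      *m = n;
  }
  return *std::min_element(LK.begin(), LK.end());
}
```

For instance, starting from \(L^K=\{50,100,150,200\}\) with \(K=4\), adding \(10\) leaves the set unchanged and yields \(50\), while adding \(120\) evicts \(50\) and yields \(100\).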

### Array based solution

In this section we present a solution where the elements of \(L^K\) are stored in a sorted array. We will see that this is probably not the best idea because, when a new element \(n\) arrives, the array may need to be rearranged, and that can be a costly operation. Let’s have a look at how the `initialize` and `add` functions can be implemented:

`initialize(I,K)`

: The initialization phase has to select the largest \(K\) elements of the initialization array \(I\). This can be done by sorting \(I\) in ascending order and then copying into \(L^K\) only its last \(K\) elements (the \(K\) largest). The same outcome can be obtained with a partial sort in descending order, which makes sure that the largest \(K\) elements are at the front of the array but gives no guarantees on the relative ordering of the rest of the array. The complexity of this operation is \(O(|I| \log(|I|))\) if the whole array \(I\) is sorted, while with a partial sort the cost becomes \(O(|I| \log(K))\).

`add(n)`

: When \(|L^K|<K\), \(n\) is simply inserted in \(L^K\) at its sorted position. Otherwise, if \(n>m\), then \(m\) is overwritten by \(n\) (thus effectively removing \(m\)) and subsequently the ordering of \(L^K\) is restored. At that point \(L^K\) is sorted except for \(n\), which might not be in the right position (for instance when \(n\) is the largest element of \(L^K\)). Restoring the order of \(L^K\) can be achieved either by:

- fully sorting \(L^K\). In this case the complexity of `add` is \(O(K \log(K))\);
- moving the newly inserted element up from the first location of the array, by swapping it with its subsequent element, until it reaches the correct position. This operation is analogous to the way insertion sort operates, and its cost is \(O(K)\): in the worst case we need to bubble \(n\) up to the last cell of the array (when \(n\) is the largest element of \(L^K\)), performing \(K-1\) swaps.
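The partial-sort variant of the initialization mentioned above can be sketched with `std::partial_sort`. This is a minimal illustration, not the approach taken by the listing that follows; the function name `largestK` is our own, and the final reversal is only there so that the result is in ascending order with the smallest of the \(K\) largest values at the front.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Returns the K largest values of I in ascending order
// (all of I, sorted, when I has fewer than K elements).
std::vector<int> largestK(std::vector<int> I, const std::size_t K)
{
  const std::size_t kept = std::min(K, I.size());
  const auto middle = I.begin() + static_cast<std::ptrdiff_t>(kept);
  // After this call [begin, middle) holds the `kept` largest values in
  // descending order; the order of the remaining elements is unspecified.
  std::partial_sort(I.begin(), middle, I.end(), std::greater<>());
  I.erase(middle, I.end());
  assert(I.size() == kept);
  // Ascending order: the minimum m of L^K ends up at the front.
  std::reverse(I.begin(), I.end());
  return I;
}
```

Calling `largestK` on \(\{1,2,3,4,50,100,150,200\}\) with \(K=4\) yields \(\{50,100,150,200\}\).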

Listing [list:kth_largest_in_stream:array] shows an implementation where the initialization phase is performed using a normal sort and `add` is implemented using an approach à la insertion sort.

Listing 1: Solution to the *\(k^{th}\) largest element in a stream* problem using arrays

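```
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

#include "IKThLargestInStream.h"

class KthLargestInStreamArray : IKThLargestInStream
{
 public:
  KthLargestInStreamArray() = default;

  void initialize(const std::vector<int>& initArray, const size_t K) override
  {
    m_values = initArray;
    m_k = K;
    // Sort ascending, then drop everything except the K largest (last) values.
    std::sort(begin(m_values), end(m_values));
    const auto first = begin(m_values);
    const auto last = m_values.size() >= K
                          ? first + (m_values.size() - K)
                          : first;
    m_values.erase(first, last);
  }

  int add(const int n) override
  {
    if (m_values.size() < m_k)
    {
      // Fewer than K values stored: insert n at its sorted position.
      m_values.insert(std::upper_bound(begin(m_values), end(m_values), n), n);
      return m_values.front();
    }
    assert(m_values.size() == m_k);
    // n cannot displace the current K-th largest value.
    if (n <= m_values.front())
      return m_values.front();
    // Overwrite the minimum with n, then bubble n up to its sorted position.
    m_values.front() = n;
    auto it = begin(m_values);
    auto itn = std::next(it);
    while (itn != end(m_values) && *it > *itn)
    {
      std::iter_swap(it, itn);
      it = itn;
      itn = std::next(it);
    }
    return m_values.front();
  }

 private:
  std::vector<int> m_values;  // the K largest values seen so far, ascending
  size_t m_k = 0;
};
```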

### Ordered set

If, instead of an array, we use a data structure[^30] that allows us to perform insert/search and min operations in logarithmic and constant time, respectively, then we can substantially improve the time complexity of the solution shown in Section 34.3.1.

Let’s have a more detailed look at what the two operations would look like in this case:

`initialize(I,K)`

: Recall that the final goal of this operation is to keep only the largest \(K\) elements of \(I\). This can be easily achieved in \(O(|I|\log(K))\) by looking at each element of \(I\), say \(I_j\), individually and inserting it into \(L^K\) if:

- \(|L^K| < K\), or
- \(I_j\) is greater than the smallest element of \(L^K\). In this case, if after the insertion we have \(|L^K| > K\), the current smallest element of \(L^K\) is removed to make sure that \(|L^K|=K\).

Because there are \(O(|I|)\) elements that can potentially go in \(L^K\) and each insertion costs \(O(log(K))\) then the final complexity is \(O(|I|log(K))\). Note that it is not much better than the array solution in this case.

`add(n)`

: This is where we see the advantages of not storing the elements of \(L^K\) in a plain array. As in the array solution, we compare \(n\) with the smallest element in \(L^K\), \(m\), and insert \(n\) in \(L^K\) only if \(n>m\) (removing \(m\) afterwards). Because the ordered multiset supports insertion and minimum lookup in \(O(\log(K))\) and \(O(1)\) time, respectively, the complexity of this operation is \(O(\log(K))\): quite an improvement compared with the array solution.
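The element-by-element initialization described above can be sketched as follows. This is a minimal illustration under our own assumptions (the free function `largestKMultiset` is a hypothetical name); it never lets the multiset grow beyond \(K\) elements, which is what keeps each insertion at \(O(\log(K))\).

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <vector>

// Builds L^K from I one element at a time, never storing more than K values.
std::multiset<int> largestKMultiset(const std::vector<int>& I,
                                    const std::size_t K)
{
  std::multiset<int> LK;
  if (K == 0)
    return LK;
  for (const int x : I)
  {
    if (LK.size() < K)
    {
      LK.insert(x);  // still room: keep x unconditionally
    }
    else if (x > *LK.begin())
    {
      LK.insert(x);          // |LK| temporarily becomes K + 1 ...
      LK.erase(LK.begin());  // ... so the current smallest is dropped
    }
    assert(LK.size() <= K);
  }
  return LK;
}
```

Elements that are not larger than the current minimum are skipped without touching the multiset, so duplicates of the minimum do not trigger any insertion.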

Listing [list:kth_largest_in_stream:set] shows a possible implementation of this idea using an ordered multiset (we use the C++ STL implementation, `std::multiset`).

Listing 2: Solution to the *\(k^{th}\) largest element in a stream* problem using `std::multiset`

```
#include <cassert>
#include <iterator>
#include <set>
#include <vector>

#include "IKThLargestInStream.h"

class KthLargestInStreamMap : IKThLargestInStream
{
 public:
  KthLargestInStreamMap() = default;

  void initialize(const std::vector<int>& initArray, const size_t K) override
  {
    assert(K <= initArray.size());
    m_k = K;
    m_values.insert(begin(initArray), end(initArray));
    // Erase the smallest elements until only the K largest remain.
    auto it = begin(m_values);
    while (m_values.size() > K)
    {
      it = m_values.erase(it);
    }
    assert(m_k == K);
    assert(K == m_values.size());
  }

  int add(const int n) override
  {
    assert(m_k == m_values.size());
    if (n > *(m_values.begin()))
    {
      m_values.insert(n);
      m_values.erase(m_values.begin());
    }
    assert(m_k == m_values.size());
    // The smallest stored value is the K-th largest seen so far.
    return *(m_values.begin());
  }

 private:
  std::multiset<int> m_values;
  size_t m_k = 0;
};
```

### Heap

Listing [list:kth_largest_in_stream:heap] shows an implementation that uses a heap instead. Rather than relying on the `std::priority_queue` container adaptor, it stores the elements in a plain array (a `std::vector`) and manipulates it with the standard heap functions:

- `std::make_heap`, to arrange the elements of an array into a heap;
- `std::push_heap`, to add an element to an array arranged by `std::make_heap`;
- `std::pop_heap`, to remove the smallest element from the heap (it is moved to the last position of the array).

Also note that Listing [list:kth_largest_in_stream:heap] uses a slightly different strategy for implementing the two class functions. Specifically:

`initialize(I,K)`

: We insert the first \(K\) elements of \(I\) into an array that we immediately turn into a heap (using `std::make_heap`). At this point we call `add` for all the remaining elements of \(I\).

`add(n)`

: As for the other solutions, when \(n \leq m\) the function simply returns the smallest element of \(L^K\). Otherwise, it removes the smallest element of the heap by calling `std::pop_heap`[^31] and inserts \(n\) by using `std::push_heap`[^32].

Listing 3: Solution to the *\(k^{th}\) largest element in a stream* problem using a heap

```
#include <algorithm>
#include <cassert>
#include <functional>
#include <iterator>
#include <vector>

#include "IKThLargestInStream.h"

class KthLargestInStreamHeap : IKThLargestInStream
{
 public:
  KthLargestInStreamHeap() = default;

  void initialize(const std::vector<int>& initArray, const size_t K) override
  {
    assert(K <= initArray.size());
    m_k = K;
    // Build a min-heap out of the first K elements ...
    auto s = begin(initArray);
    auto e = begin(initArray) + K;
    m_values_heap.insert(begin(m_values_heap), s, e);
    std::make_heap(begin(m_values_heap), end(m_values_heap), std::greater<>());
    assert(K == m_values_heap.size());
    // ... then feed the remaining elements through add().
    while (e != end(initArray))
    {
      add(*e);
      ++e;
    }
    assert(K == m_values_heap.size());
  }

  int add(const int n) override
  {
    assert(m_k == m_values_heap.size());
    if (n <= m_values_heap.front())
      return m_values_heap.front();
    // Move the minimum to the back, overwrite it with n, and restore the heap.
    std::pop_heap(begin(m_values_heap), end(m_values_heap), std::greater<>());
    m_values_heap.back() = n;
    std::push_heap(begin(m_values_heap), end(m_values_heap), std::greater<>());
    assert(m_k == m_values_heap.size());
    return m_values_heap.front();
  }

 private:
  std::vector<int> m_values_heap;  // min-heap holding the K largest values
  size_t m_k = 0;
};
```
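The same idea can also be expressed with the `std::priority_queue` container adaptor, which wraps the heap functions used above. The sketch below is only one possible variant under our own assumptions: the class name `KthLargestPriorityQueue` is hypothetical, the class does not derive from the chapter's interface so that it stays self-contained, and it also accepts an initial array with \(K-1\) elements.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

class KthLargestPriorityQueue
{
  // With std::greater the smallest element sits at the top (min-heap).
  using MinHeap = std::priority_queue<int, std::vector<int>, std::greater<int>>;

 public:
  void initialize(const std::vector<int>& initArray, const std::size_t K)
  {
    assert(K >= 1);
    m_k = K;
    m_heap = MinHeap{};
    for (const int x : initArray)
      push(x);
  }

  int add(const int n)
  {
    push(n);
    // The problem guarantees that the K-th largest element exists here.
    assert(m_heap.size() == m_k);
    return m_heap.top();
  }

 private:
  // Three-case rule: fill up to K values, then replace the minimum only
  // when the new value is strictly larger.
  void push(const int x)
  {
    if (m_heap.size() < m_k)
    {
      m_heap.push(x);
    }
    else if (x > m_heap.top())
    {
      m_heap.pop();
      m_heap.push(x);
    }
  }

  MinHeap m_heap;
  std::size_t m_k = 0;
};
```

As a usage example with values of our own choosing: after `initialize({1, 2, 3}, 4)`, the calls `add(4)`, `add(0)`, `add(1)`, `add(5)` and `add(2)` return \(1\), \(1\), \(1\), \(2\) and \(2\), respectively.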