v1.4.4
This commit is contained in:
commit
9c94d113d3
10260 changed files with 1237388 additions and 0 deletions
98
source/core/StarObserverStream.hpp
Normal file
98
source/core/StarObserverStream.hpp
Normal file
|
@ -0,0 +1,98 @@
|
|||
#ifndef STAR_OBSERVER_STREAM_HPP
|
||||
#define STAR_OBSERVER_STREAM_HPP
|
||||
|
||||
#include "StarList.hpp"
|
||||
|
||||
namespace Star {
|
||||
|
||||
// Holds a stream of values which separate observers can query and track
|
||||
// occurrences in the stream without pulling them from the stream. Each
|
||||
// addition to the stream is given an abstract step value, and queries to the
|
||||
// stream can reference a given step value in order to track events since the
|
||||
// last query.
|
||||
template <typename T>
|
||||
class ObserverStream {
|
||||
public:
|
||||
ObserverStream(uint64_t historyLimit = 0);
|
||||
|
||||
// If a history limit is set, then any entries with step values older than
|
||||
// the given limit will be discarded automatically. A historyLimit of 0
|
||||
// means that no values will be forgotten. The step value increases by one
|
||||
// with each entry added, or can be increased artificially by a call to
|
||||
// tickStep.
|
||||
uint64_t historyLimit() const;
|
||||
void setHistoryLimit(uint64_t historyLimit = 0);
|
||||
|
||||
// Add a value to the end of the stream and increment the step value by 1.
|
||||
void add(T value);
|
||||
|
||||
// Artificially tick the step by the given delta, which can be used to clear
|
||||
// older values.
|
||||
void tick(uint64_t delta = 1);
|
||||
|
||||
// Query values in the stream since the given step value. Will return the
|
||||
// values in the stream, and a new since value to pass to query on the next
|
||||
// call.
|
||||
pair<List<T>, uint64_t> query(uint64_t since = 0) const;
|
||||
|
||||
// Resets the step value to 0 and clears all values.
|
||||
void reset();
|
||||
|
||||
private:
|
||||
uint64_t m_historyLimit;
|
||||
uint64_t m_nextStep;
|
||||
Deque<pair<uint64_t, T>> m_values;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
ObserverStream<T>::ObserverStream(uint64_t historyLimit)
|
||||
: m_historyLimit(historyLimit), m_nextStep(0) {}
|
||||
|
||||
template <typename T>
|
||||
uint64_t ObserverStream<T>::historyLimit() const {
|
||||
return m_historyLimit;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ObserverStream<T>::setHistoryLimit(uint64_t historyLimit) {
|
||||
m_historyLimit = historyLimit;
|
||||
tick(0);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ObserverStream<T>::add(T value) {
|
||||
m_values.append({m_nextStep, move(value)});
|
||||
tick(1);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ObserverStream<T>::tick(uint64_t delta) {
|
||||
m_nextStep += delta;
|
||||
uint64_t removeBefore = m_nextStep - min(m_nextStep, m_historyLimit);
|
||||
while (!m_values.empty() && m_values.first().first < removeBefore)
|
||||
m_values.removeFirst();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
pair<List<T>, uint64_t> ObserverStream<T>::query(uint64_t since) const {
|
||||
List<T> res;
|
||||
auto i = std::lower_bound(m_values.begin(),
|
||||
m_values.end(),
|
||||
since,
|
||||
[](pair<uint64_t, T> const& p, uint64_t step) { return p.first < step; });
|
||||
while (i != m_values.end()) {
|
||||
res.append(i->second);
|
||||
++i;
|
||||
}
|
||||
return {res, m_nextStep};
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ObserverStream<T>::reset() {
|
||||
m_nextStep = 0;
|
||||
m_values.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Add table
Add a link
Reference in a new issue