Skip to content

Commit cf2b173

Browse files
authored
Merge pull request #59 from matemaciek/main
Sparkline memory improvements
2 parents de9791d + 72ebdc4 commit cf2b173

File tree

4 files changed

+408
-264
lines changed

4 files changed

+408
-264
lines changed
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
# SPDX-FileCopyrightText: 2020 Kevin Matocha
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
"""
6+
`multisparkline`
7+
================================================================================
8+
9+
Various common shapes for use with displayio - Multiple Sparklines on one chart!
10+
11+
12+
* Author(s): Kevin Matocha, Maciej Sokolowski
13+
14+
Implementation Notes
15+
--------------------
16+
17+
**Software and Dependencies:**
18+
19+
* Adafruit CircuitPython firmware for the supported boards:
20+
https://github.com/adafruit/circuitpython/releases
21+
22+
"""
23+
24+
try:
25+
from typing import Optional, List, TypeVar
26+
27+
T = TypeVar("T")
28+
except ImportError:
29+
pass
30+
import displayio
31+
from adafruit_display_shapes.polygon import Polygon
32+
33+
34+
class _CyclicBuffer:
35+
def __init__(self, size: int, init_value: T) -> None:
36+
self._buffer = [init_value] * size
37+
self._start = 0 # between 0 and size-1
38+
self._end = 0 # between 0 and 2*size-1
39+
40+
def push(self, value: T) -> None:
41+
"""Pushes value at the end of the buffer.
42+
43+
:param T value: value to be pushed
44+
45+
"""
46+
47+
if self.len() == len(self._buffer):
48+
raise RuntimeError("Trying to push to full buffer")
49+
self._buffer[self._end % len(self._buffer)] = value
50+
self._end += 1
51+
52+
def pop(self) -> T:
53+
"""Pop value from the start of the buffer and returns it."""
54+
55+
if self.len() == 0:
56+
raise RuntimeError("Trying to pop from empty buffer")
57+
result = self._buffer[self._start]
58+
self._start += 1
59+
if self._start == len(self._buffer):
60+
self._start -= len(self._buffer)
61+
self._end -= len(self._buffer)
62+
return result
63+
64+
def len(self) -> int:
65+
"""Returns count of valid data in the buffer."""
66+
67+
return self._end - self._start
68+
69+
def clear(self) -> None:
70+
"""Marks all data as invalid."""
71+
72+
self._start = 0
73+
self._end = 0
74+
75+
def values(self) -> List[T]:
76+
"""Returns valid data from the buffer."""
77+
78+
if self.len() == 0:
79+
return []
80+
start = self._start
81+
end = self._end % len(self._buffer)
82+
if start < end:
83+
return self._buffer[start:end]
84+
return self._buffer[start:] + self._buffer[:end]
85+
86+
87+
class MultiSparkline(displayio.TileGrid):
88+
"""A multiple sparkline graph.
89+
90+
:param int width: Width of the multisparkline graph in pixels
91+
:param int height: Height of the multisparkline graph in pixels
92+
:param int max_items: Maximum number of values housed in each sparkline
93+
:param bool dyn_xpitch: (Optional) Dynamically change xpitch (True)
94+
:param list y_mins: Lower range for the y-axis per line.
95+
Set each to None for autorange of respective line.
96+
Set to None for autorange of all lines.
97+
:param list y_maxs: Upper range for the y-axis per line.
98+
Set each to None for autorange of respective line.
99+
Set to None for autorange of all lines.
100+
:param int x: X-position on the screen, in pixels
101+
:param int y: Y-position on the screen, in pixels
102+
:param list colors: Each line color. Number of items in this list determines maximum
103+
number of sparklines
104+
105+
Note: If dyn_xpitch is True (default), each sparkline will allways span
106+
the complete width. Otherwise, each sparkline will grow when you
107+
add values. Once the line has reached the full width, each sparkline
108+
will scroll to the left.
109+
"""
110+
111+
# pylint: disable=too-many-arguments, too-many-instance-attributes
112+
def __init__(
113+
self,
114+
width: int,
115+
height: int,
116+
max_items: int,
117+
colors: List[int], # each line color
118+
dyn_xpitch: Optional[bool] = True, # True = dynamic pitch size
119+
y_mins: Optional[List[Optional[int]]] = None, # None = autoscaling
120+
y_maxs: Optional[List[Optional[int]]] = None, # None = autoscaling
121+
x: int = 0,
122+
y: int = 0,
123+
) -> None:
124+
# define class instance variables
125+
self._max_items = max_items # maximum number of items in the list
126+
self._lines = len(colors)
127+
self._buffers = [
128+
_CyclicBuffer(self._max_items, 0.0) for i in range(self._lines)
129+
] # values per sparkline
130+
self._points = [
131+
_CyclicBuffer(self._max_items, (0, 0)) for i in range(self._lines)
132+
] # _points: all points of sparkline
133+
self.dyn_xpitch = dyn_xpitch
134+
if not dyn_xpitch:
135+
self._xpitch = (width - 1) / (self._max_items - 1)
136+
self.y_mins = (
137+
[None] * self._lines if y_mins is None else y_mins
138+
) # minimum of each y-axis (None: autoscale)
139+
self.y_maxs = (
140+
[None] * self._lines if y_maxs is None else y_maxs
141+
) # maximum of each y-axis (None: autoscale)
142+
self.y_bottoms = self.y_mins.copy()
143+
# y_bottom: The actual minimum value of the vertical scale, will be
144+
# updated if autorange
145+
self.y_tops = self.y_maxs.copy()
146+
# y_top: The actual maximum value of the vertical scale, will be
147+
# updated if autorange
148+
self._palette = displayio.Palette(self._lines + 1)
149+
self._palette.make_transparent(0)
150+
for (i, color) in enumerate(colors):
151+
self._palette[i + 1] = color
152+
self._bitmap = displayio.Bitmap(width, height, self._lines + 1)
153+
154+
super().__init__(self._bitmap, pixel_shader=self._palette, x=x, y=y)
155+
156+
# pylint: enable=too-many-arguments
157+
158+
def clear_values(self) -> None:
159+
"""Clears _buffer and removes all lines in the group"""
160+
self._bitmap.fill(0)
161+
for buffer in self._buffers:
162+
buffer.clear()
163+
164+
def add_values(self, values: List[float], update: bool = True) -> None:
165+
"""Add a value to each sparkline.
166+
167+
:param list values: The values to be added, one per sparkline
168+
:param bool update: trigger recreation of primitives
169+
170+
Note: when adding multiple values per sparkline it is more efficient to call
171+
this method with parameter 'update=False' and then to manually
172+
call the update()-method
173+
"""
174+
175+
for (i, value) in enumerate(values):
176+
if value is not None:
177+
top = self.y_tops[i]
178+
bottom = self.y_bottoms[i]
179+
if (
180+
self._buffers[i].len() >= self._max_items
181+
): # if list is full, remove the first item
182+
first = self._buffers[i].pop()
183+
# check if boundaries have to be updated
184+
if self.y_mins[i] is None and first == bottom:
185+
bottom = min(self._buffers[i].values())
186+
if self.y_maxs[i] is None and first == self.y_tops[i]:
187+
top = max(self._buffers[i].values())
188+
self._buffers[i].push(value)
189+
190+
if self.y_mins[i] is None:
191+
bottom = value if not bottom else min(value, bottom)
192+
if self.y_maxs[i] is None:
193+
top = value if not top else max(value, top)
194+
195+
self.y_tops[i] = top
196+
self.y_bottoms[i] = bottom
197+
198+
if update:
199+
self.update_line(i)
200+
201+
def _add_point(
202+
self,
203+
line: int,
204+
x: int,
205+
value: float,
206+
) -> None:
207+
# Guard for y_top and y_bottom being the same
208+
top = self.y_tops[line]
209+
bottom = self.y_bottoms[line]
210+
if top == bottom:
211+
y = int(0.5 * self.height)
212+
else:
213+
y = int((self.height - 1) * (top - value) / (top - bottom))
214+
self._points[line].push((x, y))
215+
216+
def _draw(self) -> None:
217+
self._bitmap.fill(0)
218+
for i in range(self._lines):
219+
Polygon.draw(self._bitmap, self._points[i].values(), i + 1, close=False)
220+
221+
def update_line(self, line: int = None) -> None:
222+
"""Update the drawing of the sparkline.
223+
param int|None line: Line to update. Set to None for updating all (default).
224+
"""
225+
226+
if line is None:
227+
lines = range(self._lines)
228+
else:
229+
lines = [line]
230+
231+
redraw = False
232+
for a_line in lines:
233+
# bail out early if we only have a single point
234+
n_points = self._buffers[a_line].len()
235+
if n_points < 2:
236+
continue
237+
238+
redraw = True
239+
if self.dyn_xpitch:
240+
# this is a float, only make int when plotting the line
241+
xpitch = (self.width - 1) / (n_points - 1)
242+
else:
243+
xpitch = self._xpitch
244+
245+
self._points[a_line].clear() # remove all points
246+
247+
for count, value in enumerate(self._buffers[a_line].values()):
248+
self._add_point(a_line, int(xpitch * count), value)
249+
250+
if redraw:
251+
self._draw()
252+
253+
def values_of(self, line: int) -> List[float]:
254+
"""Returns the values displayed on the sparkline at given index."""
255+
256+
return self._buffers[line].values()
257+
258+
@property
259+
def width(self) -> int:
260+
"""
261+
:return: the width of the graph in pixels
262+
"""
263+
return self._bitmap.width
264+
265+
@property
266+
def height(self) -> int:
267+
"""
268+
:return: the height of the graph in pixels
269+
"""
270+
return self._bitmap.height

0 commit comments

Comments
 (0)