Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ func NewClient(opts ...ClientOption) (*Client, error) {
c.mu.Lock()
stream, ok := c.playback[msg.StreamIndex]
c.mu.Unlock()
if ok && stream.state == running && !stream.underflow {
if ok && stream.state.is(running) && !stream.underflow {
stream.started <- true
}
case *proto.Underflow:
c.mu.Lock()
stream, ok := c.playback[msg.StreamIndex]
c.mu.Unlock()
if ok {
if stream.state == running {
if stream.state.is(running) {
stream.underflow = true
}
}
Expand All @@ -103,7 +103,7 @@ func NewClient(opts ...ClientOption) (*Client, error) {
for _, p := range c.playback {
close(p.request)
p.err = ErrConnectionClosed
p.state = serverLost
p.state.set(serverLost)
}
for _, r := range c.record {
r.err = ErrConnectionClosed
Expand Down
72 changes: 38 additions & 34 deletions playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ type PlaybackStream struct {
c *Client

index uint32
state streamState
state *stateMachine
underflow bool
err error

front, back []byte
requested int
request chan int
started chan bool
request chan int
started chan bool

events chan struct{}
eventsLock sync.Mutex
Expand Down Expand Up @@ -82,8 +80,7 @@ func (c *Client) NewPlayback(r Reader, opts ...PlaybackOption) (*PlaybackStream,
return nil, err
}
p.index = p.createReply.StreamIndex
p.front = make([]byte, p.createReply.BufferMaxLength)
p.back = make([]byte, p.createReply.BufferMaxLength)
p.state = newStateMachine()
p.request = make(chan int)
p.started = make(chan bool)
c.mu.Lock()
Expand All @@ -94,28 +91,33 @@ func (c *Client) NewPlayback(r Reader, opts ...PlaybackOption) (*PlaybackStream,
}

func (p *PlaybackStream) run() {
for n := range p.request {
if p.state != running {
requested := 0
front := make([]byte, p.createReply.BufferMaxLength)
back := make([]byte, p.createReply.BufferMaxLength)

for bufferLength := range p.request {
if !p.state.is(running) {
continue
}
p.requested += n
for p.requested > 0 {
n, err := p.r.Read(p.front[:p.requested])
if n > 0 {
p.c.c.Send(p.index, p.front[:n])
p.requested -= n
p.front, p.back = p.back, p.front
}
requested += bufferLength
for requested > 0 {
readCount, err := p.r.Read(front[:requested])
if err != nil {
if err != EndOfData {
p.err = err
}
p.state = idle
p.state.set(idle)
break
}
if readCount > 0 {
p.c.c.Send(p.index, front[:readCount])
requested -= readCount
front, back = back, front
}

select {
case n = <-p.request:
p.requested += n
case nextBufferLength := <-p.request:
requested += nextBufferLength
default:
}
}
Expand Down Expand Up @@ -186,9 +188,9 @@ func (p *PlaybackStream) handleEvents(events chan struct{}) {

// Start starts playing audio.
func (p *PlaybackStream) Start() {
if p.state == idle {
if p.state.is(idle) {
p.c.c.Request(&proto.FlushPlaybackStream{StreamIndex: p.index}, nil)
p.state = running
p.state.set(running)
p.err = nil
p.request <- int(p.createReply.BufferTargetLength)
p.underflow = false
Expand All @@ -200,32 +202,32 @@ func (p *PlaybackStream) Start() {
// Stop stops playing audio; the callback will no longer be called.
// If the buffer size/latency is large, audio may continue to play for some time after the call to Stop.
func (p *PlaybackStream) Stop() {
if p.state == running || p.state == paused {
p.state = idle
if p.state.is(running, paused) {
p.state.set(idle)
}
}

// Pause stops playing audio immediately.
func (p *PlaybackStream) Pause() {
if p.state == running {
if p.state.is(running) {
p.c.c.Request(&proto.CorkPlaybackStream{StreamIndex: p.index, Corked: true}, nil)
p.state = paused
p.state.set(paused)
}
}

// Resume resumes a paused stream.
func (p *PlaybackStream) Resume() {
if p.state == paused {
if p.state.is(paused) {
p.c.c.Request(&proto.CorkPlaybackStream{StreamIndex: p.index, Corked: false}, nil)
p.state = running
p.state.set(running)
p.underflow = false
}
}

// Drain waits until the playback has ended.
// Drain does not return when the stream is paused.
func (p *PlaybackStream) Drain() {
if p.state == running {
if p.state.is(running) {
p.c.c.Request(&proto.DrainPlaybackStream{StreamIndex: p.index}, nil)
}
}
Expand Down Expand Up @@ -267,25 +269,27 @@ func (p *PlaybackStream) SetVolume(volumes proto.ChannelVolumes) error {
func (p *PlaybackStream) Close() {
if !p.Closed() {
p.c.c.Request(&proto.DeletePlaybackStream{StreamIndex: p.index}, nil)
p.state = closed
p.state.set(closed)
close(p.request)

p.c.mu.Lock()
delete(p.c.playback, p.index)
p.c.mu.Unlock()

p.eventsLock.Lock()
close(p.events)
p.events = nil
if p.events != nil {
close(p.events)
p.events = nil
}
p.eventsLock.Unlock()
}
}

// Closed returns wether the stream was closed.
func (p *PlaybackStream) Closed() bool { return p.state == closed || p.state == serverLost }
func (p *PlaybackStream) Closed() bool { return p.state.is(closed, serverLost) }

// Running returns wether the stream is currently playing.
func (p *PlaybackStream) Running() bool { return p.state == running }
func (p *PlaybackStream) Running() bool { return p.state.is(running) }

// Underflow returns true if any underflows happend since the last call to Start or Resume.
// Underflows usually happen because the latency/buffer size is too low or because the callback
Expand Down
39 changes: 39 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package pulse

import "sync"

type streamState int

const (
Expand All @@ -9,3 +11,40 @@ const (
closed
serverLost
)

type stateMachine struct {
state streamState

lock *sync.RWMutex
}

func newStateMachine() *stateMachine {
return &stateMachine{
state: idle,
lock: &sync.RWMutex{},
}
}

func (s *stateMachine) set(state streamState) {
s.lock.Lock()
defer s.lock.Unlock()

s.state = state
}

func (s *stateMachine) get() streamState {
s.lock.RLock()
defer s.lock.RUnlock()

return s.state
}

func (s *stateMachine) is(states ...streamState) bool {
current := s.get()
for _, state := range states {
if current == state {
return true
}
}
return false
}