Skip to content

[WIP] Adding stream listener to stream changes in child nodes #183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
16cdfcb
Added SSEClient library
the-c0d3r Jul 21, 2018
b6f2eae
Added Streaming functionality to db.py
the-c0d3r Jul 21, 2018
561f4eb
Added ignore directive 'protected-access' for db.py in lint.sh
the-c0d3r Jul 21, 2018
e6944c1
Fixed typo
the-c0d3r Jul 21, 2018
93c393a
Merge branch 'master' into stream_listener
the-c0d3r Jul 24, 2018
4dccb8c
Renamed file to internal module
the-c0d3r Jul 24, 2018
f8ca12c
Reverted lint.sh, added ignore directive in db.py
the-c0d3r Jul 24, 2018
f14ebff
Merge branch 'stream_listener' of github.com:the-c0d3r/firebase-admin…
the-c0d3r Jul 24, 2018
6a829d0
Changed import module name to internal name
the-c0d3r Jul 24, 2018
4b9a952
Fixed pylint protected-access by not calling protected member
the-c0d3r Jul 24, 2018
db96e05
Added test_sseclient.py
the-c0d3r Jul 26, 2018
0906a79
python2,3 compatibility, fixed encoding issue
the-c0d3r Jul 27, 2018
ea58b58
Added tests for Event() class
the-c0d3r Jul 27, 2018
104cc4e
removed build_headers() function
the-c0d3r Jul 28, 2018
8c0c3e7
Removed Event().dump() test code
the-c0d3r Jul 28, 2018
0186ed9
Changed import to match code style
the-c0d3r Jul 28, 2018
37ae857
init sseclient to Stream().start() and removed sleep
the-c0d3r Jul 28, 2018
9a0d4c3
Added apache license 2.0 header
the-c0d3r Jul 28, 2018
d9142ad
changed import style, removed for loop in event in test_sseclient.Tes…
the-c0d3r Jul 28, 2018
bfb7c69
Removed self.running from firebase_admin._sseclient
the-c0d3r Jul 31, 2018
df05d22
Merge branch 'master' into stream_listener
the-c0d3r Aug 3, 2018
76896a5
Renamed Stream class to ListenerRegistration, stream() to listen(), r…
the-c0d3r Aug 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions firebase_admin/_sseclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""SSEClient module to handle streaming of realtime changes on the database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the Apache license header here.

to the firebase-admin-sdk
"""

import re
import time
import warnings
import six

import requests


# Technically, we should support streams that mix line endings. This regex,
# however, assumes that a system will provide consistent line endings.
end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')


class KeepAuthSession(requests.Session):
"""A session that does not drop Authentication on redirects between domains"""
def rebuild_auth(self, prepared_request, response):
pass


class SSEClient(object):
"""SSE Client Class"""
def __init__(self, url, session, build_headers, last_id=None, retry=3000, **kwargs):
self.should_connect = True
self.url = url
self.last_id = last_id
self.retry = retry
self.running = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this as it gets set to False below.

# Optional support for passing in a requests.Session()
self.session = session
# function for building auth header when token expires
self.build_headers = build_headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? The session we get from the db module is an instance of google.auth.transport.requests.AuthorizedSession, which is guaranteed to add the required Authorization header. So this shouldn't be required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_header() and all subsequent function arguments removed.

self.start_time = None
# Any extra kwargs will be fed into the requests.get call later.
self.requests_kwargs = kwargs

# The SSE spec requires making requests with Cache-Control: nocache
if 'headers' not in self.requests_kwargs:
self.requests_kwargs['headers'] = {}
self.requests_kwargs['headers']['Cache-Control'] = 'no-cache'

# The 'Accept' header is not required, but explicit > implicit
self.requests_kwargs['headers']['Accept'] = 'text/event-stream'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above can be written as:

headers = self.request_kwargs.get('headers', {})
# add the required values to headers
self.request_kwargs['headers'] = headers


# Keep data here as it streams in
self.buf = u''

self._connect()

def close(self):
"""Close the SSE Client instance"""
# TODO: check if AttributeError is needed to catch here
self.should_connect = False
self.retry = 0
self.resp.close()
# self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the commented out lines?

On a side note, I was testing this branch the other day, and I noticed that calling close() here does not immediately release the underlying socket. As a result the Stream remains active for a while even after calling stream.close(). So perhaps we do need the commented out lines?

Copy link
Contributor Author

@the-c0d3r the-c0d3r Jul 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also noticed the delay in calling stream.close(). I was calling it in the python shell, and it takes a few seconds to close and return. I'll test it out a bit more after I followed your other suggestions first.

# self.resp.raw._fp.fp.raw._sock.close()


def _connect(self):
"""connects to the server using requests"""
if self.should_connect:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also check for self.running.

success = False
while not success:
if self.last_id:
self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
headers = self.build_headers()
self.requests_kwargs['headers'].update(headers)
# Use session if set. Otherwise fall back to requests module.
self.requester = self.session or requests
self.resp = self.requester.get(self.url, stream=True, **self.requests_kwargs)

self.resp_iterator = self.resp.iter_content(decode_unicode=True)

# TODO: Ensure we're handling redirects. Might also stick the 'origin'
# attribute on Events like the Javascript spec requires.
self.resp.raise_for_status()
success = True
else:
raise StopIteration()

def _event_complete(self):
return re.search(end_of_field, self.buf) is not None

def __iter__(self):
return self

def __next__(self):
while not self._event_complete():
try:
nextchar = next(self.resp_iterator)
self.buf += nextchar
except (StopIteration, requests.RequestException):
time.sleep(self.retry / 1000.0)
self._connect()

# The SSE spec only supports resuming from a whole message, so
# if we have half a message we should throw it out.
head, sep, tail = self.buf.rpartition('\n')
self.buf = head + sep
continue

split = re.split(end_of_field, self.buf)
head = split[0]
tail = "".join(split[1:])

self.buf = tail
msg = Event.parse(head)

if msg.data == "credential is no longer valid":
self._connect()
return None

if msg.data == 'null':
return None

# If the server requests a specific retry delay, we need to honor it.
if msg.retry:
self.retry = msg.retry

# last_id should only be set if included in the message. It's not
# forgotten if a message omits it.
if msg.event_id:
self.last_id = msg.event_id

return msg

if six.PY2:
next = __next__


class Event(object):
"""Event class to handle the events fired by SSE"""

sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

def __init__(self, data='', event='message', event_id=None, retry=None):
self.data = data
self.event = event
self.event_id = event_id
self.retry = retry

def dump(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this whole method can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method Removed, and it's accompanying test in test/test_sseclient.py removed.

"""Dumps the event data"""
lines = []
if self.event_id:
lines.append('id: %s' % self.event_id)

# Only include an event line if it's not the default already.
if self.event != 'message':
lines.append('event: %s' % self.event)

if self.retry:
lines.append('retry: %s' % self.retry)

lines.extend('data: %s' % d for d in self.data.split('\n'))
return '\n'.join(lines) + '\n\n'

@classmethod
def parse(cls, raw):
"""Given a possibly-multiline string representing an SSE message, parse it
and return a Event object.
"""
msg = cls()
for line in raw.split('\n'):
match = cls.sse_line_pattern.match(line)
if match is None:
# Malformed line. Discard but warn.
warnings.warn('Invalid SSE line: "%s"' % line, SyntaxWarning)
continue

name = match.groupdict()['name']
value = match.groupdict()['value']
if name == '':
# line began with a ":", so is a comment. Ignore
continue

if name == 'data':
# If we already have some data, then join to it with a newline.
# Else this is it.
if msg.data:
msg.data = '%s\n%s' % (msg.data, value)
else:
msg.data = value
elif name == 'event':
msg.event = value
elif name == 'id':
msg.event_id = value
elif name == 'retry':
msg.retry = int(value)

return msg

def __str__(self):
return self.data
69 changes: 69 additions & 0 deletions firebase_admin/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,25 @@
import collections
import json
import sys
import threading
import time

import requests
import six
from six.moves import urllib
from google.auth import transport

import firebase_admin
from firebase_admin import _http_client
from firebase_admin import _utils
from firebase_admin._sseclient import SSEClient, KeepAuthSession
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a matter of style, we don't import individual classes and members. Please import the module here as from firebase_admin import _sseclient, and change the code below accordingly.



try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode

_DB_ATTRIBUTE = '_database'
_INVALID_PATH_CHARACTERS = '[].?#$'
_RESERVED_FILTERS = ('$key', '$value', '$priority')
Expand Down Expand Up @@ -69,6 +78,49 @@ def _parse_path(path):
return [seg for seg in path.split('/') if seg]


class Stream(object):
"""Class that handles the streaming of data node changes from server"""
def __init__(self, url, build_headers, stream_handler, stream_id):
"""Initialize the streaming object"""
self.url = url
self.build_headers = build_headers
self.stream_handler = stream_handler
self.stream_id = stream_id
self.sse = None
self.thread = None
self.start()

def start(self):
"""Start the streaming by spawning a thread"""
self.thread = threading.Thread(target=self.start_stream)
self.thread.start()
return self

def start_stream(self):
"""Streaming function for the spawned thread to run"""
self.sse = SSEClient(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to the start() method above, before the thread is started? Then I think you will be able to drop the sleep() call in close().

self.url,
session=KeepAuthSession(),
build_headers=self.build_headers
)

for msg in self.sse:
# iterate the sse client's generator
if msg:
msg_data = json.loads(msg.data)
msg_data["event"] = msg.event
if self.stream_id:
msg_data["stream_id"] = self.stream_id
self.stream_handler(msg_data)

def close(self):
while not self.sse and not hasattr(self.sse, "resp"):
time.sleep(0.001)
self.sse.running = False
self.sse.close()
self.thread.join()


class Reference(object):
"""Reference represents a node in the Firebase realtime database."""

Expand Down Expand Up @@ -101,6 +153,23 @@ def parent(self):
return Reference(client=self._client, segments=self._segments[:-1])
return None

def build_headers(self, token=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See if we can remove this altogether (see my comment on _sseclient). If we must keep this, please rename to _build_headers() so it's treated as an internal method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This build_headers() is tested to be unnecessary just like your comment on _sseclient. So I will remove it from all subsequent code.

headers = {'content-type' : 'application/json; charset=UTF-8'}
if not token and self._client.session:
request = transport.requests.Request()
self._client.session.credentials.refresh(request)
access_token = self._client.session.credentials.token
headers['Authorization'] = 'Bearer ' + access_token
return headers

def stream(self, stream_handler, stream_id=None):
parameters = {}
# reset path and build_query for next query
request_ref = '{}{}.json?{}'.format(
self._client.base_url, self._pathurl, urlencode(parameters)
)
return Stream(request_ref, self.build_headers, stream_handler, stream_id)

def child(self, path):
"""Returns a Reference to the specified child node.

Expand Down
2 changes: 1 addition & 1 deletion tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ def test_range_query(self):
assert recorder[0].headers['User-Agent'] == db._USER_AGENT


class TestDatabseInitialization(object):
class TestDatabaseInitialization(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch 👍

"""Test cases for database initialization."""

def teardown_method(self):
Expand Down