Adding stream listeners to sniff change in child #50

Closed · wants to merge 4 commits
91 changes: 91 additions & 0 deletions firebase_admin/db.py
@@ -33,6 +33,17 @@
import firebase_admin
from firebase_admin import utils

from sseclient import SSEClient
import socket
import threading
import time
from requests import Session

try:
    from urllib.parse import urlencode, quote
except ImportError:
    from urllib import urlencode, quote

_DB_ATTRIBUTE = '_database'
_INVALID_PATH_CHARACTERS = '[].#$'
_RESERVED_FILTERS = ('$key', '$value', '$priority')
@@ -67,6 +78,70 @@ def _parse_path(path):
            'Invalid path: "{0}". Path contains illegal characters.'.format(path))
    return [seg for seg in path.split('/') if seg]

class KeepAuthSession(Session):
    """
    A session that doesn't drop Authentication on redirects between domains.
    """

    def rebuild_auth(self, prepared_request, response):
        pass

class ClosableSSEClient(SSEClient):
    def __init__(self, *args, **kwargs):
        self.should_connect = True
        super(ClosableSSEClient, self).__init__(*args, **kwargs)

    def _connect(self):
        if self.should_connect:
            super(ClosableSSEClient, self)._connect()
        else:
            raise StopIteration()

    def close(self):
        self.should_connect = False
        self.retry = 0
        self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
        self.resp.raw._fp.fp.raw._sock.close()

class Stream:
    def __init__(self, url, stream_handler, build_headers, stream_id):
        self.build_headers = build_headers
        self.url = url
        self.stream_handler = stream_handler
        self.stream_id = stream_id
        self.sse = None
        self.thread = None
        self.start()

    def make_session(self):
        """
        Return a custom session object to be passed to the ClosableSSEClient.
        """
        session = KeepAuthSession()
        return session

    def start(self):
        self.thread = threading.Thread(target=self.start_stream)
        self.thread.start()
        return self

    def start_stream(self):
        self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)

This causes an error if pyrebase is not installed.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 129, in start_stream
    self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
  File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 88, in __init__
    super(ClosableSSEClient, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.5/site-packages/sseclient.py", line 39, in __init__
    self._connect()
  File "/usr/local/lib/python3.5/site-packages/firebase_admin/db.py", line 94, in _connect
    super(ClosableSSEClient, self)._connect()
  File "/usr/local/lib/python3.5/site-packages/sseclient.py", line 47, in _connect
    self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
  File "/usr/local/lib/python3.5/site-packages/requests/sessions.py", line 521, in get
    return self.request('GET', url, **kwargs)
TypeError: request() got an unexpected keyword argument 'build_headers'

Let me check. I'll get back to you on this.

Hi - any luck with this one? I am getting exactly the same error. What is the fix?

Yes, I have resolved this issue. I'll share the changes by tonight or tomorrow.

Awesome, thanks. Waiting for the patch. By the way, this is an awesome library.

@vijjuk Yes, firebase-admin is much better. I switched from pyrebase to firebase-admin. firebase-admin uses the sseclient library to listen for changes in the Firebase database.

Hello, can I ask what the status of this pull request is? Is it going to be merged? I am developing an app that requires the server to listen for changes in the Firebase database. @Aqsa-K I tried your fork repo; somehow it fires off an event as soon as I set up the stream(), and the callback function is called immediately. But the intended listening event is triggered every time the data is changed on the database side.

>>> ref.stream(callbackfunc)
<firebase_admin.db.Stream object at 0x10c9b7ef0>
{'event': 'put', 'data': True, 'path': '/'}
# The event above is fired off automatically on stream(), without any modification to the db.

# These 3 events below are fired off by changes in the db:
{'event': 'put', 'data': 'true', 'path': '/'}
{'event': 'put', 'data': 'false', 'path': '/'}
{'event': 'put', 'data': 'true', 'path': '/'}

>>> self.db.reference("/users/dev1/successful").stream(callbackfunc)
<firebase_admin.db.Stream object at 0x10ef897f0>
{'event': 'put', 'data': True, 'path': '/'}
# The event above is fired off automatically on stream(), without any modification to the db.

Is it possible to fix this unexpected behavior? I would like to help fix it, but I have no idea where to start looking, and I have never worked with SSE before. If anyone can point me in the right direction, please do.

The first event is fired initially to retrieve all of the current data in the database.

@the-c0d3r you can separate out the first event, which is fired initially, by filtering on the 'event' type or the 'path' in the message received by the callback function.

If you intend to look for changes in your data, those messages should have 'event' set to 'put'.

Let me know if this is still unclear and you need more help with this.
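For reference, here is a minimal sketch of that kind of filtering, assuming the message dict shape built in Stream.start_stream in this diff ('event', 'path', 'data'); the ChangeListener name and the skip-the-first-snapshot behaviour are illustrative, not part of the PR:

    # Callable handler that ignores the initial snapshot and only reacts to later 'put' events.
    class ChangeListener:
        def __init__(self):
            self.saw_initial_snapshot = False

        def __call__(self, message):
            if not self.saw_initial_snapshot:
                # The first 'put' just delivers the current contents of the node; skip it.
                self.saw_initial_snapshot = True
                return
            if message['event'] == 'put':
                print('changed:', message['path'], message['data'])

    # Hypothetical usage with the Stream added in this PR:
    # db.reference('/users/dev1/successful').stream(ChangeListener())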

Hi @Aqsa-K, thanks for your comment. I have implemented functionality to filter the events and only fire them if the value changes. I also created a PR on your fork. I noticed a few things about the SSE client. When I close the stream, it takes about 10 to 20 seconds to return. The following close function also triggers an AttributeError, so I catch it with an exception handler.

    # ClosableSSEClient.close, with the AttributeError guarded:
    def close(self):
        self.should_connect = False
        self.retry = 0
        try:
            self.resp.close()
            self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
            self.resp.raw._fp.fp.raw._sock.close()
        except AttributeError:
            pass

    # Stream.close:
    def close(self):
        while not self.sse and not hasattr(self.sse, 'resp'):
            time.sleep(0.001)
        self.sse.running = False
        self.sse.close()
        self.thread.join()
        #  return self

But the problem still remains: closing the stream takes too long for a single-threaded operation in my app. The close() function of the stream waits for the SSE client thread to join; I think that takes up most of the time. Is there any way to force the stream to close? I need to watch for changes based on the user's query (which differs every time), and after getting results I have to dispose of the stream. One workaround I can think of is to do that in a thread pool, so a worker thread can wait on the SSE client closing process, but that does not seem like the optimal way to me. Or am I doing something wrong by trying to close?

The following is the way I dispose of the streams:

streamRef = db.reference("/path/to/obj").stream(lambda x: print(x))
# attempt some changes on the path
streamRef.close()
# takes from 10 to 20 seconds here to return

Thanks for reading.
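For what it's worth, a minimal sketch of the thread-pool workaround mentioned above, so the caller does not block while the stream is torn down; the executor setup and variable names are illustrative, and db/stream() refer to the API added in this PR:

    from concurrent.futures import ThreadPoolExecutor

    # One shared worker that absorbs the slow close()/thread.join() calls.
    stream_closer = ThreadPoolExecutor(max_workers=1)

    stream_ref = db.reference("/path/to/obj").stream(lambda x: print(x))
    # ... observe the changes you care about ...

    # Hand the teardown off to the worker and return immediately.
    stream_closer.submit(stream_ref.close)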

        for msg in self.sse:
            if msg:
                msg_data = json.loads(msg.data)
                msg_data["event"] = msg.event
                if self.stream_id:
                    msg_data["stream_id"] = self.stream_id
                self.stream_handler(msg_data)

    def close(self):
        while not self.sse and not hasattr(self.sse, 'resp'):

Should this be 'or' instead of 'and'?

            time.sleep(0.001)
        self.sse.running = False
        self.sse.close()
        self.thread.join()
        return self

class Reference(object):
    """Reference represents a node in the Firebase realtime database."""
@@ -100,6 +175,22 @@ def parent(self):
            return Reference(client=self._client, segments=self._segments[:-1])
        return None

    def build_headers(self, token=None):
        headers = {"content-type": "application/json; charset=UTF-8"}
        if not token and self._client._session.credentials:
            access_token = self._client._session.credentials.token
            headers['Authorization'] = 'Bearer ' + access_token
        return headers

    def stream(self, stream_handler, token=None, stream_id=None):
        # request_ref = self.build_request_url(token)
        parameters = {}
        # reset path and build_query for next query
        request_ref = '{0}{1}.json?{2}'.format(self._client._url, self._pathurl, urlencode(parameters))
        #self.stream_path = ""
        #self.build_query = {}
        return Stream(request_ref, stream_handler, self.build_headers, stream_id)

    def child(self, path):
        """Returns a Reference to the specified child node.

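For context, a short sketch of how the stream() API added to db.py above might be used end to end; the service-account path, database URL, and handler name are placeholders, not values from this PR:

    import firebase_admin
    from firebase_admin import credentials, db

    cred = credentials.Certificate('service-account.json')  # hypothetical key file
    firebase_admin.initialize_app(cred, {'databaseURL': 'https://example-project.firebaseio.com'})

    def on_change(message):
        # message is the dict built in Stream.start_stream: 'event', 'path', 'data' (+ 'stream_id' if given).
        print(message['event'], message['path'], message['data'])

    stream = db.reference('/users').stream(on_change, stream_id='users-listener')
    # ... later, stop listening:
    stream.close()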
1 change: 1 addition & 0 deletions sseclient/__init__.py
@@ -0,0 +1 @@
from .sseclient import SSEClient
168 changes: 168 additions & 0 deletions sseclient/sseclient.py
@@ -0,0 +1,168 @@
import re
import time
import warnings
import threading
import six

import requests


# Technically, we should support streams that mix line endings. This regex,
# however, assumes that a system will provide consistent line endings.
end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')

class SSEClient(object):
    def __init__(self, url, session, build_headers, last_id=None, retry=3000, **kwargs):
        self.url = url
        self.last_id = last_id
        self.retry = retry
        self.running = True
        # Optional support for passing in a requests.Session()
        self.session = session
        # function for building auth header when token expires
        self.build_headers = build_headers
        self.start_time = None
        # Any extra kwargs will be fed into the requests.get call later.
        self.requests_kwargs = kwargs

        # The SSE spec requires making requests with Cache-Control: nocache
        if 'headers' not in self.requests_kwargs:
            self.requests_kwargs['headers'] = {}
        self.requests_kwargs['headers']['Cache-Control'] = 'no-cache'

        # The 'Accept' header is not required, but explicit > implicit
        self.requests_kwargs['headers']['Accept'] = 'text/event-stream'

        # Keep data here as it streams in
        self.buf = u''

        self._connect()

    def _connect(self):
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
        headers = self.build_headers()
        self.requests_kwargs['headers'].update(headers)
        # Use session if set. Otherwise fall back to requests module.
        self.requester = self.session or requests
        self.resp = self.requester.get(self.url, stream=True, **self.requests_kwargs)

        self.resp_iterator = self.resp.iter_content(decode_unicode=True)

        # TODO: Ensure we're handling redirects. Might also stick the 'origin'
        # attribute on Events like the Javascript spec requires.
        self.resp.raise_for_status()

    def _event_complete(self):
        return re.search(end_of_field, self.buf) is not None

    def __iter__(self):
        return self

    def __next__(self):
        while not self._event_complete():
            try:
                nextchar = next(self.resp_iterator)
                self.buf += nextchar
            except (StopIteration, requests.RequestException):
                time.sleep(self.retry / 1000.0)
                self._connect()

                # The SSE spec only supports resuming from a whole message, so
                # if we have half a message we should throw it out.
                head, sep, tail = self.buf.rpartition('\n')
                self.buf = head + sep
                continue

        split = re.split(end_of_field, self.buf)
        head = split[0]
        tail = "".join(split[1:])

        self.buf = tail
        msg = Event.parse(head)

        if msg.data == "credential is no longer valid":
            self._connect()
            return None

        if msg.data == 'null':
            return None

        # If the server requests a specific retry delay, we need to honor it.
        if msg.retry:
            self.retry = msg.retry

        # last_id should only be set if included in the message. It's not
        # forgotten if a message omits it.
        if msg.id:
            self.last_id = msg.id

        return msg

    if six.PY2:
        next = __next__


class Event(object):

    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    def __init__(self, data='', event='message', id=None, retry=None):
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def dump(self):
        lines = []
        if self.id:
            lines.append('id: %s' % self.id)

        # Only include an event line if it's not the default already.
        if self.event != 'message':
            lines.append('event: %s' % self.event)

        if self.retry:
            lines.append('retry: %s' % self.retry)

        lines.extend('data: %s' % d for d in self.data.split('\n'))
        return '\n'.join(lines) + '\n\n'

    @classmethod
    def parse(cls, raw):
        """
        Given a possibly-multiline string representing an SSE message, parse it
        and return an Event object.
        """
        msg = cls()
        for line in raw.split('\n'):
            m = cls.sse_line_pattern.match(line)
            if m is None:
                # Malformed line. Discard but warn.
                warnings.warn('Invalid SSE line: "%s"' % line, SyntaxWarning)
                continue

            name = m.groupdict()['name']
            value = m.groupdict()['value']
            if name == '':
                # line began with a ":", so is a comment. Ignore
                continue

            if name == 'data':
                # If we already have some data, then join to it with a newline.
                # Else this is it.
                if msg.data:
                    msg.data = '%s\n%s' % (msg.data, value)
                else:
                    msg.data = value
            elif name == 'event':
                msg.event = value
            elif name == 'id':
                msg.id = value
            elif name == 'retry':
                msg.retry = int(value)

        return msg

    def __str__(self):
        return self.data
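As a quick illustration of how Event.parse above handles a typical Firebase SSE frame (the payload contents here are made up for the example):

    raw = 'event: put\ndata: {"path": "/users/dev1", "data": true}'
    msg = Event.parse(raw)
    print(msg.event)  # 'put'
    print(msg.data)   # '{"path": "/users/dev1", "data": true}'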