diff --git a/examples/follow.py b/examples/follow.py index 64b3e1ac6..cbb559deb 100755 --- a/examples/follow.py +++ b/examples/follow.py @@ -42,7 +42,7 @@ def follow(job, count, items): job.refresh() continue stream = items(offset+1) - for event in results.ResultsReader(stream): + for event in results.JSONResultsReader(stream): pprint(event) offset = total @@ -72,10 +72,10 @@ def main(): if job['reportSearch'] is not None: # Is it a transforming search? count = lambda: int(job['numPreviews']) - items = lambda _: job.preview() + items = lambda _: job.preview(output_mode='json') else: count = lambda: int(job['eventCount']) - items = lambda offset: job.events(offset=offset) + items = lambda offset: job.events(offset=offset, output_mode='json') try: follow(job, count, items) diff --git a/examples/oneshot.py b/examples/oneshot.py index dc34bb8cb..8429aedfb 100755 --- a/examples/oneshot.py +++ b/examples/oneshot.py @@ -32,7 +32,7 @@ "(e.g., export PYTHONPATH=~/splunk-sdk-python.") def pretty(response): - reader = results.ResultsReader(response) + reader = results.JSONResultsReader(response) for result in reader: if isinstance(result, dict): pprint(result) @@ -46,7 +46,7 @@ def main(): search = opts.args[0] service = connect(**opts.kwargs) socket.setdefaulttimeout(None) - response = service.jobs.oneshot(search) + response = service.jobs.oneshot(search, output_mode='json') pretty(response) diff --git a/examples/results.py b/examples/results.py index 9c0f18751..e18e8f567 100755 --- a/examples/results.py +++ b/examples/results.py @@ -17,18 +17,21 @@ """A script that reads XML search results from stdin and pretty-prints them back to stdout. 
The script is designed to be used with the search.py example, eg: './search.py "search 404" | ./results.py'""" - + from __future__ import absolute_import from pprint import pprint import sys, os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import splunklib.results as results + def pretty(): - reader = results.ResultsReader(sys.stdin) + reader = results.JSONResultsReader(sys.stdin) for event in reader: pprint(event) + if __name__ == "__main__": pretty() diff --git a/examples/search_modes.py b/examples/search_modes.py index f3e05f362..f1d1687f2 100644 --- a/examples/search_modes.py +++ b/examples/search_modes.py @@ -24,7 +24,7 @@ def modes(argv): while not job.is_ready(): time.sleep(0.5) pass - reader = results.ResultsReader(job.events()) + reader = results.JSONResultsReader(job.events(output_mode='json')) # Events found: 0 print('Events found with adhoc_search_level="smart": %s' % len([e for e in reader])) @@ -33,7 +33,7 @@ def modes(argv): while not job.is_ready(): time.sleep(0.5) pass - reader = results.ResultsReader(job.events()) + reader = results.JSONResultsReader(job.events(output_mode='json')) # Events found: 10 print('Events found with adhoc_search_level="verbose": %s' % len([e for e in reader])) diff --git a/examples/stail.py b/examples/stail.py index 85f38a853..6ba4ee54e 100755 --- a/examples/stail.py +++ b/examples/stail.py @@ -25,7 +25,7 @@ from pprint import pprint from splunklib.client import connect -from splunklib.results import ResultsReader +from splunklib.results import JSONResultsReader try: import utils @@ -49,9 +49,10 @@ def main(): search=search, earliest_time="rt", latest_time="rt", - search_mode="realtime") + search_mode="realtime", + output_mode="json") - for result in ResultsReader(result.body): + for result in JSONResultsReader(result.body): if result is not None: print(pprint(result)) diff --git a/splunklib/client.py b/splunklib/client.py index 7b0772f11..0979140c2 100644 --- a/splunklib/client.py +++ 
b/splunklib/client.py @@ -2767,9 +2767,8 @@ def pause(self): return self def results(self, **query_params): - """Returns a streaming handle to this job's search results. To get a - nice, Pythonic iterator, pass the handle to :class:`splunklib.results.ResultsReader`, - as in:: + """Returns a streaming handle to this job's search results. To get a nice, Pythonic iterator, pass the handle + to :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", as in:: import splunklib.client as client import splunklib.results as results @@ -2778,7 +2777,7 @@ def results(self, **query_params): job = service.jobs.create("search * | head 5") while not job.is_done(): sleep(.2) - rr = results.ResultsReader(job.results()) + rr = results.JSONResultsReader(job.results(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -2808,19 +2807,17 @@ def results(self, **query_params): def preview(self, **query_params): """Returns a streaming handle to this job's preview search results. - Unlike :class:`splunklib.results.ResultsReader`, which requires a job to - be finished to - return any results, the ``preview`` method returns any results that have - been generated so far, whether the job is running or not. The - returned search results are the raw data from the server. Pass - the handle returned to :class:`splunklib.results.ResultsReader` to get a - nice, Pythonic iterator over objects, as in:: + Unlike :class:`splunklib.results.JSONResultsReader` along with the query param "output_mode='json'", + which requires a job to be finished to return any results, the ``preview`` method returns any results that + have been generated so far, whether the job is running or not. The returned search results are the raw data + from the server. 
Pass the handle returned to :class:`splunklib.results.JSONResultsReader` to get a nice, + Pythonic iterator over objects, as in:: import splunklib.client as client import splunklib.results as results service = client.connect(...) job = service.jobs.create("search * | head 5") - rr = results.ResultsReader(job.preview()) + rr = results.JSONResultsReader(job.preview(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -2975,15 +2972,15 @@ def create(self, query, **kwargs): return Job(self.service, sid) def export(self, query, **params): - """Runs a search and immediately starts streaming preview events. - This method returns a streaming handle to this job's events as an XML - document from the server. To parse this stream into usable Python objects, - pass the handle to :class:`splunklib.results.ResultsReader`:: + """Runs a search and immediately starts streaming preview events. This method returns a streaming handle to + this job's events as an XML document from the server. To parse this stream into usable Python objects, + pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param + "output_mode='json'":: import splunklib.client as client import splunklib.results as results service = client.connect(...) - rr = results.ResultsReader(service.jobs.export("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.export("search * | head 5",output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -3032,14 +3029,14 @@ def itemmeta(self): def oneshot(self, query, **params): """Run a oneshot search and returns a streaming handle to the results. - The ``InputStream`` object streams XML fragments from the server. 
To - parse this stream into usable Python objects, - pass the handle to :class:`splunklib.results.ResultsReader`:: + The ``InputStream`` object streams XML fragments from the server. To parse this stream into usable Python + objects, pass the handle to :class:`splunklib.results.JSONResultsReader` along with the query param + "output_mode='json'" :: import splunklib.client as client import splunklib.results as results service = client.connect(...) - rr = results.ResultsReader(service.jobs.oneshot("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5",output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results diff --git a/splunklib/results.py b/splunklib/results.py index 66e9ad7d1..5f3966859 100644 --- a/splunklib/results.py +++ b/splunklib/results.py @@ -34,15 +34,19 @@ from __future__ import absolute_import -from io import BytesIO +from io import BufferedReader, BytesIO from splunklib import six + +from splunklib.six import deprecated + try: import xml.etree.cElementTree as et except: import xml.etree.ElementTree as et from collections import OrderedDict +from json import loads as json_loads try: from splunklib.six.moves import cStringIO as StringIO @@ -54,6 +58,7 @@ "Message" ] + class Message(object): """This class represents informational messages that Splunk interleaves in the results stream. @@ -64,6 +69,7 @@ class Message(object): m = Message("DEBUG", "There's something in that variable...") """ + def __init__(self, type_, message): self.type = type_ self.message = message @@ -77,6 +83,7 @@ def __eq__(self, other): def __hash__(self): return hash((self.type, self.message)) + class _ConcatenatedStream(object): """Lazily concatenate zero or more streams into a stream. 
@@ -89,6 +96,7 @@ class _ConcatenatedStream(object): s = _ConcatenatedStream(StringIO("abc"), StringIO("def")) assert s.read() == "abcdef" """ + def __init__(self, *streams): self.streams = list(streams) @@ -107,6 +115,7 @@ def read(self, n=None): del self.streams[0] return response + class _XMLDTDFilter(object): """Lazily remove all XML DTDs from a stream. @@ -120,6 +129,7 @@ class _XMLDTDFilter(object): s = _XMLDTDFilter("") assert s.read() == "" """ + def __init__(self, stream): self.stream = stream @@ -150,6 +160,8 @@ def read(self, n=None): n -= 1 return response + +@deprecated("Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'") class ResultsReader(object): """This class returns dictionaries and Splunk messages from an XML results stream. @@ -177,6 +189,7 @@ class ResultsReader(object): print "Message: %s" % result print "is_preview = %s " % reader.is_preview """ + # Be sure to update the docstrings of client.Jobs.oneshot, # client.Job.results_preview and client.Job.results to match any # changes made to ResultsReader. @@ -257,16 +270,16 @@ def _parse_results(self, stream): # So we'll define it here def __itertext(self): - tag = self.tag - if not isinstance(tag, six.string_types) and tag is not None: - return - if self.text: - yield self.text - for e in self: - for s in __itertext(e): - yield s - if e.tail: - yield e.tail + tag = self.tag + if not isinstance(tag, six.string_types) and tag is not None: + return + if self.text: + yield self.text + for e in self: + for s in __itertext(e): + yield s + if e.tail: + yield e.tail text = "".join(__itertext(elem)) values.append(text) @@ -288,5 +301,69 @@ def __itertext(self): raise +class JSONResultsReader(object): + """This class returns dictionaries and Splunk messages from a JSON results + stream. + ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a + :class:`Message` object for Splunk messages. 
This class has one field, + ``is_preview``, which is ``True`` when the results are a preview from a + running search, or ``False`` when the results are from a completed search. + This function has no network activity other than what is implicit in the + stream it operates on. + :param `stream`: The stream to read from (any object that supports + ``.read()``). + **Example**:: + import results + response = ... # the body of an HTTP response + reader = results.JSONResultsReader(response) + for result in reader: + if isinstance(result, dict): + print "Result: %s" % result + elif isinstance(result, results.Message): + print "Message: %s" % result + print "is_preview = %s " % reader.is_preview + """ + + # Be sure to update the docstrings of client.Jobs.oneshot, + # client.Job.results_preview and client.Job.results to match any + # changes made to JSONResultsReader. + # + # This wouldn't be a class, just the _parse_results function below, + # except that you cannot get the current generator inside the + # function creating that generator. Thus it's all wrapped up for + # the sake of one field. + def __init__(self, stream): + # The search/jobs/exports endpoint, when run with + # earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of + # JSON documents, each containing a result, as opposed to one + # results element containing lots of results. 
+ stream = BufferedReader(stream) + self.is_preview = None + self._gen = self._parse_results(stream) + + def __iter__(self): + return self + def next(self): + return next(self._gen) + + __next__ = next + def _parse_results(self, stream): + """Parse results and messages out of *stream*.""" + for line in stream.readlines(): + strip_line = line.strip() + if strip_line.__len__() == 0: continue + parsed_line = json_loads(strip_line) + if "preview" in parsed_line: + self.is_preview = parsed_line["preview"] + if "messages" in parsed_line and parsed_line["messages"].__len__() > 0: + for message in parsed_line["messages"]: + msg_type = message.get("type", "Unknown Message Type") + text = message.get("text") + yield Message(msg_type, text) + if "result" in parsed_line: + yield parsed_line["result"] + if "results" in parsed_line: + for result in parsed_line["results"]: + yield result diff --git a/splunklib/six.py b/splunklib/six.py index 5fe9f8e14..d13e50c93 100644 --- a/splunklib/six.py +++ b/splunklib/six.py @@ -978,3 +978,16 @@ def python_2_unicode_compatible(klass): del i, importer # Finally, add the importer to the meta path import hook. sys.meta_path.append(_importer) + +import warnings + +def deprecated(message): + def deprecated_decorator(func): + def deprecated_func(*args, **kwargs): + warnings.warn("{} is a deprecated function. 
{}".format(func.__name__, message), + category=DeprecationWarning, + stacklevel=2) + warnings.simplefilter('default', DeprecationWarning) + return func(*args, **kwargs) + return deprecated_func + return deprecated_decorator \ No newline at end of file diff --git a/tests/searchcommands/test_validators.py b/tests/searchcommands/test_validators.py index 38836c4aa..cc524b307 100755 --- a/tests/searchcommands/test_validators.py +++ b/tests/searchcommands/test_validators.py @@ -208,10 +208,9 @@ def test(integer): def test_float(self): # Float validator test - import random - maxsize = random.random() + 1 - minsize = random.random() - 1 + maxsize = 1.5 + minsize = -1.5 validator = validators.Float() diff --git a/tests/test_job.py b/tests/test_job.py index 4de34b611..44326086b 100755 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -54,8 +54,8 @@ def test_oneshot_with_garbage_fails(self): def test_oneshot(self): jobs = self.service.jobs - stream = jobs.oneshot("search index=_internal earliest=-1m | head 3") - result = results.ResultsReader(stream) + stream = jobs.oneshot("search index=_internal earliest=-1m | head 3", output_mode='json') + result = results.JSONResultsReader(stream) ds = list(result) self.assertEqual(result.is_preview, False) self.assertTrue(isinstance(ds[0], dict) or \ @@ -69,8 +69,8 @@ def test_export_with_garbage_fails(self): def test_export(self): jobs = self.service.jobs - stream = jobs.export("search index=_internal earliest=-1m | head 3") - result = results.ResultsReader(stream) + stream = jobs.export("search index=_internal earliest=-1m | head 3", output_mode='json') + result = results.JSONResultsReader(stream) ds = list(result) self.assertEqual(result.is_preview, False) self.assertTrue(isinstance(ds[0], dict) or \ @@ -82,7 +82,7 @@ def test_export_docstring_sample(self): import splunklib.client as client import splunklib.results as results service = self.service # cheat - rr = results.ResultsReader(service.jobs.export("search * | head 5")) + rr = 
results.JSONResultsReader(service.jobs.export("search * | head 5", output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -98,7 +98,7 @@ def test_results_docstring_sample(self): job = service.jobs.create("search * | head 5") while not job.is_done(): sleep(0.2) - rr = results.ResultsReader(job.results()) + rr = results.JSONResultsReader(job.results(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -113,7 +113,7 @@ def test_preview_docstring_sample(self): import splunklib.results as results service = self.service # cheat job = service.jobs.create("search * | head 5") - rr = results.ResultsReader(job.preview()) + rr = results.JSONResultsReader(job.preview(output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -130,7 +130,7 @@ def test_oneshot_docstring_sample(self): import splunklib.client as client import splunklib.results as results service = self.service # cheat - rr = results.ResultsReader(service.jobs.oneshot("search * | head 5")) + rr = results.JSONResultsReader(service.jobs.oneshot("search * | head 5", output_mode='json')) for result in rr: if isinstance(result, results.Message): # Diagnostic messages may be returned in the results @@ -295,12 +295,12 @@ def test_get_preview_and_events(self): self.assertEventuallyTrue(self.job.is_done) self.assertLessEqual(int(self.job['eventCount']), 3) - preview_stream = self.job.preview() - preview_r = results.ResultsReader(preview_stream) + preview_stream = self.job.preview(output_mode='json') + preview_r = results.JSONResultsReader(preview_stream) self.assertFalse(preview_r.is_preview) - events_stream = self.job.events() - events_r = results.ResultsReader(events_stream) + events_stream = self.job.events(output_mode='json') + events_r = results.JSONResultsReader(events_stream) 
n_events = len([x for x in events_r if isinstance(x, dict)]) n_preview = len([x for x in preview_r if isinstance(x, dict)]) diff --git a/tests/test_results.py b/tests/test_results.py index 52e290f25..5fdca2b91 100755 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -30,7 +30,7 @@ def test_read_from_empty_result_set(self): job = self.service.jobs.create("search index=_internal_does_not_exist | head 2") while not job.is_done(): sleep(0.5) - self.assertEqual(0, len(list(results.ResultsReader(io.BufferedReader(job.results()))))) + self.assertEqual(0, len(list(results.JSONResultsReader(io.BufferedReader(job.results(output_mode='json')))))) def test_read_normal_results(self): xml_text = """ diff --git a/tox.ini b/tox.ini index 227be746c..58ee004ca 100644 --- a/tox.ini +++ b/tox.ini @@ -33,6 +33,7 @@ deps = pytest unittest2 unittest-xml-reporting python-dotenv + deprecation distdir = build commands =