From f1a9adb857bb84503320d72fa4c4ca2b2777e14b Mon Sep 17 00:00:00 2001
From: Chris Warrick
Date: Mon, 20 Jul 2020 20:15:32 +0200
Subject: [PATCH] Repaste bad-mimetype pastes from 0x0.st

This resolves #61. Also adds retrieveUrlLazy that produces a Request in
unbuffered mode to avoid buffering pastes we might not be interested in.
---
 infobob/pastebin.py            | 98 ++++++++++++++++++++++++++++++++--
 infobob/tests/test_pastebin.py | 41 +++++++++++++-
 2 files changed, 134 insertions(+), 5 deletions(-)

diff --git a/infobob/pastebin.py b/infobob/pastebin.py
index c51623a..b8b0a50 100644
--- a/infobob/pastebin.py
+++ b/infobob/pastebin.py
@@ -49,10 +49,19 @@ def make_repaster(paster):
         GenericBadPastebin(
             u'hastebin.com',
             [u'www.hastebin.com'],
-            pasteIdFromFirstOrRaw(u'([a-zA-Z0-9]{4,12})$'),
+            pasteIdFromFirstOrRaw(u'([a-zA-Z0-9]{4,12})(?:\\.[a-z]+)?$'),
             u'/raw/',
             retrieveUrlContent,
         ),
+        MimeIgnoringPastebin(
+            u'0x0.st',
+            [u'www.0x0.st'],
+            pasteIdFromFirstOrRaw(u'([a-zA-Z0-9_-]{4,12}\\.py)$'),
+            u'',
+            retrieveUrlLazy,
+            [u'text/plain'],
+            u'http',
+        ),
     ]
     return BadPasteRepaster(badPastebins, paster)
 
@@ -153,7 +162,7 @@ def extractBadPasteSpecs(self, message):
         bad paste found.
""" potentialUrls = re.findall( - b'(?:https?://)?[a-z0-9.-:]+/[a-z0-9/]+', + b'(?:https?://)?[a-z0-9.-:]+/[a-z0-9/]+(?:\\.[a-z]+)?', message, flags=re.IGNORECASE, ) @@ -214,7 +223,10 @@ def repaste(self, badPastes): for paste in badPastes ] pastes_datas = yield defer.gatherResults(defs) - if len(pastes_datas) == 1: + pastes_datas = [data for data in pastes_datas if data] + if not pastes_datas: + return + elif len(pastes_datas) == 1: data = pastes_datas[0] language = u'python' else: @@ -418,12 +430,13 @@ def __init__( pasteIdFromPath, rawUrlPathPrefix, rawContentRetriever, + rawUrlProtocol=u'https', ): self.name = mainDomain self.domains = (mainDomain,) + tuple(altDomains) self._pasteIdFromPath = pasteIdFromPath self._baseRawUrl = urlparse.urlunparse(( - u'https', + rawUrlProtocol, mainDomain, u'/' + rawUrlPathPrefix.strip(u'/') + u'/', u'', @@ -457,6 +470,57 @@ def __repr__(self): ) +class MimeIgnoringPastebin(GenericBadPastebin): + """ + A pastebin that produces paste URLs with an ID in the URL's path, + and offers a "raw" URL for downloading the raw content of a paste + given the ID. Additionally, it can ignore pastes that have one of + the specified MIME types. + + ``pasteIdFromPath`` is a function used to extract the paste ID + from a paste URL's path (a text string). 
+    """
+    def __init__(
+        self,
+        mainDomain,
+        altDomains,
+        pasteIdFromPath,
+        rawUrlPathPrefix,
+        rawRequestRetriever,
+        mimetypesToIgnore,
+        rawUrlProtocol=u'https',
+    ):
+        super(MimeIgnoringPastebin, self).__init__(
+            mainDomain,
+            altDomains,
+            pasteIdFromPath,
+            rawUrlPathPrefix,
+            rawRequestRetriever,
+            rawUrlProtocol,
+        )
+        self.mimetypesToIgnore = [m.lower() for m in mimetypesToIgnore]
+
+    def contentFromPaste(self, badPaste):
+        """
+        Return a Deferred firing with the paste's raw content, or with
+        ``None`` if its Content-Type is one of ``mimetypesToIgnore``.
+        """
+        def mimeCheckCallback(response):
+            cts = response.headers.getRawHeaders("Content-Type", [])
+            mimes = [ct.split(';', 1)[0].strip().lower() for ct in cts]
+            if any(mime in self.mimetypesToIgnore for mime in mimes):
+                return None
+            return treq.content(response)
+
+        if badPaste.pastebinName != self.name:
+            msgfmt = (
+                u'Cannot retrieve paste {paste!r}, not created by {self!r}'
+            )
+            raise ValueError(msgfmt.format(paste=badPaste, self=self))
+        url = self._baseRawUrl + badPaste.id
+        return self._retrieve(url).addCallback(mimeCheckCallback)
+
+
 class FailedToRetrieve(Exception):
     pass
 
@@ -488,6 +552,32 @@ def cbCheckResponseCode(response):
     return respOkDfd.addCallback(client.content)
 
 
+def retrieveUrlLazy(url, client=treq):
+    """
+    Make a GET request to ``url``, verify 200 status response, and
+    return a Deferred that fires with the response.
+
+    Will errback with :exc:`FailedToRetrieve` if a non-200 response
+    was received.
+    """
+    if isinstance(url, unicode):
+        url = url.encode('utf-8')
+    log.info(u'Attempting to retrieve {url!r}'.format(url=url))
+    respDfd = client.get(url, unbuffered=True)
+
+    def cbCheckResponseCode(response):
+        # Body stays unread; the caller inspects headers before fetching it.
+        if response.code != 200:
+            raise FailedToRetrieve(
+                'Expected 200 response from {url!r} but got {code}'.format(
+                    url=url, code=response.code
+                )
+            )
+        return response
+
+    return respDfd.addCallback(cbCheckResponseCode)
+
+
 ### Support for outgoing pastes
 
 def make_paster():
diff --git a/infobob/tests/test_pastebin.py b/infobob/tests/test_pastebin.py
index b34af1d..e0a9a8f 100644
--- a/infobob/tests/test_pastebin.py
+++ b/infobob/tests/test_pastebin.py
@@ -43,6 +43,12 @@ def test_no_results(self, message):
             (b'pastebin.com/pwZA/', u'pastebin.com', u'pwZA'),
             (b'pastebin.ca/123986/', u'pastebin.ca', u'123986'),
             (b'hastebin.com/asdflkkfig/', u'hastebin.com', u'asdflkkfig'),
+            # Hastebin ignores the extension, so it is not part of the ID
+            (b'hastebin.com/asdflkkfig.py', u'hastebin.com', u'asdflkkfig'),
+            (b'hastebin.com/asdflkkfig.txt', u'hastebin.com', u'asdflkkfig'),
+            (b'hastebin.com/asdflkkfig.scala', u'hastebin.com', u'asdflkkfig'),
+            # On 0x0.st the extension is part of the paste ID
+            (b'0x0.st/asdf.py', u'0x0.st', u'asdf.py'),
         ]
     ])
     def test_scheme_optional(self, message, domain, pasteid):
@@ -334,12 +340,14 @@ def test_contentFromPaste_rejects_foreign_badPaste(self):
 class CustomResource(object):
     isLeaf = True  # NB: means getChildWithDefault will not be called
 
-    def __init__(self, status, content):
+    def __init__(self, status, content, contentType=u'text/plain'):
         self.status = status
         self.content = content
+        self.contentType = contentType
 
     def render(self, request):
         request.setResponseCode(self.status)
+        request.setHeader("Content-Type", self.contentType)
         return self.content
 
 
@@ -358,3 +366,34 @@ def test_rejects_non_200_response(self):
         d = self.doRetrieve(400, b'epic fail')
         f = self.failureResultOf(d)
         self.assertRegex(str(f), r'Expected 200 response .* but got 400')
+
+
+class MimeIgnoringPastebinTestCase(TrialTestCase):
+    def getPastebin(self, mimetype):
+        return pastebin.MimeIgnoringPastebin(
+            u'paste.example.com',
+            [],  # no alternate domains
+            pastebin.pasteIdFromFirstOrRaw(u'([a-zA-Z0-9_-]{4,12})$'),
+            u'',  # raw URLs carry no path prefix
+            self.doRetrieve,
+            [mimetype],  # the single MIME type this pastebin ignores
+            u'http',
+        )
+
+    def doRetrieve(self, url):
+        self.treqStub = treq.testing.StubTreq(CustomResource(200, b'hello', 'text/plain'))
+        return pastebin.retrieveUrlLazy(url, client=self.treqStub)
+
+    @defer.inlineCallbacks
+    def test_skips_mimetype(self):
+        badPaste = pastebin.BadPaste(u'paste.example.com', u'1234')
+        ignoring = self.getPastebin('text/plain')
+        result = yield ignoring.contentFromPaste(badPaste)
+        self.assertIsNone(result)
+
+    @defer.inlineCallbacks
+    def test_accepts_different_mimetype(self):
+        badPaste = pastebin.BadPaste(u'paste.example.com', u'1234')
+        accepting = self.getPastebin('text/html')
+        result = yield accepting.contentFromPaste(badPaste)
+        self.assertEqual(result, b'hello')