Description
I am running a Cloud Dataflow pipeline that requires each worker to download some videos from a GCP bucket, process them, and re-upload them. The pipeline works locally, but when deployed to Dataflow I get a cryptic error from google-cloud-storage while downloading blobs:
```python
with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)
```

raises:
File "run_clouddataflow.py", line 48, in process
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file
self._do_download(transport, file_obj, download_url, headers)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download
download.consume(transport)
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume
self._write_to_stream(result)
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream
with response:
AttributeError: __exit__ [while running 'Run DeepMeerkat']
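As far as I can tell from the last frame, the exception itself just means that the `response` object used in `with response:` does not implement the context-manager protocol: Python 2.7 raises exactly `AttributeError: __exit__` when `__exit__` is missing. A minimal sketch (my own toy class, not the library's code) that reproduces the same failure:

```python
# Toy reproduction of the error, unrelated to google-cloud-storage:
# a `with` statement needs __enter__ and __exit__, and a missing
# __exit__ raises AttributeError: __exit__ on Python 2.7.
class FakeResponse(object):
    def __enter__(self):
        return self
    # __exit__ deliberately not defined

with FakeResponse():  # AttributeError: __exit__
    pass
```

So whatever response object the download receives on the worker apparently lacks that interface, but I don't know the underlying cause.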
The function in question is:
```python
def process(self, element):
    import csv
    from google.cloud import storage
    from DeepMeerkat import DeepMeerkat
    from urlparse import urlparse
    import os
    import google.auth
    import logging

    DM = DeepMeerkat.DeepMeerkat()
    logging.info(os.getcwd())
    logging.info(element)

    # try adding credentials?
    # set credentials, inherited from the worker
    credentials, project = google.auth.default()

    # download element locally
    parsed = urlparse(element[0])
    logging.info(parsed)

    # parse gcp path
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.get_bucket(parsed.hostname)
    blob = storage.Blob(parsed.path[1:], bucket)

    # store local path
    local_path = parsed.path.split("/")[-1]
    logging.info('local path: ' + local_path)

    with open(local_path, 'wb') as file_obj:
        blob.download_to_file(file_obj)
    logging.info("Downloaded " + local_path)

    # Assign input from Dataflow/manifest
    DM.process_args(video=local_path)
    DM.args.output = "Frames"

    # Run DeepMeerkat
    DM.run()
```
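When I tried /tmp/ instead of the working directory, the download portion looked roughly like this (same `parsed` and `blob` objects as above; the exact path handling here is my sketch):

```python
import os
import tempfile

# Sketch: write into the worker's temp directory instead of the current
# working directory (which my logging shows is "/" on the Dataflow worker).
local_path = os.path.join(tempfile.gettempdir(), os.path.basename(parsed.path))
with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)
```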
Mostly I'm trying to understand what this error means. Is it a permissions error? A latency error? A local write error? I cannot recreate it outside of Dataflow. What I can tell from my logging is that the Dataflow worker is trying to write "video.avi" to the root directory (i.e. "/"). I've tried writing to /tmp/ instead, but without knowing whether this is a permissions error, either locally or on the GCP bucket, I'm having trouble debugging. Can you give me a sense of what kind of error this stems from?
The SO question is here.
Dataflow jobs require a setup.py for worker dependencies, so all packages come from a current pip install google-cloud.
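The relevant part of my setup.py is along these lines (name and version are placeholders):

```python
# setup.py sketch; install_requires is the relevant part, since this is
# what the Dataflow workers pip-install at startup.
import setuptools

setuptools.setup(
    name='deepmeerkat-pipeline',  # placeholder
    version='0.0.1',              # placeholder
    packages=setuptools.find_packages(),
    install_requires=['google-cloud'],
)
```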
I've also tried explicitly passing credentials using google-auth (as in the code above), but it doesn't seem to make a difference.