66
77from . import ForeignDataWrapper
88from .utils import log_to_postgres
9- from logging import WARNING
9+ from logging import INFO , WARNING , ERROR
1010import csv
11+ import glob
12+ import gzip
13+ import itertools
14+ import os
15+ import re
1116
1217
1318class CsvFdw (ForeignDataWrapper ):
1419 """A foreign data wrapper for accessing csv files.
1520
1621 Valid options:
17- - filename : full path to the csv file, which must be readable
18- by the user running postgresql (usually postgres)
19- - delimiter : the delimiter used between fields.
22+ - filename : Full path to the csv file, which must be readable
23+ by the user running postgresql (usually postgres).
24+ - globs : Glob for recognizing files to process, which must be
25+ readable by the user running postgresql (usually postgres). Multiple
26+ globs may be passed, separated by ' // ' (space-slash-slash-space).
27+ (UNIX paths can not meaningfully contain two slashes.)
28+ - delimiter : The delimiter used between fields.
2029 Default: ","
30+ - quotechar : The CSV quote character.
31+ Default: '"'
32+ - skip_header : The number of lines to skip.
33+ Default: 0
34+ - gzip : Look for GZip file magic and decompress files if found.
35+ Default: true
2136 """
2237
def __init__(self, fdw_options, fdw_columns):
    """Read the wrapper options and validate them.

    Exactly one of 'filename' and 'globs' has to be supplied, and 'gzip'
    has to be the literal string 'true' or 'false' (default 'true'); a
    violation of either rule is reported through log_to_postgres at
    ERROR level.
    """
    super(CsvFdw, self).__init__(fdw_options, fdw_columns)
    option = fdw_options.get

    self.filename = option("filename")
    self.delimiter = option("delimiter", ",")
    self.quotechar = option("quotechar", '"')
    self.skip_header = int(option("skip_header", 0))
    self.columns = fdw_columns

    # Several globs may be given in one option, separated by ' // ';
    # empty pieces (including the "no globs at all" case) are dropped.
    patterns = option("globs", "").split(" // ")
    self.globs = set(pattern for pattern in patterns if pattern != "")

    has_globs = bool(self.globs)
    has_filename = bool(self.filename)
    if has_globs == has_filename:
        # Either both sources were given or neither was.
        log_to_postgres("Please set either 'filename' or 'globs'.", ERROR)

    gzip_flag = {"true": True, "false": False}.get(option("gzip", "true"))
    if gzip_flag is None:
        log_to_postgres("Please set 'gzip' as 'true' or 'false'.", ERROR)
    else:
        self.gzip = gzip_flag
54+
3055
def execute(self, quals, columns):
    """Return an iterator over the rows of every file from self.paths().

    Each path is handed to self.load_file together with `columns`, and
    the per-file row iterators are chained in the order the paths are
    produced. `quals` is accepted but not used by this method.
    """
    per_file_rows = (self.load_file(path, columns) for path in self.paths())
    # from_iterable pulls one path at a time; star-unpacking the generator
    # would walk the whole paths() iterable (every glob match) and build
    # an argument tuple before the first row could be returned.
    return itertools.chain.from_iterable(per_file_rows)
59+
60+ def load_file (self , path , columns ):
61+ open_function = self .opener (path )
62+ with open_function (path ) as stream :
3363 reader = csv .reader (stream , delimiter = self .delimiter )
3464 count = 0
3565 checked = False
@@ -40,10 +70,24 @@ def execute(self, quals, columns):
4070 # appropriate length
4171 checked = True
4272 if len (line ) > len (self .columns ):
43- log_to_postgres ("There are more columns than "
44- "defined in the table" , WARNING )
73+ log_to_postgres ("More columns than defined in "
74+ "table: %s" % path , WARNING )
4575 if len (line ) < len (self .columns ):
46- log_to_postgres ("There are less columns than "
47- "defined in the table" , WARNING )
76+ log_to_postgres ("Fewer columns than defined in "
77+ "table: %s" % path , WARNING )
4878 yield line [:len (self .columns )]
4979 count += 1
80+
def opener(self, path):
    """Return the callable that should be used to open `path`.

    Returns gzip.open when the 'gzip' option is enabled and the file
    starts with the two-byte gzip magic number; returns the builtin
    open otherwise. The file is only probed when the option is enabled.
    """
    if not self.gzip:
        return open
    # Probe in binary mode: the magic number is a bytes value, and a
    # text-mode read would either never compare equal to it or fail to
    # decode compressed data.
    with open(path, "rb") as stream:
        magic = stream.read(2)
    if magic == b"\x1f\x8b":
        log_to_postgres("Reading CSV with gzip: %s" % path, INFO)
        # NOTE(review): on Python 3 gzip.open defaults to binary mode;
        # confirm the caller gets the text stream csv.reader expects.
        return gzip.open
    return open
87+
def paths(self):
    """Return the paths of the files to read.

    With globs configured, returns a lazy iterator over the regular
    files matched by any glob; each path is produced once even when
    several globs match it, and globs are expanded in sorted order so
    the result does not depend on set iteration order. Without globs,
    returns a one-element list holding self.filename.
    """
    if not self.globs:
        return [self.filename]

    def matching_files():
        seen = set()
        for pattern in sorted(self.globs):
            for path in glob.iglob(pattern):
                # Skip repeats from overlapping globs, and anything that
                # is not a regular file (e.g. a matching directory).
                if path not in seen and os.path.isfile(path):
                    seen.add(path)
                    yield path

    return matching_files()
0 commit comments