 from frontera.core.models import Request
 from frontera.contrib.backends.partitioners import Crc32NamePartitioner
 from frontera.utils.misc import chunks
+import logging


 _pack_functions = {
@@ -60,11 +61,11 @@ class HBaseQueue(Queue):

     GET_RETRIES = 3

-    def __init__(self, connection, partitions, logger, table_name, drop=False):
+    def __init__(self, connection, partitions, table_name, drop=False):
         self.connection = connection
         self.partitions = [i for i in range(0, partitions)]
         self.partitioner = Crc32NamePartitioner(self.partitions)
-        self.logger = logger
+        self.logger = logging.getLogger("hbase.queue")
         self.table_name = table_name

         tables = set(self.connection.tables())
@@ -88,7 +89,7 @@ def schedule(self, batch):
                 if 'domain' not in request.meta:
                     _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
                     if not hostname:
-                        self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint))
+                        self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
                     request.meta['domain'] = {'name': hostname}
                 to_schedule.append((score, fprint, request.meta['domain'], request.url))
         self._schedule(to_schedule)
@@ -183,7 +184,8 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
         while tries < self.GET_RETRIES:
             tries += 1
             limit *= 5.5 if tries > 1 else 1.0
-            self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d" % (tries, limit, count, len(queue.keys())))
+            self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d",
+                              tries, limit, count, len(queue.keys()))
             meta_map.clear()
             queue.clear()
             count = 0
@@ -213,7 +215,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
                 continue
             break

-        self.logger.debug("Finished: tries %d, hosts %d, requests %d" % (tries, len(queue.keys()), count))
+        self.logger.debug("Finished: tries %d, hosts %d, requests %d", tries, len(queue.keys()), count)

         # For every fingerprint collect its row keys and return all fingerprints from them
         fprint_map = {}
@@ -241,7 +243,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
             with table.batch(transaction=True) as b:
                 for rk in trash_can:
                     b.delete(rk)
-        self.logger.debug("%d row keys removed" % (len(trash_can)))
+        self.logger.debug("%d row keys removed", len(trash_can))
         return results

     def count(self):
@@ -250,10 +252,10 @@ def count(self):

 class HBaseState(States):

-    def __init__(self, connection, table_name, logger, cache_size_limit):
+    def __init__(self, connection, table_name, cache_size_limit):
         self.connection = connection
         self._table_name = table_name
-        self.logger = logger
+        self.logger = logging.getLogger("hbase.states")
         self._state_cache = {}
         self._cache_size_limit = cache_size_limit

@@ -373,7 +375,7 @@ class HBaseBackend(DistributedBackend):

     def __init__(self, manager):
         self.manager = manager
-        self.logger = manager.logger.backend
+        self.logger = logging.getLogger("hbase.backend")
         settings = manager.settings
         port = settings.get('HBASE_THRIFT_PORT')
         hosts = settings.get('HBASE_THRIFT_HOST')
@@ -400,7 +402,7 @@ def __init__(self, manager):
     def strategy_worker(cls, manager):
         o = cls(manager)
         settings = manager.settings
-        o._states = HBaseState(o.connection, settings.get('HBASE_METADATA_TABLE'), o.manager.logger.backend,
+        o._states = HBaseState(o.connection, settings.get('HBASE_METADATA_TABLE'),
                                settings.get('HBASE_STATE_CACHE_SIZE_LIMIT'))
         return o

@@ -409,7 +411,7 @@ def db_worker(cls, manager):
         o = cls(manager)
         settings = manager.settings
         drop_all_tables = settings.get('HBASE_DROP_ALL_TABLES')
-        o._queue = HBaseQueue(o.connection, o.queue_partitions, o.manager.logger.backend,
+        o._queue = HBaseQueue(o.connection, o.queue_partitions,
                               settings.get('HBASE_QUEUE_TABLE'), drop=drop_all_tables)
         o._metadata = HBaseMetadata(o.connection, settings.get('HBASE_METADATA_TABLE'), drop_all_tables,
                                     settings.get('HBASE_USE_SNAPPY'), settings.get('HBASE_BATCH_SIZE'),
@@ -461,5 +463,5 @@ def get_next_requests(self, max_next_requests, **kwargs):
             results = self.queue.get_next_requests(max_next_requests, partition_id, min_requests=64,
                                                    min_hosts=24, max_requests_per_host=128)
             next_pages.extend(results)
-            self.logger.debug("Got %d requests for partition id %d" % (len(results), partition_id))
+            self.logger.debug("Got %d requests for partition id %d", len(results), partition_id)
         return next_pages
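
The diff applies two related patterns throughout: each component obtains its own named logger via logging.getLogger() instead of having one injected through its constructor, and log calls pass format arguments to the logging method instead of interpolating with % at the call site. A minimal self-contained sketch of both, with an illustrative class and method names that are not part of the Frontera codebase:

import logging


class QueueSketch(object):
    """Illustrative stand-in for a component like HBaseQueue."""

    def __init__(self):
        # Named loggers form a dot-separated hierarchy ("hbase" is the
        # parent of "hbase.queue"), so levels and handlers can be tuned
        # per component without threading logger objects through __init__.
        self.logger = logging.getLogger("hbase.queue")

    def cleanup(self, trash_can):
        # Lazy formatting: the message is interpolated only if DEBUG is
        # enabled; "%" at the call site would pay that cost unconditionally.
        self.logger.debug("%d row keys removed", len(trash_can))


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    QueueSketch().cleanup({"rk1", "rk2"})

Besides skipping the formatting work when the level is disabled, passing arguments separately keeps the unformatted message template on the log record, which downstream handlers can use for grouping and aggregation.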