11package com .chinagoods .bigdata .functions .string ;
22
33import com .chinagoods .bigdata .functions .utils .MysqlUtil ;
4+ import com .google .common .cache .CacheBuilder ;
5+ import com .google .common .cache .CacheLoader ;
6+ import com .google .common .cache .LoadingCache ;
47import org .apache .hadoop .hive .ql .exec .Description ;
58import org .apache .hadoop .hive .ql .exec .UDFArgumentException ;
69import org .apache .hadoop .hive .ql .metadata .HiveException ;
1215import org .slf4j .LoggerFactory ;
1316
1417import java .io .IOException ;
18+ import java .sql .SQLException ;
19+ import java .time .LocalDateTime ;
20+ import java .time .format .DateTimeFormatter ;
21+ import java .util .ArrayList ;
22+ import java .util .List ;
1523import java .util .Map ;
24+ import java .util .concurrent .ExecutionException ;
25+ import java .util .concurrent .TimeUnit ;
1626
1727/**
1828 * @author zyl
2535 , extended = "Example:\n > SELECT _FUNC_(device_no,pay_at) FROM src;" )
2636public class UDFRestNameFormat extends GenericUDF {
2737 private static final Logger logger = LoggerFactory .getLogger (UDFRestNameFormat .class );
28- private static final String DB_URL = "jdbc:mysql://172.18.5.22:3306 /source?characterEncoding=UTF-8&useSSL=false" ;
38+ private static final String DB_URL = "jdbc:mysql://172.18.5.10:23307 /source?characterEncoding=UTF-8&useSSL=false" ;
2939 private static final String DB_USER = "source" ;
3040 private static final String DB_PASSWORD = "jP8*dKw,bRjBVos=" ;
3141 /**
3242 * 设备对应食堂有效期 fast_pass_device_valid_inf
3343 */
34- private static final String REST_QUERY_SQL = "select device_no,rest_name from fast_pass_device_valid_inf " ;
44+ private static final String REST_QUERY_SQL = "select device_no,rest_name,valid_start_at,valid_end_at from fast_pass_device_valid_inf " ;
3545 private ObjectInspectorConverters .Converter [] converters ;
3646 private static final int ARG_COUNT = 2 ;
47+ private CacheLoader <String , List <List <String >>> uaLoader = null ;
48+ public LoadingCache <String , List <List <String >>> uaCache = null ;
49+ DateTimeFormatter formatter = DateTimeFormatter .ofPattern ("yyyy-MM-dd HH:mm:ss" );
3750
3851 public UDFRestNameFormat () {
3952 }
@@ -49,6 +62,19 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
4962 converters [i ] = ObjectInspectorConverters .getConverter (arguments [i ],
5063 PrimitiveObjectInspectorFactory .javaStringObjectInspector );
5164 }
65+ uaLoader = new CacheLoader <String , List <List <String >>>() {
66+ @ Override
67+ public List <List <String >> load (String key ) throws UDFArgumentException {
68+ // 缓存miss时,加载数据的方法
69+ logger .debug ("进入加载数据, key: {}" , key );
70+ return queryRestDeviceList (key );
71+ }
72+ };
73+ uaCache = CacheBuilder .newBuilder ()
74+ .maximumSize (10000 )
75+ //缓存项在给定时间内没有被写访问(创建或覆盖),则回收。如果认为缓存数据总是在固定时候后变得陈旧不可用,这种回收方式是可取的。
76+ .expireAfterAccess (10 , TimeUnit .MINUTES )
77+ .build (uaLoader );
5278 return PrimitiveObjectInspectorFactory .javaStringObjectInspector ;
5379 }
5480
@@ -57,20 +83,33 @@ public String evaluate(DeferredObject[] arguments) throws HiveException {
5783 assert (arguments .length == ARG_COUNT );
5884 String deviceNo = converters [0 ].convert (arguments [0 ].get ()).toString ();
5985 String payAt = converters [0 ].convert (arguments [1 ].get ()).toString ();
60- return queryRestName (deviceNo ,payAt );
86+ LocalDateTime paramAt = LocalDateTime .parse (payAt , formatter );
87+ List <List <String >> dataList = null ;
88+ try {
89+ dataList = uaCache .get (deviceNo );
90+ } catch (ExecutionException e ) {
91+ logger .error ("缓存获取失败,原始DVT为: {}" , dataList , e );
92+ }
93+ DateTimeFormatter formatter = DateTimeFormatter .ofPattern ("yyyy-MM-dd HH:mm:ss" );
94+ for (List <String > subList : dataList ) {
95+ LocalDateTime start = LocalDateTime .parse (subList .get (2 ), formatter );
96+ LocalDateTime end = LocalDateTime .parse (subList .get (3 ), formatter );
97+ if (subList .get (0 ).equals (deviceNo ) && paramAt .isAfter (start ) && paramAt .isBefore (end )) {
98+ return subList .get (1 );
99+ }
100+ }
101+ return null ;
61102 }
62103
63104 /**
64- *
65- *
66105 * @throws UDFArgumentException 查询mysql异常
67106 */
68- public String queryRestName (String deviceNo , String payAt ) throws UDFArgumentException {
107+ public List < List < String >> queryRestDeviceList (String deviceNo ) throws UDFArgumentException {
69108 try {
70109 // 配置信息
71110 MysqlUtil mysqlUtil = new MysqlUtil (DB_URL , DB_USER , DB_PASSWORD );
72- Map < String , String > paramKvMap = mysqlUtil .getMap (REST_QUERY_SQL + " where device_no='" + deviceNo + "' and valid_start_at<='" + payAt + "' and valid_end_at>='" + payAt + "'" );
73- return paramKvMap . get ( deviceNo ) ;
111+ List < List < String >> list = mysqlUtil .getLists (REST_QUERY_SQL + "where device_no='" + deviceNo + "'" );
112+ return list ;
74113 } catch (Exception e ) {
75114 logger .error ("Failed to query the rest name. Procedure, the error details are: " , e );
76115 throw new UDFArgumentException (String .format ("Failed to query the rest name. Procedure, the error details are: %s" , e ));
@@ -84,20 +123,37 @@ public String getDisplayString(String[] strings) {
84123 }
85124
86125 public static void main (String [] args ) throws HiveException {
87- String restName ;
88- try (UDFRestNameFormat urlFormat = new UDFRestNameFormat ()) {
89- DeferredObject [] deferredObjects = new DeferredObject [2 ];
90- // 设备编码、支付时间
91- deferredObjects [0 ] = new DeferredJavaObject ("YPT13291" );
92- deferredObjects [1 ] = new DeferredJavaObject ("2024-07-16 12:34:23" );
93- ObjectInspector [] inspectorArr = new ObjectInspector [2 ];
94- inspectorArr [0 ] = PrimitiveObjectInspectorFactory .javaStringObjectInspector ;
95- inspectorArr [1 ] = PrimitiveObjectInspectorFactory .javaStringObjectInspector ;
96- urlFormat .initialize (inspectorArr );
97- restName = urlFormat .evaluate (deferredObjects );
98- } catch (IOException e ) {
99- throw new RuntimeException (e );
126+ List <String > list = new ArrayList <>();
127+ list .add ("YPT13285" );
128+ list .add ("YPT13286" );
129+ list .add ("YPT13289" );
130+ list .add ("YPT13290" );
131+ list .add ("YPT13291" );
132+ list .add ("YPT13294" );
133+ list .add ("YPT13296" );
134+ list .add ("YPT13298" );
135+ list .add ("YPT13299" );
136+ list .add ("YPT13345" );
137+ list .add ("YPT13482" );
138+ list .add ("YPT13483" );
139+ list .add ("YPT13496" );
140+ list .add ("YPT13512" );
141+ for (String dvo : list ) {
142+ String restName ;
143+ try (UDFRestNameFormat urlFormat = new UDFRestNameFormat ()) {
144+ DeferredObject [] deferredObjects = new DeferredObject [2 ];
145+ // 设备编码、支付时间
146+ deferredObjects [0 ] = new DeferredJavaObject (dvo );
147+ deferredObjects [1 ] = new DeferredJavaObject ("2024-07-16 12:34:23" );
148+ ObjectInspector [] inspectorArr = new ObjectInspector [2 ];
149+ inspectorArr [0 ] = PrimitiveObjectInspectorFactory .javaStringObjectInspector ;
150+ inspectorArr [1 ] = PrimitiveObjectInspectorFactory .javaStringObjectInspector ;
151+ urlFormat .initialize (inspectorArr );
152+ restName = urlFormat .evaluate (deferredObjects );
153+ } catch (IOException e ) {
154+ throw new RuntimeException (e );
155+ }
156+ System .out .println (restName );
100157 }
101- System .out .println (restName );
102158 }
103159}
0 commit comments