diff --git a/docs/quickStart.md b/docs/quickStart.md index a5bf1fa9a..5ad03d884 100644 --- a/docs/quickStart.md +++ b/docs/quickStart.md @@ -65,7 +65,7 @@ sh submit.sh -yarnconf F:\dtstack\flinkStreamSql\localhost\hadoop -flinkJarPath F:\Java\flink-1.8.2-bin-scala_2.12\flink-1.8.2\lib -pluginLoadMode shipfile - -confProp {\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"} + -confProp "{\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"}" ``` #### yarn模式命令 @@ -95,7 +95,7 @@ sh submit.sh -yarnconf /home/wen/Desktop/flink_stream_sql_conf/yarnConf_node1 -flinkJarPath /home/wen/Desktop/dtstack/flink-1.8.1/lib -pluginLoadMode shipfile - -confProp {\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"} + -confProp "{\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"}" -queue c ``` 参数具体细节请看[命令参数说明](./config.md) diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java deleted file mode 100644 index 28b6ca3b3..000000000 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.flink.sql.side.rdb.all; - -import com.dtstack.flink.sql.side.BaseAllReqRow; -import com.dtstack.flink.sql.side.BaseSideInfo; -import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo; -import com.dtstack.flink.sql.side.rdb.util.SwitchUtil; -import com.dtstack.flink.sql.util.RowDataComplete; -import com.dtstack.flink.sql.util.RowDataConvert; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.calcite.sql.JoinType; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.dataformat.BaseRow; -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; -import org.apache.flink.types.Row; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.*; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -/** - * side operator with cache for all(period reload) - * Date: 2018/11/26 - * Company: www.dtstack.com - * - * @author maqi - */ - -public abstract class AbstractRdbAllReqRow extends BaseAllReqRow { - - private static final long serialVersionUID = 2098635140857937718L; - - private static final Logger LOG = LoggerFactory.getLogger(AbstractRdbAllReqRow.class); - - private static final int CONN_RETRY_NUM = 3; - - private static final int DEFAULT_FETCH_SIZE = 1000; - - private AtomicReference>>> cacheRef = new AtomicReference<>(); - - public AbstractRdbAllReqRow(BaseSideInfo sideInfo) { - super(sideInfo); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo(); - LOG.info("rdb dim table config info: {} ", tableInfo.toString()); - } - - @Override - protected void initCache() throws SQLException { - Map>> newCache = Maps.newConcurrentMap(); - cacheRef.set(newCache); - loadData(newCache); - } - - @Override - protected void reloadCache() { - //reload cacheRef and replace to old cacheRef - Map>> newCache = Maps.newConcurrentMap(); - try { - loadData(newCache); - } catch (SQLException e) { - throw new RuntimeException(e); - } - cacheRef.set(newCache); - LOG.info("----- rdb all cacheRef reload end:{}", Calendar.getInstance()); - } - - @Override - public void flatMap(Row value, Collector out) throws Exception { - List equalValIndex = sideInfo.getEqualValIndex(); - ArrayList inputParams = equalValIndex.stream() - .map(value::getField) - .filter(Objects::nonNull) - .collect(Collectors.toCollection(ArrayList::new)); - - if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) { - Row row = fillData(value, null); - RowDataComplete.collectRow(out, row); - return; - } - - String cacheKey = inputParams.stream() - .map(Object::toString) - .collect(Collectors.joining("_")); - - List> cacheList = cacheRef.get().get(cacheKey); - if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) { - Row row = fillData(value, null); - RowDataComplete.collectRow(out, row); - } else if (!CollectionUtils.isEmpty(cacheList)) { - cacheList.forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one)))); - } - } - - private void loadData(Map>> tmpCache) throws SQLException { - RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo(); - Connection connection = null; - - try { - for (int i = 0; i < CONN_RETRY_NUM; i++) { - try { - connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword()); - break; - } catch (Exception e) { - if (i == CONN_RETRY_NUM - 1) { - throw new RuntimeException("", e); - } - try { - String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword(); - LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo); - Thread.sleep(5 * 1000); - } catch (InterruptedException e1) { - LOG.error("", e1); - } - } - } - queryAndFillData(tmpCache, connection); - } catch (Exception e) { - LOG.error("", e); - throw new SQLException(e); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - private void queryAndFillData(Map>> tmpCache, Connection connection) throws SQLException { - //load data from table - String sql = sideInfo.getSqlCondition(); - Statement statement = connection.createStatement(); - statement.setFetchSize(getFetchSize()); - ResultSet resultSet = statement.executeQuery(sql); - - String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ","); - String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes(); - Map sideFieldNamesAndTypes = Maps.newHashMap(); - for (int i = 0; i < sideFieldNames.length; i++) { - sideFieldNamesAndTypes.put(sideFieldNames[i], sideFieldTypes[i]); - } - - while (resultSet.next()) { - Map oneRow = Maps.newHashMap(); - for (String fieldName : sideFieldNames) { - Object object = resultSet.getObject(fieldName.trim()); - object = SwitchUtil.getTarget(object, sideFieldNamesAndTypes.get(fieldName)); - oneRow.put(fieldName.trim(), object); - } - - String cacheKey = sideInfo.getEqualFieldList().stream() - .map(oneRow::get) - .map(Object::toString) - .collect(Collectors.joining("_")); - - tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList()) - .add(oneRow); - } - } - - public int getFetchSize() { - return DEFAULT_FETCH_SIZE; - } - - /** - * get jdbc connection - * - * @param dbURL - * @param userName - * @param password - * @return - */ - public abstract Connection getConn(String dbURL, String userName, String password); - -}