Commit 6e5aca3: Merge pull request #1 from DTStack/master, "merge the latest code"
2 parents: 8a109e4 + 41d323d

File tree: 9 files changed (+291, -4 lines)

README.md (3 additions, 2 deletions)

@@ -18,6 +18,7 @@
 * Add Oracle dimension-table and result-table support
 * Add SQLServer dimension-table and result-table support
 * Add Kafka result-table support
+* Add CEP support in SQL

 ## 1 Quick Start
 ### 1.1 Run Modes
@@ -149,7 +150,7 @@ CREATE TABLE MyTable(
     channel varchar,
     pv int,
     xctime bigint,
-    CHARACTER_LENGTH(channel) AS timeLeng
+    CHARACTER_LENGTH(channel) AS timeLeng //user-defined function
 )WITH(
     type ='kafka09',
     bootstrapServers ='172.16.8.198:9092',
@@ -187,7 +188,7 @@ CREATE TABLE sideTable(
     cf:name varchar as name,
     cf:info varchar as info,
     PRIMARY KEY(name),
-    PERIOD FOR SYSTEM_TIME
+    PERIOD FOR SYSTEM_TIME //dimension-table marker
 )WITH(
     type ='hbase',
     zookeeperQuorum ='rdos1:2181',
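For readers skimming the grammar: CHARACTER_LENGTH(channel) AS timeLeng declares a computed column backed by a (possibly user-defined) function, and PERIOD FOR SYSTEM_TIME is the marker that tells the parser sideTable is a dimension (side) table to be joined by lookup rather than consumed as a stream.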

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java (1 addition, 1 deletion)

@@ -121,7 +121,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
         rightScopeChild.setTableName(joinInfo.getRightTableName());
         SideTableInfo sideTableInfo = sideTableMap.get(joinInfo.getRightTableName());
         if(sideTableInfo == null){
-            sideTableInfo = sideTableMap.get(joinInfo.getRightTableName());
+            sideTableInfo = sideTableMap.get(joinInfo.getRightTableAlias());
         }

         if(sideTableInfo == null){
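The one-line fix above repairs a fallback that could never succeed: on a null lookup the old code queried sideTableMap again with the same table name, so a side table registered only under its alias was never found. A minimal sketch of the corrected name-then-alias resolution (SideTableLookup and the empty SideTableInfo are illustrative stand-ins; only the lookup logic mirrors the commit):

import java.util.HashMap;
import java.util.Map;

public class SideTableLookup {

    static class SideTableInfo { }

    // Look up a side table by its real name first, then by its alias.
    // The pre-fix code queried the same key twice, so the fallback
    // branch could never find anything.
    static SideTableInfo resolve(Map<String, SideTableInfo> sideTableMap,
                                 String tableName, String tableAlias) {
        SideTableInfo info = sideTableMap.get(tableName);
        if (info == null) {
            info = sideTableMap.get(tableAlias); // the actual one-line fix
        }
        return info;
    }

    public static void main(String[] args) {
        Map<String, SideTableInfo> byAlias = new HashMap<>();
        byAlias.put("s", new SideTableInfo()); // registered under its alias only
        System.out.println(resolve(byAlias, "sideTable", "s") != null); // true after the fix
    }
}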

hbase/hbase-side/hbase-all-side/pom.xml (17 additions, 1 deletion)

@@ -13,6 +13,20 @@
     <artifactId>sql.side.all.hbase</artifactId>
     <name>hbase-all-side</name>

+    <dependencies>
+        <dependency>
+            <groupId>com.dtstack.flink</groupId>
+            <artifactId>sql.side.hbase.core</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+    </dependencies>
+
     <build>
         <plugins>
             <plugin>
@@ -28,7 +42,9 @@
                 <configuration>
                     <artifactSet>
                         <excludes>
-
+                            <exclude>org.apache.hadoop:hadoop-common</exclude>
+                            <exclude>org.apache.hadoop:hadoop-auth</exclude>
+                            <exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
                         </excludes>
                     </artifactSet>
                     <filters>
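A likely reading of the shading change: hadoop-common, hadoop-auth, and hadoop-mapreduce-client-core are excluded from the shaded jar so that Hadoop classes come from the cluster's own classpath and avoid version clashes, while hbase-client 1.3.1 and the new sql.side.hbase.core module are bundled with the plugin.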
HbaseAllReqRow.java (new file; 170 additions, 0 deletions)

@@ -0,0 +1,170 @@
package com.dtstack.flink.sql.side.hbase;

import com.dtstack.flink.sql.side.*;
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class HbaseAllReqRow extends AllReqRow {

    private static final Logger LOG = LoggerFactory.getLogger(HbaseAllReqRow.class);

    private String tableName;

    private Map<String, String> aliasNameInversion;

    private AtomicReference<Map<String, Map<String, Object>>> cacheRef = new AtomicReference<>();

    public HbaseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
        super(new HbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
        tableName = ((HbaseSideTableInfo)sideTableInfo).getTableName();

        HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
        Map<String, String> aliasNameRef = hbaseSideTableInfo.getAliasNameRef();
        aliasNameInversion = new HashMap<>();
        for(Map.Entry<String, String> entry : aliasNameRef.entrySet()){
            aliasNameInversion.put(entry.getValue(), entry.getKey());
        }
    }

    @Override
    protected Row fillData(Row input, Object sideInput) {
        Map<String, Object> sideInputList = (Map<String, Object>) sideInput;
        Row row = new Row(sideInfo.getOutFieldInfoList().size());
        for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
            Object obj = input.getField(entry.getValue());
            boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());

            //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
            if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
                obj = ((Timestamp)obj).getTime();
            }
            row.setField(entry.getKey(), obj);
        }

        for(Map.Entry<Integer, Integer> entry : sideInfo.getSideFieldIndex().entrySet()){
            if(sideInputList == null){
                row.setField(entry.getKey(), null);
            }else{
                String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
                row.setField(entry.getKey(), sideInputList.get(key));
            }
        }

        return row;
    }

    @Override
    protected void initCache() throws SQLException {
        Map<String, Map<String, Object>> newCache = Maps.newConcurrentMap();
        cacheRef.set(newCache);
        loadData(newCache);
    }

    @Override
    protected void reloadCache() {
        Map<String, Map<String, Object>> newCache = Maps.newConcurrentMap();
        try {
            loadData(newCache);
        } catch (SQLException e) {
            LOG.error("", e);
        }

        cacheRef.set(newCache);
        LOG.info("----- HBase all cacheRef reload end:{}", Calendar.getInstance());
    }

    @Override
    public void flatMap(Row value, Collector<Row> out) throws Exception {
        Map<String, Object> refData = Maps.newHashMap();
        for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
            Integer conValIndex = sideInfo.getEqualValIndex().get(i);
            Object equalObj = value.getField(conValIndex);
            if(equalObj == null){
                out.collect(null);
            }
            refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
        }

        String rowKeyStr = ((HbaseAllSideInfo)sideInfo).getRowKeyBuilder().getRowKey(refData);

        Map<String, Object> cacheList = null;

        SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
        HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
        if (hbaseSideTableInfo.isPreRowKey())
        {
            for (Map.Entry<String, Map<String, Object>> entry : cacheRef.get().entrySet()){
                if (entry.getKey().startsWith(rowKeyStr))
                {
                    cacheList = cacheRef.get().get(entry.getKey());
                    Row row = fillData(value, cacheList);
                    out.collect(row);
                }
            }
        } else {
            cacheList = cacheRef.get().get(rowKeyStr);
            Row row = fillData(value, cacheList);
            out.collect(row);
        }

    }

    private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLException {
        SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
        HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", hbaseSideTableInfo.getHost());
        Connection conn = null;
        Table table = null;
        ResultScanner resultScanner = null;
        try {
            conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf(tableName));
            resultScanner = table.getScanner(new Scan());
            for (Result r : resultScanner) {
                Map<String, Object> kv = new HashedMap();
                for (Cell cell : r.listCells())
                {
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    StringBuilder key = new StringBuilder();
                    key.append(family).append(":").append(qualifier);

                    kv.put(aliasNameInversion.get(key.toString().toUpperCase()), value);
                }
                tmpCache.put(new String(r.getRow()), kv);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                conn.close();
                table.close();
                resultScanner.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
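HbaseAllReqRow implements the "ALL" side-table strategy: loadData() scans the entire HBase table into a row-key-indexed map, and flatMap() joins each stream record against that in-memory snapshot only. A reduced sketch of the cache life-cycle, with hypothetical names (AllCacheSketch, loadAll) and an illustrative reload interval:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// The whole dimension table is loaded into a fresh map, published through
// an AtomicReference, and on reload the new map is built fully before
// being swapped in, so concurrent lookups never observe a half-loaded cache.
public class AllCacheSketch {

    private final AtomicReference<Map<String, String>> cacheRef =
            new AtomicReference<>(new ConcurrentHashMap<>());

    // Stand-in for loadData()'s full HBase table scan.
    private Map<String, String> loadAll() {
        Map<String, String> fresh = new ConcurrentHashMap<>();
        fresh.put("rowKey1", "value1"); // hypothetical row
        return fresh;
    }

    public void start() {
        cacheRef.set(loadAll()); // initCache()
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        // reloadCache(): build first, swap second (interval is illustrative)
        ses.scheduleAtFixedRate(() -> cacheRef.set(loadAll()), 1, 1, TimeUnit.HOURS);
    }

    public String lookup(String rowKey) {
        return cacheRef.get().get(rowKey); // lock-free read path
    }
}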
HbaseAllSideInfo.java (new file; 58 additions, 0 deletions)

@@ -0,0 +1,58 @@
package com.dtstack.flink.sql.side.hbase;

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

public class HbaseAllSideInfo extends SideInfo {

    private RowKeyBuilder rowKeyBuilder;

    public HbaseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
        super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
    }

    @Override
    public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
        rowKeyBuilder = new RowKeyBuilder();
        if(sideTableInfo.getPrimaryKeys().size() < 1){
            throw new RuntimeException("Primary key dimension table must be filled");
        }

        rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0));

        String sideTableName = joinInfo.getSideTableName();
        SqlNode conditionNode = joinInfo.getCondition();

        List<SqlNode> sqlNodeList = Lists.newArrayList();
        if(conditionNode.getKind() == SqlKind.AND){
            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
        }else{
            sqlNodeList.add(conditionNode);
        }

        for(SqlNode sqlNode : sqlNodeList){
            dealOneEqualCon(sqlNode, sideTableName);
        }
    }

    public RowKeyBuilder getRowKeyBuilder() {
        return rowKeyBuilder;
    }

    public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
        this.rowKeyBuilder = rowKeyBuilder;
    }

}
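buildEqualInfo() turns the JOIN's ON clause into per-field equality info: a top-level AND is unpacked into its operands via SqlBasicCall.getOperands(), and each operand is passed to dealOneEqualCon(). A toy model of that one-level split (Cond, And, and Eq are illustrative stand-ins for Calcite's SqlNode tree and SqlKind.AND):

import java.util.ArrayList;
import java.util.List;

public class ConditionSplitSketch {

    sealed interface Cond permits And, Eq { }
    record And(Cond left, Cond right) implements Cond { }
    record Eq(String streamField, String sideField) implements Cond { }

    // One-level split, mirroring the commit: AND(a, b) -> [a, b]; eq -> [eq]
    static List<Cond> split(Cond condition) {
        List<Cond> out = new ArrayList<>();
        if (condition instanceof And and) {
            out.add(and.left());
            out.add(and.right());
        } else {
            out.add(condition);
        }
        return out;
    }

    public static void main(String[] args) {
        Cond on = new And(new Eq("m.name", "s.name"), new Eq("m.channel", "s.channel"));
        System.out.println(split(on)); // two equality predicates
    }
}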

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java (3 additions, 0 deletions)

@@ -56,6 +56,8 @@ public class HbaseSideParser extends AbsSideTableParser {

    public static final String PRE_ROW_KEY = "preRowKey";

+   public static final String CACHE = "cache";
+

    static {
        keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
@@ -76,6 +78,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
        hbaseTableInfo.setHost((String) props.get(HBASE_ZOOKEEPER_QUORUM.toLowerCase()));
        hbaseTableInfo.setParent((String)props.get(ZOOKEEPER_PARENT.toLowerCase()));
        hbaseTableInfo.setPreRowKey(MathUtil.getBoolean(props.get(PRE_ROW_KEY.toLowerCase()), false));
+       hbaseTableInfo.setCacheType((String) props.get(CACHE));
        return hbaseTableInfo;
    }
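With the CACHE key wired into HbaseSideTableInfo.setCacheType(), an HBase dimension table can presumably opt into the full-table cache from its DDL WITH clause, for example cache ='ALL' (syntax assumed from the project's other connectors), which is what HbaseAllReqRow above implements.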

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java (18 additions, 0 deletions)

@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package com.dtstack.flink.sql.side.mysql;

 import com.dtstack.flink.sql.side.AllReqRow;

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllSideInfo.java (18 additions, 0 deletions)

@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package com.dtstack.flink.sql.side.mysql;

 import com.dtstack.flink.sql.side.FieldInfo;

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/DBSink.java (3 additions, 0 deletions)

@@ -32,6 +32,7 @@
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;

+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.List;

@@ -113,6 +114,8 @@ protected void buildSqlTypes(List<Class> fieldTypeArray){
             tmpFieldsType[i] = Types.BINARY;
         }else if(fieldType.equals(Float.class.getName()) || fieldType.equals(Double.class.getName())){
             tmpFieldsType[i] = Types.DOUBLE;
+        }else if (fieldType.equals(Timestamp.class.getName())){
+            tmpFieldsType[i] = Types.TIMESTAMP;
         }else{
             throw new RuntimeException("no support field type for sql. the input type:" + fieldType);
         }
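buildSqlTypes() maps each field's Java class name to a java.sql.Types constant for the JDBC writer; before this commit a Timestamp column fell through to the RuntimeException. A trimmed sketch of the mapping with the new branch (SqlTypeMapperSketch is illustrative and shows only a few branches):

import java.sql.Timestamp;
import java.sql.Types;

public class SqlTypeMapperSketch {

    // Map a field's Java class name to the JDBC SQL type used when
    // binding PreparedStatement parameters.
    static int toSqlType(String fieldType) {
        if (fieldType.equals(Integer.class.getName())) {
            return Types.INTEGER;
        } else if (fieldType.equals(Double.class.getName())) {
            return Types.DOUBLE;
        } else if (fieldType.equals(Timestamp.class.getName())) {
            return Types.TIMESTAMP; // the branch added by this commit
        }
        throw new RuntimeException("no support field type for sql. the input type:" + fieldType);
    }

    public static void main(String[] args) {
        System.out.println(toSqlType(Timestamp.class.getName())); // 93 == Types.TIMESTAMP
    }
}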
