目录
  1. 关于JDBC标准
  2. 大数据生态下的JDBC
  3. 如何提供JDBC服务

关于JDBC标准

​  先谈谈我们熟悉的JDBC。JDBC(Java DataBase Connectivity)是Java程序访问数据库的标准接口,本意是提供一个标准接口,由数据库厂商来提供各自的实现(包括Driver客户端和服务端),这样开发者可以通过一套代码,尽可能屏蔽不同数据库之间的差异。早在JDK 1.1,JDBC就作为Java SE的一部分了。

​  不同的数据库厂商会定义自己的JDBC Driver,由于JDBC仅仅是一个接口定义,Driver的具体实现、如何和数据库通信完全由不同的Driver来定义。因此JDBC的是使用方需要明确并下载好Driver才能进行连接,例如常见的Driver类型如下:

​  JDBC提供的标准接口主要包括java.sql.Connection、java.sql.Statement、java.sql.PreparedStatement、java.sql.ResultSet等。

​  以MySQL为例,MySQL客户端和服务端采用TCP长连接,通讯报文格式也是MySQL自己定义的。MySQL JDBC Driver的主要作用,就是作为客户端组装报文和数据库通讯。

大数据生态下的JDBC

​  如今不仅是普通的关系型数据库,数仓也会提供JDBC服务,以便于各种分析类/BI工具能够基于标准的接口适配并使用。最典型的例如Hive,Hive本身暴露的服务其实是HiveServer2,HiveServer2默认是基于TCP模式,采用Thrift RPC框架和协议,有自己的接口定义。而Hive JDBC Driver的作用,就是把JDBC接口适配到HiveServer2的接口,然后用thrift rpc进行通信。JDBC接口和HiveServer2的接口定义还是有一些区别的,例如JDBC原本是为数据库设计的,Statement.execute都是同步返回的,而Hive的client.ExecuteStatement是异步提交的,Hive JDBC Driver在这里就会做一些适配。

​  Spark SQL设计之初就考虑了Hive兼容性,比如元数据/SerDes/UDF,当然也支持了完全兼容HiveServer2的Thrift JDBC/ODBC server。因此Spark SQL提供的JDBC,直接采用Hive JDBC Driver进行连接即可。

​  其他查询分析引擎,大多都自己实现了Driver和传输方式。Presto用okhttp包装了自己的Driver(io.prestosql.jdbc.PrestoDriver);Impala也有自己的JDBC Driver(com.cloudera.impala.jdbc41.Driver),是通过thrift server传输的,所以也可以兼容Hive JDBC Driver来使用。

​  云计算厂商提供的数仓,也会提供JDBC连接,通常也都是自己实现一套,例如RedshiftAthenaBigQuerySnowflake

如何提供JDBC服务

​  如果要开发一个数据分析平台,需要独自提供一个JDBC连接点,考虑到多租户/安全/功能扩展等要求,对底层的引擎原生提供的JDBC肯定需要包装一层。考虑到使用方的兼容性,自己实现一套Driver和连接协议不太现实,通常小型的数据平台会提供一份兼容MySQL/PostgreSQL/Hive之类常见JDBC的连接协议。

​  比如,我们如果要提供一个兼容Hive的JDBC服务,本质上需要实现一个满足HiveServer2接口的Thrift Server即可。这里提供了一个小Demo,实现了一个hive-jdbc server,并把转发请求到livy提供的jdbc连接,同时加了user/password认证,IP访问控制等功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import org.apache.hive.jdbc.HiveConnection;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.hive.service.rpc.thrift.*;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.*;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**
* 一个简单的hive-jdbc server实现,仅负责转发请求到livy的jdbc连接
* @author yilun.fyl
*/
public class ThriftServerTest {


private static ThreadLocal<String> userThreadLocal = new ThreadLocal<>();

public static void main(String[] args) {
try {
// 用于连接后端livy jdbc的client
TCLIService.Iface client = livyJdbcClient();
Runnable simple = new Runnable() {
@Override
public void run() {
createJdbcThriftServer(new TCLIService.Processor(new TCLIService.Iface() {
@Override
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
// 此处拿到user Id比较麻烦,因为req里没有带上下文信息,只能通过threadlocal来
// TThreadPoolServer模式可以保证业务线程是独立的
System.out.println("OpenSession "+ userThreadLocal.get());
// TODO sessionId替换成用户对应的session id
req.getConfiguration().put("set:hiveconf:livy.server.sessionId", "1");
req.setUsername("anonymous");
req.setPassword("anonymous");
TOpenSessionResp newSession = client.OpenSession(req);
return newSession;
}
@Override
public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
System.out.println("CloseSession");
return client.CloseSession(req);
}
@Override
public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
System.out.println("GetInfo");
return client.GetInfo(req);
}
@Override
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
System.out.println("ExecuteStatement");
return client.ExecuteStatement(req);
}
@Override
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
System.out.println("GetTypeInfo");
return client.GetTypeInfo(req);
}
@Override
public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
System.out.println("GetCatalogs");
return client.GetCatalogs(req);
}
@Override
public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
System.out.println("GetSchemas");
return client.GetSchemas(req);
}
@Override
public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
System.out.println("GetTables start");
return client.GetTables(req);
}
@Override
public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
System.out.println("GetTableTypes");
return client.GetTableTypes(req);
}
@Override
public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
System.out.println("GetColumns");
return client.GetColumns(req);
}
@Override
public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
System.out.println("GetFunctions");
return client.GetFunctions(req);
}
@Override
public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq req) throws TException {
System.out.println("GetPrimaryKeys");
return client.GetPrimaryKeys(req);
}
@Override
public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req) throws TException {
System.out.println("GetCrossReference");
return client.GetCrossReference(req);
}
@Override
public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
System.out.println("GetOperationStatus");
return client.GetOperationStatus(req);
}
@Override
public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
System.out.println("CancelOperation");
return client.CancelOperation(req);
}
@Override
public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
System.out.println("CloseOperation");
return client.CloseOperation(req);
}
@Override
public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req) throws TException {
System.out.println("GetResultSetMetadata");
return client.GetResultSetMetadata(req);
}
@Override
public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
System.out.println("FetchResults");
return client.FetchResults(req);
}
@Override
public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException {
System.out.println("GetDelegationToken");
return client.GetDelegationToken(req);
}
@Override
public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req) throws TException {
System.out.println("CancelDelegationToken");
return client.CancelDelegationToken(req);
}
@Override
public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req) throws TException {
System.out.println("RenewDelegationToken");
return client.RenewDelegationToken(req);
}
@Override
public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
System.out.println("GetQueryId");
return client.GetQueryId(req);
}
@Override
public TSetClientInfoResp SetClientInfo(TSetClientInfoReq req) throws TException {
System.out.println("SetClientInfo");
return client.SetClientInfo(req);
}
}));
}
};
new Thread(simple).start();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 提供thrift server
*
* @param processor
*/
public static void createJdbcThriftServer(TProcessor processor) {
// JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
try {
// 提供的jdbc端口,自定义port
TServerTransport serverTransport = new TServerSocket(9093);
// authType 有几种,NOSASL, NONE, LDAP, KERBEROS,hive默认值NONE,决定了不同的transportFactory
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
saslFactory.addServerDefinition("PLAIN", "NONE", null, new HashMap<>(), new CallbackHandler() {
@Override
public void handle(Callback[] callbacks) {
String username = null;
String password = null;
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
username = ((NameCallback) callback).getName();
} else if (callback instanceof PasswordCallback) {
password = new String(((PasswordCallback) callback).getPassword());
} else if (callback instanceof AuthorizeCallback) {
System.out.println("auth: user=" + username + " psw=" + password);
// TODO 此时需要验证用户传入的user和password,此处默认认证通过
userThreadLocal.set(username);
((AuthorizeCallback) callback).setAuthorizedID(username);
((AuthorizeCallback) callback).setAuthorized(true);
}
}
}
});

TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport)
.processor(processor)
.transportFactory(saslFactory)
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true, 104857600, 104857600))
.requestTimeout(10)
.requestTimeoutUnit(TimeUnit.SECONDS)
.beBackoffSlotLength((int) Duration.ofMillis(100).toMillis())
.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS);
// TThreadPoolServer IO单线程、业务多线程、阻塞IO
TServer server = new TThreadPoolServer(args);
server.setServerEventHandler(new MyTServerEventHandler());
System.out.println("Starting the simple server...");
server.serve();
System.out.println("Ending the simple server...");
} catch (Exception e) {
e.printStackTrace();
}
}


/**
* 连接到后端真正的jdbc,拿到thrift client
* 此处代码可以参考hive的jdbc driver源码
*
* @return
*/
public static TCLIService.Iface livyJdbcClient() {
try {
// 后端livy server的IP和端口
TSocket tsocket = new TSocket("xxx.xxx.xxx.xxx", 10090);
TTransport transport = PlainSaslHelper.getPlainTransport("anonymous", "anonymous", tsocket);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
// 参考自 org.apache.hive.jdbc.HiveConnection
TCLIService.Iface client = new TCLIService.Client(protocol);
// client本地不是线程安全,需要同步处理。
// 但是这样会降低server的吞吐量,同时只能处理一次请求
// 可以优化成一个[连接池]
return HiveConnection.newSynchronizedClient(client);
} catch (TException | SaslException x) {
x.printStackTrace();
throw new RuntimeException(x);
}
}


static class MyTServerEventHandler implements TServerEventHandler {

/**
* 服务成功启动后执行
*/
@Override
public void preServe() {
System.out.println("Start server ...");
}

/**
* 创建Context的时候,触发
* 在server启动后,只会执行一次
*/
@Override
public ServerContext createContext(TProtocol input, TProtocol output) {
System.out.println("createContext ... ");
return null;
}

/**
* 删除Context的时候,触发
* 在server启动后,只会执行一次
*/
@Override
public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
System.out.println("deleteContext ... ");
}

/**
* 调用RPC服务的时候触发
* 每调用一次方法,就会触发一次
*/
@Override
public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
TSaslServerTransport tServerTransport = (TSaslServerTransport) inputTransport;
TTransport ttransport = tServerTransport.getUnderlyingTransport();
SocketAddress ip = ((TSocket) ttransport).getSocket().getRemoteSocketAddress();
// 可以再这里拿到user,再来匹配ip白名单,不符合的直接抛异常
System.out.println(ip + "method invoke ... " + ((TSaslServerTransport)inputTransport).getSaslServer().getAuthorizationID() );
}
}
}