关于JDBC标准
先谈谈我们熟悉的JDBC。JDBC(Java DataBase Connectivity)是Java程序访问数据库的标准接口,本意是提供一个标准接口,由数据库厂商来提供各自的实现(包括Driver客户端和服务端),这样开发者可以通过一套代码,尽可能屏蔽不同数据库之间的差异。早在JDK 1.1,JDBC就作为Java SE的一部分了。
不同的数据库厂商会提供自己的JDBC Driver。由于JDBC仅仅是一个接口定义,Driver的具体实现、如何和数据库通信完全由各个Driver自行决定。因此JDBC的使用方需要明确并下载好对应的Driver才能进行连接,例如常见的Driver类型如下:
JDBC提供的标准接口主要包括java.sql.Connection、java.sql.Statement、java.sql.PreparedStatement、java.sql.ResultSet等。
以MySQL为例,MySQL客户端和服务端采用TCP长连接,通讯报文格式也是MySQL自己定义的。MySQL JDBC Driver的主要作用,就是作为客户端组装报文和数据库通讯。
大数据生态下的JDBC
如今不仅是普通的关系型数据库,数仓也会提供JDBC服务,以便于各种分析类/BI工具能够基于标准的接口适配并使用。最典型的例子是Hive,Hive本身暴露的服务其实是HiveServer2,HiveServer2默认是基于TCP模式,采用Thrift RPC框架和协议,有自己的接口定义。而Hive JDBC Driver的作用,就是把JDBC接口适配到HiveServer2的接口,然后用Thrift RPC进行通信。JDBC接口和HiveServer2的接口定义还是有一些区别的,例如JDBC原本是为数据库设计的,Statement.execute都是同步返回的,而Hive的client.ExecuteStatement是异步提交的,Hive JDBC Driver在这里就会做一些适配。
Spark SQL设计之初就考虑了Hive兼容性,比如元数据/SerDes/UDF,当然也支持了完全兼容HiveServer2的Thrift JDBC/ODBC server。因此Spark SQL提供的JDBC,直接采用Hive JDBC Driver进行连接即可。
其他查询分析引擎,大多都自己实现了Driver和传输方式。Presto用okhttp包装了自己的Driver(io.prestosql.jdbc.PrestoDriver);Impala也有自己的JDBC Driver(com.cloudera.impala.jdbc41.Driver),是通过thrift server传输的,所以也可以兼容Hive JDBC Driver来使用。
云计算厂商提供的数仓,也会提供JDBC连接,通常也都是自己实现一套,例如Redshift、Athena、BigQuery、Snowflake。
如何提供JDBC服务
如果要开发一个数据分析平台,需要独自提供一个JDBC连接点,考虑到多租户/安全/功能扩展等要求,对底层的引擎原生提供的JDBC肯定需要包装一层。考虑到使用方的兼容性,自己实现一套Driver和连接协议不太现实,通常小型的数据平台会提供一份兼容MySQL/PostgreSQL/Hive之类常见JDBC的连接协议。
比如,我们如果要提供一个兼容Hive的JDBC服务,本质上只需要实现一个满足HiveServer2接口的Thrift Server即可。这里提供了一个小Demo,实现了一个hive-jdbc server,并把请求转发到livy提供的jdbc连接,同时加了user/password认证,IP访问控制等功能。
示例代码如下:
| import org.apache.hive.jdbc.HiveConnection; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.rpc.thrift.*; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.*;
import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.SaslException; import java.net.SocketAddress; import java.time.Duration; import java.util.HashMap; import java.util.concurrent.TimeUnit;
/**
 * Demo proxy that exposes a HiveServer2-compatible Thrift endpoint and forwards every
 * {@code TCLIService} call to an upstream Livy Thrift server.
 *
 * <p>The proxy listens with SASL PLAIN, records the user name supplied by each client, and logs
 * the remote address of every call. It is a demonstration only: all credentials are accepted.
 */
public class ThriftServerTest {

    /** Port the proxy listens on. */
    private static final int LISTEN_PORT = 9093;

    /** Host of the upstream Livy Thrift server (placeholder; replace before running). */
    private static final String LIVY_HOST = "xxx.xxx.xxx.xxx";

    /** Port of the upstream Livy Thrift server. */
    private static final int LIVY_PORT = 10090;

    /** Credentials used for the single shared upstream connection and upstream sessions. */
    private static final String UPSTREAM_USER = "anonymous";
    private static final String UPSTREAM_PASSWORD = "anonymous";

    /** Limit passed to the binary protocol factory for string and container lengths (100 MiB). */
    private static final int MAX_MESSAGE_BYTES = 104857600;

    /** Back-off slot length, in milliseconds, used by the thread-pool server. */
    private static final int BACKOFF_SLOT_MILLIS = 100;

    /** User name captured by the SASL callback handler on the current thread. */
    private static final ThreadLocal<String> userThreadLocal = new ThreadLocal<>();

    /**
     * Connects to the upstream server, then starts the proxy on a new thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            TCLIService.Iface client = livyJdbcClient();
            // Explicit type argument instead of the raw Processor type used previously.
            TProcessor processor =
                    new TCLIService.Processor<TCLIService.Iface>(new LivyForwardingService(client));
            new Thread(() -> createJdbcThriftServer(processor)).start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts a blocking thread-pool Thrift server with SASL PLAIN on {@link #LISTEN_PORT}.
     *
     * <p>This call returns only after the server stops serving or setup fails. Failures are
     * printed rather than propagated, matching the demo's original behavior.
     *
     * @param processor the Thrift processor that handles incoming calls
     */
    public static void createJdbcThriftServer(TProcessor processor) {
        TServerTransport serverTransport = null;
        try {
            serverTransport = new TServerSocket(LISTEN_PORT);

            TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
            saslFactory.addServerDefinition(
                    "PLAIN", "NONE", null, new HashMap<>(), new PlainAuthCallbackHandler());

            TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport)
                    .processor(processor)
                    .transportFactory(saslFactory)
                    .inputProtocolFactory(
                            new TBinaryProtocol.Factory(true, true, MAX_MESSAGE_BYTES, MAX_MESSAGE_BYTES))
                    .requestTimeout(10)
                    .requestTimeoutUnit(TimeUnit.SECONDS)
                    .beBackoffSlotLength(BACKOFF_SLOT_MILLIS)
                    .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS);

            TServer server = new TThreadPoolServer(args);
            server.setServerEventHandler(new MyTServerEventHandler());
            System.out.println("Starting the simple server...");
            server.serve();
            System.out.println("Ending the simple server...");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the listening socket even when setup fails after it was created.
            if (serverTransport != null) {
                serverTransport.close();
            }
        }
    }

    /**
     * Opens a SASL PLAIN connection to the upstream Livy Thrift server and wraps it in a
     * synchronized client.
     *
     * @return a client for the upstream {@code TCLIService}
     * @throws RuntimeException if the connection or SASL setup fails; the cause is preserved
     */
    public static TCLIService.Iface livyJdbcClient() {
        TSocket socket = null;
        try {
            socket = new TSocket(LIVY_HOST, LIVY_PORT);
            TTransport transport =
                    PlainSaslHelper.getPlainTransport(UPSTREAM_USER, UPSTREAM_PASSWORD, socket);
            transport.open();
            TProtocol protocol = new TBinaryProtocol(transport);
            TCLIService.Iface client = new TCLIService.Client(protocol);
            return HiveConnection.newSynchronizedClient(client);
        } catch (TException | SaslException e) {
            // Do not leave a half-open socket behind; the caller reports the failure once.
            if (socket != null) {
                socket.close();
            }
            throw new RuntimeException(
                    "Failed to connect to upstream server " + LIVY_HOST + ":" + LIVY_PORT, e);
        }
    }

    /**
     * SASL PLAIN callback handler that records the user name for the current thread and
     * authorizes the connection.
     *
     * <p>NOTE: every user/password pair is accepted. Real credential validation belongs in the
     * {@link AuthorizeCallback} branch before a production deployment.
     */
    private static final class PlainAuthCallbackHandler implements CallbackHandler {

        @Override
        public void handle(Callback[] callbacks) {
            String username = null;
            boolean passwordSupplied = false;
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    username = ((NameCallback) callback).getName();
                } else if (callback instanceof PasswordCallback) {
                    // Only note presence; the password itself must never be logged.
                    passwordSupplied = ((PasswordCallback) callback).getPassword() != null;
                } else if (callback instanceof AuthorizeCallback) {
                    System.out.println("auth: user=" + username
                            + " psw=" + (passwordSupplied ? "<redacted>" : "<none>"));
                    userThreadLocal.set(username);
                    AuthorizeCallback authorize = (AuthorizeCallback) callback;
                    authorize.setAuthorizedID(username);
                    authorize.setAuthorized(true);
                }
            }
        }
    }

    /**
     * {@code TCLIService} implementation that logs each call and forwards it unchanged to a
     * delegate, except for {@code OpenSession}, which pins the Livy session id and upstream
     * credentials.
     */
    private static final class LivyForwardingService implements TCLIService.Iface {

        private final TCLIService.Iface client;

        LivyForwardingService(TCLIService.Iface client) {
            this.client = client;
        }

        @Override
        public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
            System.out.println("OpenSession " + userThreadLocal.get());
            // putToConfiguration is the Thrift-generated adder; unlike getConfiguration().put(...)
            // it does not fail when the client sent no configuration map.
            req.putToConfiguration("set:hiveconf:livy.server.sessionId", "1");
            req.setUsername(UPSTREAM_USER);
            req.setPassword(UPSTREAM_PASSWORD);
            return client.OpenSession(req);
        }

        @Override
        public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
            System.out.println("CloseSession");
            return client.CloseSession(req);
        }

        @Override
        public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
            System.out.println("GetInfo");
            return client.GetInfo(req);
        }

        @Override
        public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
            System.out.println("ExecuteStatement");
            return client.ExecuteStatement(req);
        }

        @Override
        public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
            System.out.println("GetTypeInfo");
            return client.GetTypeInfo(req);
        }

        @Override
        public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
            System.out.println("GetCatalogs");
            return client.GetCatalogs(req);
        }

        @Override
        public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws TException {
            System.out.println("GetSchemas");
            return client.GetSchemas(req);
        }

        @Override
        public TGetTablesResp GetTables(TGetTablesReq req) throws TException {
            System.out.println("GetTables start");
            return client.GetTables(req);
        }

        @Override
        public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws TException {
            System.out.println("GetTableTypes");
            return client.GetTableTypes(req);
        }

        @Override
        public TGetColumnsResp GetColumns(TGetColumnsReq req) throws TException {
            System.out.println("GetColumns");
            return client.GetColumns(req);
        }

        @Override
        public TGetFunctionsResp GetFunctions(TGetFunctionsReq req) throws TException {
            System.out.println("GetFunctions");
            return client.GetFunctions(req);
        }

        @Override
        public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq req) throws TException {
            System.out.println("GetPrimaryKeys");
            return client.GetPrimaryKeys(req);
        }

        @Override
        public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req) throws TException {
            System.out.println("GetCrossReference");
            return client.GetCrossReference(req);
        }

        @Override
        public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException {
            System.out.println("GetOperationStatus");
            return client.GetOperationStatus(req);
        }

        @Override
        public TCancelOperationResp CancelOperation(TCancelOperationReq req) throws TException {
            System.out.println("CancelOperation");
            return client.CancelOperation(req);
        }

        @Override
        public TCloseOperationResp CloseOperation(TCloseOperationReq req) throws TException {
            System.out.println("CloseOperation");
            return client.CloseOperation(req);
        }

        @Override
        public TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req)
                throws TException {
            System.out.println("GetResultSetMetadata");
            return client.GetResultSetMetadata(req);
        }

        @Override
        public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
            System.out.println("FetchResults");
            return client.FetchResults(req);
        }

        @Override
        public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException {
            System.out.println("GetDelegationToken");
            return client.GetDelegationToken(req);
        }

        @Override
        public TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req)
                throws TException {
            System.out.println("CancelDelegationToken");
            return client.CancelDelegationToken(req);
        }

        @Override
        public TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req)
                throws TException {
            System.out.println("RenewDelegationToken");
            return client.RenewDelegationToken(req);
        }

        @Override
        public TGetQueryIdResp GetQueryId(TGetQueryIdReq req) throws TException {
            System.out.println("GetQueryId");
            return client.GetQueryId(req);
        }

        @Override
        public TSetClientInfoResp SetClientInfo(TSetClientInfoReq req) throws TException {
            System.out.println("SetClientInfo");
            return client.SetClientInfo(req);
        }
    }

    /** Server event handler that logs connection lifecycle events and the caller of each request. */
    static class MyTServerEventHandler implements TServerEventHandler {

        @Override
        public void preServe() {
            System.out.println("Start server ...");
        }

        /** Logs the new connection; no per-connection context object is kept. */
        @Override
        public ServerContext createContext(TProtocol input, TProtocol output) {
            System.out.println("createContext ... ");
            return null;
        }

        @Override
        public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
            System.out.println("deleteContext ... ");
            // Presumably runs on the worker thread that performed the SASL handshake (confirm
            // for the Thrift version in use); clearing here keeps a pooled thread from carrying
            // a previous user's name. On any other thread this is a harmless no-op.
            userThreadLocal.remove();
        }

        /**
         * Logs the remote socket address and SASL authorization id before each call.
         *
         * <p>Expects {@code inputTransport} to be a {@link TSaslServerTransport} wrapping a
         * {@link TSocket}, which is how {@link #createJdbcThriftServer} configures the server.
         */
        @Override
        public void processContext(ServerContext serverContext, TTransport inputTransport,
                TTransport outputTransport) {
            TSaslServerTransport saslTransport = (TSaslServerTransport) inputTransport;
            TSocket clientSocket = (TSocket) saslTransport.getUnderlyingTransport();
            SocketAddress ip = clientSocket.getSocket().getRemoteSocketAddress();
            System.out.println(ip + "method invoke ... "
                    + saslTransport.getSaslServer().getAuthorizationID());
        }
    }
}
|