Java Flight SQL包
Apache Arrow Flight SQL for Java 与Java应用程序集成,使用RPC和SQL查询和检索Flight数据库服务器中的数据。
开始使用Java Flight SQL客户端查询InfluxDB
编写一个Java类,用于连接到InfluxDB Clustered,执行SQL查询,并检索InfluxDB Clustered数据库中存储的数据。
该示例使用Apache Arrow Java实现(org.apache.arrow
) 与InfluxDB v3等Flight数据库服务器交互。
org.apache.arrow
: 提供将Java应用程序与Apache Arrow数据和协议集成的类和方法。org.apache.arrow.flight.sql
: 提供使用Arrow Flight RPC和Flight SQL与Flight数据库服务器交互的类和方法。
- 设置 InfluxDB
- 安装先决条件
- 创建FlightQuery类
- 创建查询客户端
- 执行查询
- 检索和处理Arrow数据
要克隆或下载您可以使用Docker运行的示例应用程序,请参阅GitHub上的InfluxCommunity/ArrowFlightClient_Query_Examples存储库。
设置 InfluxDB
要配置应用程序以查询InfluxDB Clustered,您需要以下InfluxDB资源
- InfluxDB Clustered 数据库
- InfluxDB Clustered 数据库令牌,具有对数据库的读取权限
如果您还没有数据库令牌和数据库,请参阅如何设置InfluxDB。如果您还没有要查询的数据,请参阅如何向数据库写入数据。
安装先决条件
以下示例使用Docker和Maven构建和运行Java应用程序,以避免特定平台依赖问题。
本例中的Dockerfile
在Docker容器中安装兼容版本的Maven和Java JDK,然后运行Maven命令下载依赖项并编译应用程序。
按照说明在您的系统上下载和安装Docker
查看Dockerfile
# Use the official Maven image as the base image
FROM maven:3.8.3-openjdk-11 AS build
# Set the working directory
WORKDIR /app
# Copy the pom.xml file into the container
COPY pom.xml .
# Download and cache dependencies
RUN mvn dependency:go-offline
# Copy the rest of the source code into the container
COPY src/ ./src/
# Compile the source code and copy dependencies
RUN mvn compile dependency:copy-dependencies
# Use the official OpenJDK image as the runtime base image
FROM openjdk:11-jre-slim
# Set the working directory
WORKDIR /app
# Copy the compiled classes and dependencies from the build stage
COPY --from=build /app/target/classes ./classes
COPY --from=build /app/target/dependency ./dependency
# Set ARGs for --build-arg options passed in the build command
ARG DATABASE_FIELD
ARG DATABASE_NAME
ARG HOST
ARG TOKEN
# Set run-time ENVs from ARGs
ENV DATABASE_FIELD=${DATABASE_FIELD}
ENV DATABASE_NAME=${DATABASE_NAME}
ENV HOST=${HOST}
ENV TOKEN=${TOKEN}
# Set the entrypoint to run your Java application
ENTRYPOINT ["java", "-cp", "classes:dependency/*", "com.influxdb.examples.FlightExamples"]
查看Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.influxdb</groupId>
<artifactId>examples</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.influxdb.examples.FlightExamples</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>3.2.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions>
<configuration>
<minimizeJar>false</minimizeJar>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<version>11.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.74.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</project>
创建FlightQuery类
查看FlightQuery.java
package com.influxdb.examples;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
public class FlightQuery {
/* Get server credentials from environment variables */
public static final String DATABASE_NAME = System.getenv("DATABASE_NAME");
public static final String HOST = System.getenv("HOST");
public static final String TOKEN = System.getenv("TOKEN");
public static void main() {
System.out.println("Query InfluxDB with the Java Flight SQL Client");
// Create an interceptor that injects header metadata (database name) in every request.
FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
@Override
public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
outgoingHeaders.insert("database", DATABASE_NAME);
}
@Override
public void onHeadersReceived(CallHeaders incomingHeaders) {
}
@Override
public void onCallCompleted(CallStatus status) {
}
};
// Create a gRPC+TLS channel URI with HOST and port 443.
Location location = Location.forGrpcTls(HOST, 443);
// Set the allowed memory.
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
// Create a client with the allocator and gRPC channel.
FlightClient client = FlightClient.builder(allocator, location)
.intercept(f)
.build();
System.out.println("client" + client);
FlightSqlClient sqlClient = new FlightSqlClient(client);
System.out.println("sqlClient: " + sqlClient);
// Define the SQL query to execute.
String query = "SELECT * FROM home";
/* Construct a bearer credential using TOKEN.
Construct a credentials option using the bearer credential.
*/
CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(TOKEN));
/* Execute the query.
If successful, execute returns a FlightInfo object that contains metadata
and an endpoints list.
Each endpoint contains the following:
- A list of addresses where you can retrieve the data.
- A `ticket` value that identifies the data to retrieve.
*/
FlightInfo flightInfo = sqlClient.execute(query, auth);
// Extract the Flight ticket from the response.
Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
// Pass the ticket to request the Arrow stream data from the endpoint.
final FlightStream stream = sqlClient.getStream(ticket, auth);
// Process all the Arrow stream data.
while (stream.next()) {
try {
// Get the current vector data from the stream.
final VectorSchemaRoot root = stream.getRoot();
System.out.println(root.contentToTSVString());
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error executing FlightSqlClient: " + e.getMessage());
}
}
try {
// Close the stream and release resources.
stream.close();
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error closing stream: " + e.getMessage());
}
try {
// Close the client
sqlClient.close();
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error closing client: " + e.getMessage());
}
}
}
在您的<PROJECT_ROOT>/src/main/java
目录下,为com.influxdb.examples
包创建子目录。
在上一步的examples
目录中,创建FlightQuery.java
类文件。您应该有以下目录结构
PROJECT_ROOT
└──src
└──main
└──java
└──com
└──influxdb
└──examples
└──FlightQuery.java
在FlightQuery.java
中
添加包名
package com.influxdb.examples;
为以下包添加import
语句。在接下来的步骤中,您将使用这些包中的类和方法
org.apache.arrow.flight.auth2.BearerCredentialWriter
org.apache.arrow.flight.CallHeaders
org.apache.arrow.flight.CallStatus
org.apache.arrow.flight.grpc.CredentialCallOption
org.apache.arrow.flight.Location
org.apache.arrow.flight.FlightClient
org.apache.arrow.flight.FlightClientMiddleware
org.apache.arrow.flight.FlightInfo
org.apache.arrow.flight.FlightStream
org.apache.arrow.flight.sql.FlightSqlClient
org.apache.arrow.flight.Ticket
org.apache.arrow.memory.BufferAllocator
org.apache.arrow.memory.RootAllocator
org.apache.arrow.vector.VectorSchemaRoot
创建一个FlightQuery
类。
在FlightQuery
类中
定义服务器凭证的常量。
示例Dockerfile
定义了这些凭证的环境变量。
创建一个main()
方法。
创建查询客户端
在FlightQuery.main()
方法中,执行以下操作以创建一个可以连接到HOST
和DATABASE_NAME
的SQL客户端
使用HOST
和端口号443
构建一个gRPC+TLS通道URI,以通过TLS与gRPC服务器进行通信。
实例化FlightClientMiddleware
并定义一个事件回调,在其中插入以下Flight请求元数据头属性
"database": "DATABASE_NAME"
实例化一个BufferAllocator
,设置客户端允许的内存。
使用分配器和gRPC通道创建一个FlightClient
。
实例化一个包装FlightClient
实例的FlightSqlClient
。
执行查询
在FlightQuery.main
方法中
使用作为bearer凭证的TOKEN
实例化一个CredentialCallOption
。结果是您将在每个请求中传递给服务器的凭证对象。
定义一个包含要执行SQL查询的字符串 – 例如
String query = "SELECT * FROM home";
使用SQL查询和CredentialCallOption
调用FlightSqlClient.execute
方法。
如果成功,FlightSqlClient.execute
方法将返回一个包含元数据和endpoints: [...]
列表的FlightInfo
对象。每个端点包含以下内容
- 可以检索数据的一组地址。
- 一个标识要检索数据的
ticket
值。
从响应中提取ticket。
检索和处理Arrow数据
在FlightQuery.main()
方法中,执行以下操作以检索FlightInfo
响应中描述的数据流
使用ticket和CredentialCallOption
调用FlightSqlClient.getStream
方法来获取Arrow流。
调用FlightStream.getRoot
方法从流中获取当前向量数据。
处理数据和异常。示例将向量数据转换为制表符分隔值,并将结果打印到System.out
。
有关使用Java处理Arrow数据的更多示例,请参阅Apache Arrow Java食谱。
最后,关闭流和客户端。
运行应用程序
按照以下步骤使用Docker构建和运行应用程序
将Dockerfile
和pom.xml
复制到您的项目根目录。
在项目根目录中打开一个终端。
在您的终端中,运行docker build
命令并传递--build-arg
标志用于服务器凭据
docker build \
--build-arg DATABASE_NAME=INFLUX_DATABASE \
--build-arg HOST=cluster-host.com \
--build-arg TOKEN=INFLUX_TOKEN \
-t javaflight .
此命令构建一个名为javaflight
的Docker镜像。
要在新的Docker容器中运行应用程序,请输入以下命令
输出是TSV格式的查询数据。
诊断Arrow Flight请求
有关Arrow Flight错误响应代码的列表,请参阅Arrow Flight RPC文档。
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对InfluxDB和本文档的反馈和错误报告。要获取支持,请使用以下资源
拥有年度或支持合同的客户可以联系InfluxData支持。