Java Flight SQL 包
Apache Arrow Flight SQL for Java 集成 Java 应用程序,以使用 RPC 和 SQL 从 Flight 数据库服务器查询和检索数据。
使用 InfluxDB 3 客户端库
我们建议使用 influxdb3-java
Java 客户端库 将 InfluxDB 3 与您的 Java 应用程序代码集成。
InfluxDB 3 客户端库封装了 Apache Arrow Flight 客户端,并为写入、查询和处理存储在 InfluxDB Clustered 中的数据提供了便捷的方法。客户端库可以使用 SQL 或 InfluxQL 进行查询。
开始使用 Java Flight SQL 客户端查询 InfluxDB
编写一个 Java 类,用于创建一个 Flight SQL 客户端,该客户端连接到 InfluxDB Clustered,执行 SQL 查询,并检索存储在 InfluxDB Clustered 数据库中的数据。
该示例使用 Apache Arrow Java 实现 (org.apache.arrow
) 与 Flight 数据库服务器(如 InfluxDB 3)进行交互。
org.apache.arrow
:提供用于将 Java 应用程序与 Apache Arrow 数据和协议集成的类和方法。org.apache.arrow.flight.sql
:提供用于使用 Arrow Flight RPC 和 Flight SQL 与 Flight 数据库服务器交互的类和方法。
- 设置 InfluxDB
- 安装先决条件
- 创建 FlightQuery 类
- 创建查询客户端
- 执行查询
- 检索和处理 Arrow 数据
要克隆或下载可以使用 Docker 运行的示例应用程序,请参阅 GitHub 上的 InfluxCommunity/ArrowFlightClient_Query_Examples 仓库。
设置 InfluxDB
要配置应用程序以查询 InfluxDB Clustered,您需要以下 InfluxDB 资源
- InfluxDB Clustered 数据库
- InfluxDB Clustered 数据库令牌,具有对数据库的读取权限
如果您还没有数据库令牌和数据库,请参阅如何 设置 InfluxDB。如果您还没有要查询的数据,请参阅如何 写入数据 到数据库。
安装先决条件
以下内容使用 Docker 和 Maven 构建并运行 Java 应用程序,并避免特定于平台的依赖问题。
示例 Dockerfile
在 Docker 容器中安装兼容版本的 Maven 和 Java JDK,然后运行 Maven 命令来下载依赖项并编译应用程序。
按照说明下载并安装适用于您系统的 Docker
查看 Dockerfile
# Use the official Maven image as the base image
FROM maven:3.8.3-openjdk-11 AS build
# Set the working directory
WORKDIR /app
# Copy the pom.xml file into the container
COPY pom.xml .
# Download and cache dependencies
RUN mvn dependency:go-offline
# Copy the rest of the source code into the container
COPY src/ ./src/
# Compile the source code and copy dependencies
RUN mvn compile dependency:copy-dependencies
# Use the official OpenJDK image as the runtime base image
FROM openjdk:11-jre-slim
# Set the working directory
WORKDIR /app
# Copy the compiled classes and dependencies from the build stage
COPY --from=build /app/target/classes ./classes
COPY --from=build /app/target/dependency ./dependency
# Set ARGs for --build-arg options passed in the build command
ARG DATABASE_FIELD
ARG DATABASE_NAME
ARG HOST
ARG TOKEN
# Set run-time ENVs from ARGs
ENV DATABASE_FIELD=${DATABASE_FIELD}
ENV DATABASE_NAME=${DATABASE_NAME}
ENV HOST=${HOST}
ENV TOKEN=${TOKEN}
# Set the entrypoint to run your Java application
ENTRYPOINT ["java", "-cp", "classes:dependency/*", "com.influxdb.examples.FlightExamples"]
查看 Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.influxdb</groupId>
<artifactId>examples</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.influxdb.examples.FlightExamples</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>3.2.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions>
<configuration>
<minimizeJar>false</minimizeJar>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<version>11.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.74.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</project>
创建 FlightQuery 类
查看 FlightQuery.java
package com.influxdb.examples;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
public class FlightQuery {
/* Get server credentials from environment variables */
public static final String DATABASE_NAME = System.getenv("DATABASE_NAME");
public static final String HOST = System.getenv("HOST");
public static final String TOKEN = System.getenv("TOKEN");
public static void main() {
System.out.println("Query InfluxDB with the Java Flight SQL Client");
// Create an interceptor that injects header metadata (database name) in every request.
FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
@Override
public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
outgoingHeaders.insert("database", DATABASE_NAME);
}
@Override
public void onHeadersReceived(CallHeaders incomingHeaders) {
}
@Override
public void onCallCompleted(CallStatus status) {
}
};
// Create a gRPC+TLS channel URI with HOST and port 443.
Location location = Location.forGrpcTls(HOST, 443);
// Set the allowed memory.
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
// Create a client with the allocator and gRPC channel.
FlightClient client = FlightClient.builder(allocator, location)
.intercept(f)
.build();
System.out.println("client" + client);
FlightSqlClient sqlClient = new FlightSqlClient(client);
System.out.println("sqlClient: " + sqlClient);
// Define the SQL query to execute.
String query = "SELECT * FROM home";
/* Construct a bearer credential using TOKEN.
Construct a credentials option using the bearer credential.
*/
CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(TOKEN));
/* Execute the query.
If successful, execute returns a FlightInfo object that contains metadata
and an endpoints list.
Each endpoint contains the following:
- A list of addresses where you can retrieve the data.
- A `ticket` value that identifies the data to retrieve.
*/
FlightInfo flightInfo = sqlClient.execute(query, auth);
// Extract the Flight ticket from the response.
Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
// Pass the ticket to request the Arrow stream data from the endpoint.
final FlightStream stream = sqlClient.getStream(ticket, auth);
// Process all the Arrow stream data.
while (stream.next()) {
try {
// Get the current vector data from the stream.
final VectorSchemaRoot root = stream.getRoot();
System.out.println(root.contentToTSVString());
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error executing FlightSqlClient: " + e.getMessage());
}
}
try {
// Close the stream and release resources.
stream.close();
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error closing stream: " + e.getMessage());
}
try {
// Close the client
sqlClient.close();
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error closing client: " + e.getMessage());
}
}
}
在您的 <PROJECT_ROOT>/src/main/java
目录中,为 com.influxdb.examples
包创建 com/influxdb/examples
子目录。
在上一步骤的 examples
目录中,创建 FlightQuery.java
类文件。您应该具有以下目录结构
PROJECT_ROOT
└──src
└──main
└──java
└──com
└──influxdb
└──examples
└──FlightQuery.java
在 FlightQuery.java
中
添加包名
package com.influxdb.examples;
为以下包添加 import
语句。您将在后续步骤中使用这些包中的类和方法
org.apache.arrow.flight.auth2.BearerCredentialWriter
org.apache.arrow.flight.CallHeaders
org.apache.arrow.flight.CallStatus
org.apache.arrow.flight.grpc.CredentialCallOption
org.apache.arrow.flight.Location
org.apache.arrow.flight.FlightClient
org.apache.arrow.flight.FlightClientMiddleware
org.apache.arrow.flight.FlightInfo
org.apache.arrow.flight.FlightStream
org.apache.arrow.flight.sql.FlightSqlClient
org.apache.arrow.flight.Ticket
org.apache.arrow.memory.BufferAllocator
org.apache.arrow.memory.RootAllocator
org.apache.arrow.vector.VectorSchemaRoot
创建 FlightQuery
类。
在 FlightQuery
类中
定义服务器凭据的常量。
示例 Dockerfile
为这些凭据定义了环境变量。
创建 main()
方法。
创建查询客户端
在 FlightQuery.main()
方法中,执行以下操作以创建可以连接到 HOST
和 DATABASE_NAME
的 SQL 客户端
使用 HOST
和端口 443
构建 gRPC+TLS 通道 URI,以便通过 TLS 与 gRPC 服务器 通信。
实例化 FlightClientMiddleware
并定义一个事件回调,该回调插入以下 Flight 请求元数据标头属性
"database": "DATABASE_NAME"
实例化一个 BufferAllocator
,用于设置客户端允许的内存。
使用分配器和 gRPC 通道创建 FlightClient
。
实例化一个 FlightSqlClient
,用于封装 FlightClient
实例。
执行查询
在 FlightQuery.main
方法中
使用 TOKEN
作为 bearer 凭据实例化 CredentialCallOption
。结果是一个凭据对象,您将在每个请求中将其传递给服务器。
定义一个包含要执行的 SQL 查询的字符串 - 例如
String query = "SELECT * FROM home";
使用 SQL 查询和 CredentialCallOption
调用 FlightSqlClient.execute
方法。
如果成功,FlightSqlClient.execute
方法将响应一个 FlightInfo
对象,其中包含元数据和一个 endpoints: [...]
列表。每个端点包含以下内容
- 您可以从中检索数据的地址列表。
- 标识要检索数据的
ticket
值。
从响应中提取 ticket。
检索和处理 Arrow 数据
在 FlightQuery.main()
方法中,执行以下操作以检索 FlightInfo
响应中描述的数据流
使用 ticket 和 CredentialCallOption
调用 FlightSqlClient.getStream
方法以获取 Arrow 流。
调用 FlightStream.getRoot
方法以从流中获取当前向量数据。
处理数据并处理异常。该示例将向量数据转换为制表符分隔的值,并将结果打印到 System.out
。
有关使用 Java 处理 Arrow 数据的更多示例,请参阅 Apache Arrow Java Cookbook。
最后,关闭流和客户端。
运行应用程序
按照以下步骤使用 Docker 构建并运行应用程序
将 Dockerfile
和 pom.xml
复制到您的项目根目录。
在您的项目根目录中打开一个终端。
在您的终端中,运行 docker build
命令并为服务器凭据传递 --build-arg
标志
docker build \
--build-arg DATABASE_NAME=INFLUX_DATABASE \
--build-arg HOST=cluster-host.com \
--build-arg TOKEN=INFLUX_TOKEN \
-t javaflight .
该命令构建一个名为 javaflight
的 Docker 镜像。
要在新的 Docker 容器中运行应用程序,请输入以下命令
输出是 TSV 格式的查询数据。
Arrow Flight 请求问题排查
有关 Arrow Flight 错误响应代码的列表,请参阅 Arrow Flight RPC 文档。
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB Clustered 和本文档的反馈和错误报告。要获得支持,请使用以下资源
拥有年度或支持合同的客户可以 联系 InfluxData 支持。