Apache Arrow Flight SQL for Java 与 Java 应用程序集成,通过 RPC 和 SQL 查询和检索来自 Flight 数据库服务器的数据。
开始使用 Java Flight SQL 客户端查询 InfluxDB
编写一个 Java 类,用于连接到 InfluxDB 3 Core 的 Flight SQL 客户端,执行 SQL 查询,并检索存储在 InfluxDB 3 Core 数据库中的数据。
该示例使用 Apache Arrow Java 实现 (org.apache.arrow) 与 InfluxDB 3 等 Flight 数据库服务器进行交互。
org.apache.arrow: 提供用于将 Java 应用程序与 Apache Arrow 数据和协议集成的类和方法。org.apache.arrow.flight.sql: 提供使用 Arrow Flight RPC 和 Flight SQL 与 Flight 数据库服务器进行交互的类和方法。
- 设置 InfluxDB
- 安装先决条件
- 创建 FlightQuery 类
- 创建查询客户端
- 执行查询
- 检索和处理 Arrow 数据
要克隆或下载可以使用 Docker 运行的示例应用程序,请参阅 GitHub 上的 InfluxCommunity/ArrowFlightClient_Query_Examples 存储库。
设置 InfluxDB
要配置应用程序以查询 InfluxDB 3 Core,您将需要以下 InfluxDB 资源
- InfluxDB 3 Core 数据库
- InfluxDB 3 Core 数据库令牌,对该数据库具有读取权限
如果您还没有数据库令牌和数据库,请参阅如何 设置 InfluxDB。如果您还没有要查询的数据,请参阅如何 写入数据 到数据库。
安装先决条件
以下内容使用 Docker 和 Maven 构建和运行 Java 应用程序,以避免特定于平台的依赖项问题。
示例 Dockerfile 在 Docker 容器中安装兼容版本的 Maven 和 Java JDK,然后运行 Maven 命令以下载依赖项并编译应用程序。
请按照说明下载和安装适用于您系统的 Docker
查看 Dockerfile
# Use the official Maven image as the base image
FROM maven:3.8.3-openjdk-11 AS build
# Set the working directory
WORKDIR /app
# Copy the pom.xml file into the container
COPY pom.xml .
# Download and cache dependencies
RUN mvn dependency:go-offline
# Copy the rest of the source code into the container
COPY src/ ./src/
# Compile the source code and copy dependencies
RUN mvn compile dependency:copy-dependencies
# Use the official OpenJDK image as the runtime base image
FROM openjdk:11-jre-slim
# Set the working directory
WORKDIR /app
# Copy the compiled classes and dependencies from the build stage
COPY --from=build /app/target/classes ./classes
COPY --from=build /app/target/dependency ./dependency
# Set ARGs for --build-arg options passed in the build command
ARG DATABASE_FIELD
ARG DATABASE_NAME
ARG HOST
ARG TOKEN
# Set run-time ENVs from ARGs
ENV DATABASE_FIELD=${DATABASE_FIELD}
ENV DATABASE_NAME=${DATABASE_NAME}
ENV HOST=${HOST}
ENV TOKEN=${TOKEN}
# Set the entrypoint to run your Java application
ENTRYPOINT ["java", "-cp", "classes:dependency/*", "com.influxdb.examples.FlightExamples"]
查看 Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.influxdb</groupId>
<artifactId>examples</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.influxdb.examples.FlightExamples</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>3.2.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions>
<configuration>
<minimizeJar>false</minimizeJar>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<version>11.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.74.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</project>
创建 FlightQuery 类
查看 FlightQuery.java
package com.influxdb.examples;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
public class FlightQuery {
/* Get server credentials from environment variables */
public static final String DATABASE_NAME = System.getenv("DATABASE_NAME");
public static final String HOST = System.getenv("HOST");
public static final String TOKEN = System.getenv("TOKEN");
public static void main() {
System.out.println("Query InfluxDB with the Java Flight SQL Client");
// Create an interceptor that injects header metadata (database name) in every request.
FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
@Override
public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
outgoingHeaders.insert("database", DATABASE_NAME);
}
@Override
public void onHeadersReceived(CallHeaders incomingHeaders) {
}
@Override
public void onCallCompleted(CallStatus status) {
}
};
// Create a gRPC+TLS channel URI with HOST and port 443.
Location location = Location.forGrpcTls(HOST, 443);
// Set the allowed memory.
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
// Create a client with the allocator and gRPC channel.
FlightClient client = FlightClient.builder(allocator, location)
.intercept(f)
.build();
System.out.println("client" + client);
FlightSqlClient sqlClient = new FlightSqlClient(client);
System.out.println("sqlClient: " + sqlClient);
// Define the SQL query to execute.
String query = "SELECT * FROM home";
/* Construct a bearer credential using TOKEN.
Construct a credentials option using the bearer credential.
*/
CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(TOKEN));
/* Execute the query.
If successful, execute returns a FlightInfo object that contains metadata
and an endpoints list.
Each endpoint contains the following:
- A list of addresses where you can retrieve the data.
- A `ticket` value that identifies the data to retrieve.
*/
FlightInfo flightInfo = sqlClient.execute(query, auth);
// Extract the Flight ticket from the response.
Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
// Pass the ticket to request the Arrow stream data from the endpoint.
final FlightStream stream = sqlClient.getStream(ticket, auth);
// Process all the Arrow stream data.
while (stream.next()) {
try {
// Get the current vector data from the stream.
final VectorSchemaRoot root = stream.getRoot();
System.out.println(root.contentToTSVString());
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error executing FlightSqlClient: " + e.getMessage());
}
}
try {
// Close the stream and release resources.
stream.close();
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error closing stream: " + e.getMessage());
}
try {
// Close the client
sqlClient.close();
} catch (Exception e) {
// Handle exceptions.
System.out.println("Error closing client: " + e.getMessage());
}
}
}
在您的 <PROJECT_ROOT>/src/main/java 目录下,为 com.influxdb.examples 包创建 com/influxdb/examples 子目录。
在上一步的 examples 目录中,创建 FlightQuery.java 类文件。您应该有以下目录结构
PROJECT_ROOT
└──src
└──main
└──java
└──com
└──influxdb
└──examples
└──FlightQuery.java
在 FlightQuery.java 中
添加包名
package com.influxdb.examples;
为以下包添加 import 语句。您将在后续步骤中使用这些包中的类和方法
org.apache.arrow.flight.auth2.BearerCredentialWriterorg.apache.arrow.flight.CallOptionsorg.apache.arrow.flight.CallStatusorg.apache.arrow.flight.grpc.CredentialCallOptionorg.apache.arrow.flight.Locationorg.apache.arrow.flight.FlightClientorg.apache.arrow.flight.FlightClientMiddlewareorg.apache.arrow.flight.FlightInfoorg.apache.arrow.flight.FlightStreamorg.apache.arrow.flight.sql.FlightSqlClientorg.apache.arrow.flight.Ticketorg.apache.arrow.memory.BufferAllocatororg.apache.arrow.memory.RootAllocatororg.apache.arrow.vector.VectorSchemaRoot
创建一个 FlightQuery 类。
在 FlightQuery 类中
定义服务器凭证的常量。
示例 Dockerfile 为这些凭证定义了环境变量。
创建一个 main() 方法。
创建查询客户端
在 FlightQuery.main() 方法中,执行以下操作以创建可以连接到 HOST 和 DATABASE_NAME 的 SQL 客户端
构造一个gRPC+TLS通道 URI,其中包含 HOST 和端口 443,用于与 通过 TLS 的 gRPC 服务器 通信。
实例化 FlightClientMiddleware 并定义一个事件回调,该回调插入以下 Flight 请求元数据头属性
"database": "DATABASE_NAME"
实例化一个 BufferAllocator,它设置允许客户端使用的内存。
使用分配器和 gRPC 通道创建 FlightClient。
实例化一个 FlightSqlClient,它包装 FlightClient 实例。
执行查询
在 FlightQuery.main 方法中
使用 TOKEN 作为承载者凭证实例化 CredentialCallOption。结果是一个凭证对象,您将在每次向服务器发出请求时将其传递。
定义一个包含要执行的 SQL 查询的字符串–例如
String query = "SELECT * FROM home";
调用 FlightSqlClient.execute 方法,并传入 SQL 查询和 CredentialCallOption。
如果成功,FlightSqlClient.execute 方法会响应一个 FlightInfo 对象,其中包含元数据和一个 endpoints: [...] 列表。每个端点都包含以下内容
- 可以从中检索数据的地址列表。
- 一个
ticket 值,用于标识要检索的数据。
从响应中提取 ticket。
检索和处理 Arrow 数据
在 FlightQuery.main() 方法中,执行以下操作以检索 FlightInfo 响应中描述的数据流
使用ticket和 CredentialCallOption 调用 FlightSqlClient.getStream 方法来获取 Arrow 流。
调用 FlightStream.getRoot 方法以获取流中的当前向量数据。
处理数据并处理异常。该示例将向量数据转换为制表符分隔值,并将结果打印到 System.out。
有关使用 Java 处理 Arrow 数据的更多示例,请参阅 Apache Arrow Java Cookbook。
最后,关闭流和客户端。
运行应用程序
请按照以下步骤使用 Docker 构建和运行应用程序
将 Dockerfile 和 pom.xml 复制到您的项目根目录。
在项目根目录中打开终端。
在终端中,运行 docker build 命令,并传入服务器凭证的 --build-arg 标志
docker build \
--build-arg DATABASE_NAME=INFLUX_DATABASE \
--build-arg HOST=localhost:8181\
--build-arg TOKEN=INFLUX_TOKEN \
-t javaflight .
该命令构建一个名为 javaflight 的 Docker 映像。
要在新的 Docker 容器中运行应用程序,请输入以下命令
输出是以 TSV 格式的查询数据。
排查 Arrow Flight 请求
有关 Arrow Flight 错误响应代码的列表,请参阅 Arrow Flight RPC 文档。
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。