<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.12.1</version>
</dependency>
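<!-- The writer/reader classes imported below (ParquetWriter, GroupReadSupport,
     HadoopInputFile, ...) live in parquet-hadoop rather than parquet-column, and
     the Hadoop Configuration/Path types come from a Hadoop client artifact, so
     dependencies along these lines are likely needed as well (versions are
     illustrative): -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
</dependency>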
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private static void createParquetFile(String filePath) throws IOException {
    // Schema with three required columns: name (binary/string), age and score (int32).
    MessageType schema = Types.buildMessage()
            .addFields(
                    Types.required(PrimitiveType.PrimitiveTypeName.BINARY).named("name"),
                    Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("age"),
                    Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("score"))
            .named("student");

    // Build two example records; every required field must be populated.
    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
    List<Group> groups = new ArrayList<>();
    groups.add(groupFactory.newGroup()
            .append("name", "Alice")
            .append("age", 18)
            .append("score", 90));
    groups.add(groupFactory.newGroup()
            .append("name", "Bob")
            .append("age", 20)
            .append("score", 85));

    // Write the records with the example Group model; ExampleParquetWriter
    // configures GroupWriteSupport with the schema internally.
    Configuration conf = new Configuration();
    HadoopOutputFile file = HadoopOutputFile.fromPath(new Path(filePath), conf);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
            .withConf(conf)
            .withType(schema)
            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
            .build()) {
        for (Group group : groups) {
            writer.write(group);
        }
    }
}
private static void queryParquetFile(String filePath) throws IOException {
    Configuration conf = new Configuration();
    HadoopInputFile file = HadoopInputFile.fromPath(new Path(filePath), conf);

    // Read the schema from the file footer.
    try (ParquetFileReader fileReader = ParquetFileReader.open(file)) {
        MessageType schema = fileReader.getFooter().getFileMetaData().getSchema();
        System.out.println("Schema: " + schema);
    }

    // Simple equality query: keep records whose "age" column equals 18.
    Map<String, Object> query = new HashMap<>();
    query.put("age", 18);

    // Scan the file record by record with the example Group reader.
    try (ParquetReader<Group> reader =
                 ParquetReader.builder(new GroupReadSupport(), new Path(filePath))
                         .withConf(conf)
                         .build()) {
        Group record;
        while ((record = reader.read()) != null) {
            boolean matched = true;
            for (Map.Entry<String, Object> entry : query.entrySet()) {
                Object actualValue = record.getInteger(entry.getKey(), 0);
                if (!entry.getValue().equals(actualValue)) {
                    matched = false;
                    break;
                }
            }
            if (matched) {
                System.out.println(record);
            }
        }
    }
}
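// Optional: instead of filtering rows in application code (as queryParquetFile does
// above), the predicate can be pushed down into the reader with the filter2 API that
// ships with parquet-hadoop, so row groups and pages that cannot match are skipped.
// A minimal sketch under that assumption; queryWithPushdown is a hypothetical helper
// and would need these additional imports:
// import org.apache.parquet.filter2.compat.FilterCompat;
// import org.apache.parquet.filter2.predicate.FilterApi;
// import org.apache.parquet.filter2.predicate.FilterPredicate;
private static void queryWithPushdown(String filePath) throws IOException {
    // Equality predicate on the int32 "age" column, evaluated inside the reader.
    FilterPredicate agePredicate = FilterApi.eq(FilterApi.intColumn("age"), 18);
    try (ParquetReader<Group> reader =
                 ParquetReader.builder(new GroupReadSupport(), new Path(filePath))
                         .withFilter(FilterCompat.get(agePredicate))
                         .build()) {
        Group record;
        while ((record = reader.read()) != null) {
            System.out.println(record); // only records matching age == 18 arrive here
        }
    }
}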
public static void main(String[] args) {
    String filePath = "path/to/parquet/file";
    try {
        createParquetFile(filePath);
        queryParquetFile(filePath);
    } catch (IOException e) {
        e.printStackTrace();
    }
}