Maven dependencies:

<!-- parquet-column provides the schema builder and example Group classes;
     parquet-hadoop provides ParquetWriter/ParquetFileReader and the example write/read support;
     hadoop-client supplies org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.Path
     (the Hadoop version below is only an example). -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
</dependency>

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParquetExample {

    // Write a small Parquet file containing two "student" records.
    private static void createParquetFile(String filePath) throws IOException {
        MessageType schema = Types.buildMessage()
                .addFields(
                        Types.required(PrimitiveType.PrimitiveTypeName.BINARY).named("name"),
                        Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("age"),
                        Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("score"))
                .named("student");

        // Every field in the schema is required, so each record must set name, age and score.
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        List<Group> groups = new ArrayList<>();
        groups.add(groupFactory.newGroup()
                .append("name", "Alice")
                .append("age", 18)
                .append("score", 90));
        groups.add(groupFactory.newGroup()
                .append("name", "Bob")
                .append("age", 20)
                .append("score", 85));

        Configuration conf = new Configuration();
        HadoopOutputFile file = HadoopOutputFile.fromPath(new Path(filePath), conf);

        // ExampleParquetWriter wires GroupWriteSupport to the schema for us.
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
                .withConf(conf)
                .withType(schema)
                .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                .build()) {
            for (Group group : groups) {
                writer.write(group);
            }
        }
    }

    // Read the file back and print every record that matches a simple equality filter.
    private static void queryParquetFile(String filePath) throws IOException {
        Configuration conf = new Configuration();
        HadoopInputFile file = HadoopInputFile.fromPath(new Path(filePath), conf);

        // Filter condition: column name -> expected value (INT32 columns in this example).
        Map<String, Object> query = new HashMap<>();
        query.put("age", 18);

        try (ParquetFileReader fileReader = ParquetFileReader.open(file)) {
            ParquetMetadata footer = fileReader.getFooter();
            MessageType schema = footer.getFileMetaData().getSchema();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);

            // Iterate over the row groups and reassemble records with the example Group converter.
            PageReadStore pages;
            while ((pages = fileReader.readNextRowGroup()) != null) {
                RecordReader<Group> recordReader =
                        columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                for (long row = 0; row < pages.getRowCount(); row++) {
                    Group record = recordReader.read();
                    boolean matched = true;
                    for (Map.Entry<String, Object> entry : query.entrySet()) {
                        // getInteger covers the INT32 columns used here; other types would
                        // use getString, getLong, getDouble, etc.
                        Object actualValue = record.getInteger(entry.getKey(), 0);
                        if (!entry.getValue().equals(actualValue)) {
                            matched = false;
                            break;
                        }
                    }
                    if (matched) {
                        System.out.println(record);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        String filePath = "path/to/parquet/file";
        try {
            createParquetFile(filePath);
            queryParquetFile(filePath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
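For a simple full scan, the low-level ColumnIOFactory/RecordReader assembly above can also be replaced by parquet-hadoop's higher-level ParquetReader with GroupReadSupport. A minimal sketch, assuming the same "student" schema written above (the helper name printStudentsWithAge is just for illustration):

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import java.io.IOException;

private static void printStudentsWithAge(String filePath, int age) throws IOException {
    // GroupReadSupport materializes each row as the same example Group type used when writing.
    try (ParquetReader<Group> reader =
                 ParquetReader.builder(new GroupReadSupport(), new Path(filePath)).build()) {
        Group record;
        while ((record = reader.read()) != null) {
            if (record.getInteger("age", 0) == age) {
                System.out.println(record);
            }
        }
    }
}

The trade-off is that ParquetReader hides row-group boundaries, which is usually fine unless per-row-group metadata is needed.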

