From 365937168526ad9a6f07ed4cd854907fb2af7e98 Mon Sep 17 00:00:00 2001
From: bastien
Date: Tue, 27 Nov 2018 11:38:13 +0100
Subject: [PATCH] Implementing secondary output

---
 .../org/antvoice/beam/BigQueryWriter.java     | 37 +++++++-------
 .../antvoice/beam/GoogleStorageWriter.java    | 43 ++++++++++++++++
 .../antvoice/beam/PubsubMessageProcessor.java | 51 +++++--------------
 .../org/antvoice/beam/StreamerOptions.java    | 13 ++++-
 .../org/antvoice/beam/StreamerRunner.java     | 24 ++++++---
 .../antvoice/beam/entities/BigQueryRow.java   | 33 ++++++++++++
 .../beam/formatter/FormatterFactory.java      |  3 +-
 .../beam/formatter/JsonRowFormatter.java      |  7 +--
 .../java/org/antvoice/beam/helper/Zip.java    | 30 +++++++++++
 9 files changed, 174 insertions(+), 67 deletions(-)
 create mode 100644 src/main/java/org/antvoice/beam/GoogleStorageWriter.java
 create mode 100644 src/main/java/org/antvoice/beam/entities/BigQueryRow.java
 create mode 100644 src/main/java/org/antvoice/beam/helper/Zip.java

diff --git a/src/main/java/org/antvoice/beam/BigQueryWriter.java b/src/main/java/org/antvoice/beam/BigQueryWriter.java
index 3495ab7..8657621 100644
--- a/src/main/java/org/antvoice/beam/BigQueryWriter.java
+++ b/src/main/java/org/antvoice/beam/BigQueryWriter.java
@@ -2,35 +2,38 @@
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.protobuf.ByteString;
+import org.antvoice.beam.entities.BigQueryRow;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.AbstractMap;
 
+public class BigQueryWriter extends PTransform<PCollection<BigQueryRow>, PDone> {
 
-public class BigQueryWriter extends PTransform<PCollection<AbstractMap.SimpleImmutableEntry<String, String>>, PDone> {
+    private String _project;
+    private SerializableFunction<String, TableRow> _formatter;
 
-    private SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> _formatter;
-
-    public BigQueryWriter(SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> formatter) {
+    public BigQueryWriter(String project, SerializableFunction<String, TableRow> formatter) {
+        _project = project;
         _formatter = formatter;
     }
 
-    private static class DestinationComputer extends DynamicDestinations<AbstractMap.SimpleImmutableEntry<String, String>, String>{
+    private static class DestinationComputer
+            extends DynamicDestinations<BigQueryRow, String>{
+
+        private String _project;
+
+        public DestinationComputer(String project) {
+            _project = project;
+        }
 
         @Override
-        public String getDestination(ValueInSingleWindow<AbstractMap.SimpleImmutableEntry<String, String>> element) {
-            return element.getValue().getKey();
+        public String getDestination(ValueInSingleWindow<BigQueryRow> element) {
+            return _project + ":" + element.getValue().getDataset() + "."
+                    + element.getValue().getTable() ;
         }
 
         @Override
@@ -45,12 +48,12 @@ public TableSchema getSchema(String destination) {
     }
 
     @Override
-    public PDone expand(PCollection<AbstractMap.SimpleImmutableEntry<String, String>> input) {
-        input.apply(BigQueryIO.<AbstractMap.SimpleImmutableEntry<String, String>>write()
-                .to(new DestinationComputer())
+    public PDone expand(PCollection<BigQueryRow> input) {
+        input.apply(BigQueryIO.<BigQueryRow>write()
+                .to(new DestinationComputer(_project))
                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-                .withFormatFunction(_formatter));
+                .withFormatFunction(row -> _formatter.apply(row.getRow())));
 
         return PDone.in(input.getPipeline());
     }
diff --git a/src/main/java/org/antvoice/beam/GoogleStorageWriter.java b/src/main/java/org/antvoice/beam/GoogleStorageWriter.java
new file mode 100644
index 0000000..08fe56d
--- /dev/null
+++ b/src/main/java/org/antvoice/beam/GoogleStorageWriter.java
@@ -0,0 +1,43 @@
+package org.antvoice.beam;
+
+import org.antvoice.beam.entities.BigQueryRow;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+public class GoogleStorageWriter
+        extends PTransform<PCollection<BigQueryRow>, PDone> {
+
+    private String _dumpLocation;
+    private long _currentTimeMillis;
+
+    public GoogleStorageWriter(String dumpLocation, long currentTimeMillis) {
+        _dumpLocation = dumpLocation;
+        this._currentTimeMillis = currentTimeMillis;
+    }
+
+    @Override
+    public PDone expand(PCollection<BigQueryRow> input) {
+        input.apply(
+                FileIO
+                        .<String, BigQueryRow>writeDynamic()
+                        .by((SerializableFunction<BigQueryRow, String>)
+                                row -> String.format("%s/%s/", row.getDataset(), row.getTable()))
+                        .via(Contextful.fn((SerializableFunction<BigQueryRow, String>) BigQueryRow::getRow),
+                                TextIO.sink())
+                        .to(_dumpLocation)
+                        .withNaming(partition -> FileIO.Write.defaultNaming(partition, ""))
+                        .withDestinationCoder(StringUtf8Coder.of())
+                        .withNumShards(10));
+
+        return PDone.in(input.getPipeline());
+    }
+}
diff --git a/src/main/java/org/antvoice/beam/PubsubMessageProcessor.java b/src/main/java/org/antvoice/beam/PubsubMessageProcessor.java
index f3c6888..216061e 100644
--- a/src/main/java/org/antvoice/beam/PubsubMessageProcessor.java
+++ b/src/main/java/org/antvoice/beam/PubsubMessageProcessor.java
@@ -1,6 +1,7 @@
 package org.antvoice.beam;
 
-import com.google.protobuf.ByteString;
+import org.antvoice.beam.entities.BigQueryRow;
+import org.antvoice.beam.helper.Zip;
 import org.antvoice.beam.metrics.CounterProvider;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -10,29 +11,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.AbstractMap;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
 
-public class PubsubMessageProcessor extends PTransform<PCollection<PubsubMessage>, PCollection<AbstractMap.SimpleImmutableEntry<String, String>>> {
+public class PubsubMessageProcessor
+        extends PTransform<PCollection<PubsubMessage>, PCollection<BigQueryRow>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(PubsubMessageProcessor.class);
-    private String _project;
 
-    public PubsubMessageProcessor(String project) {
-        _project = project;
+    public PubsubMessageProcessor() {
     }
 
-    public static class ExtractMessagesFn extends DoFn<PubsubMessage, AbstractMap.SimpleImmutableEntry<String, String>> {
-        private String _project;
+    public static class ExtractMessagesFn extends DoFn<PubsubMessage, BigQueryRow> {
         private final CounterProvider _counterProvider = new CounterProvider();
 
-        public ExtractMessagesFn(String project) {
-            _project = project;
+        public ExtractMessagesFn() {
         }
 
         @ProcessElement
@@ -50,7 +44,8 @@ public void processElement(ProcessContext c) {
             String dataset = metadata.get("dataset");
             String table = metadata.get("table");
 
-            _counterProvider.getCounter("PubsubMessageProcessor", dataset + "." + table).inc(c.element().getPayload().length);
+            _counterProvider.getCounter("PubsubMessageProcessor", dataset + "." + table)
+                    .inc(c.element().getPayload().length);
 
             String message;
             if(metadata.containsKey("compression")){
@@ -61,7 +56,7 @@ public void processElement(ProcessContext c) {
             }
 
             try {
-                message = UnzipMessage(c);
+                message = Zip.Unzip(c.element().getPayload());
             } catch (IOException e) {
                 LOG.error("Cannot uncompress gzip message", e);
                 return;
@@ -75,33 +70,13 @@ public void processElement(ProcessContext c) {
                 }
             }
 
-            AbstractMap.SimpleImmutableEntry<String, String> row = new AbstractMap.SimpleImmutableEntry<>(_project + ":" + dataset + "." + table, message);
-            c.output(row);
-        }
-
-        private String UnzipMessage(ProcessContext c) throws IOException {
-            String message;ByteArrayInputStream bytein = new ByteArrayInputStream(c.element().getPayload());
-            GZIPInputStream gzip = new GZIPInputStream(bytein);
-            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
-
-            int res = 0;
-            byte buf[] = new byte[1024];
-            while (res >= 0) {
-                res = gzip.read(buf, 0, buf.length);
-                if (res > 0) {
-                    byteout.write(buf, 0, res);
-                }
-            }
-
-            byte uncompressed[] = byteout.toByteArray();
-            message = new String(uncompressed, "UTF-8");
-            return message;
+            c.output(new BigQueryRow(dataset, table, message));
         }
     }
 
     @Override
-    public PCollection<AbstractMap.SimpleImmutableEntry<String, String>> expand(PCollection<PubsubMessage> input) {
-        return input.apply(ParDo.of(new ExtractMessagesFn(_project)));
+    public PCollection<BigQueryRow> expand(PCollection<PubsubMessage> input) {
+        return input.apply(ParDo.of(new ExtractMessagesFn()));
     }
 }
diff --git a/src/main/java/org/antvoice/beam/StreamerOptions.java b/src/main/java/org/antvoice/beam/StreamerOptions.java
index 5bb8e55..4137447 100644
--- a/src/main/java/org/antvoice/beam/StreamerOptions.java
+++ b/src/main/java/org/antvoice/beam/StreamerOptions.java
@@ -3,7 +3,6 @@
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.Validation;
 
 public interface StreamerOptions extends GcpOptions {
@@ -15,7 +14,7 @@ public interface StreamerOptions extends GcpOptions {
     String getSubscription();
     void setSubscription(String value);
 
-    @Description("Serializtion type. Uses uncompressed JSON as a default. No other value at the moment")
+    @Description("Serialization type. Uses uncompressed JSON as a default. No other value at the moment")
     String getFormat();
     void setFormat(String value);
 
@@ -28,4 +27,14 @@ public interface StreamerOptions extends GcpOptions {
     @Default.Boolean(false)
     Boolean getAttached();
     void setAttached(Boolean value);
+
+    @Description("Used to redirect the data to Google Storage instead of BigQuery")
+    @Default.Boolean(false)
+    Boolean getDumpGoogleStorage();
+    void setDumpGoogleStorage(Boolean isDumpGoogleStorage);
+
+    @Description("The GS directory where the data will be dumped (they will dump in gs://dumpLocation/dataset/table/*)")
+    @Default.String("")
+    String getDumpLocation();
+    void setDumpLocation(String gsLocation);
 }
diff --git a/src/main/java/org/antvoice/beam/StreamerRunner.java b/src/main/java/org/antvoice/beam/StreamerRunner.java
index 94f2649..bf08f8d 100644
--- a/src/main/java/org/antvoice/beam/StreamerRunner.java
+++ b/src/main/java/org/antvoice/beam/StreamerRunner.java
@@ -17,11 +17,13 @@
  */
 package org.antvoice.beam;
 
+import org.antvoice.beam.entities.BigQueryRow;
 import org.antvoice.beam.formatter.FormatterFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,19 +39,28 @@ * --runner=DataflowRunner
  */
 public class StreamerRunner {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamerRunner.class);
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         StreamerOptions options = PipelineOptionsFactory
                 .fromArgs(args)
                 .withValidation()
                 .as(StreamerOptions.class);
 
         Pipeline p = Pipeline.create(options);
+        long currentTimeMillis = System.currentTimeMillis();
 
-        p.apply("ReadLines", new PubsubReader(options.getTopic(), options.getSubscription()))
-                .apply("ExtractMessages", new PubsubMessageProcessor(options.getProject()))
-                .apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
-                .apply("WriteBq", new BigQueryWriter(new FormatterFactory(options.getFormat()).getFormatter()));
+        PCollection<BigQueryRow> rows = p.apply("ReadLines", new PubsubReader(options.getTopic(), options.getSubscription()))
+                .apply("ExtractMessages", new PubsubMessageProcessor())
+                .apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))));
+
+        if(options.getDumpGoogleStorage()) {
+            if(options.getDumpLocation().isEmpty() || !options.getDumpLocation().startsWith("gs://")){
+                throw new Exception("Invalid dumpLocation parameter. Must be a valid Google Storage URI.");
+            }
+            rows.apply("WriteGS", new GoogleStorageWriter(options.getDumpLocation(), currentTimeMillis));
+        }else {
+            rows.apply("WriteBq", new BigQueryWriter(options.getProject(),
+                    new FormatterFactory(options.getFormat()).getFormatter()));
+        }
 
         if(options.getAttached()){
             p.run().waitUntilFinish();
@@ -58,3 +69,4 @@ public static void main(String[] args) {
         }
     }
 }
+
diff --git a/src/main/java/org/antvoice/beam/entities/BigQueryRow.java b/src/main/java/org/antvoice/beam/entities/BigQueryRow.java
new file mode 100644
index 0000000..1c06854
--- /dev/null
+++ b/src/main/java/org/antvoice/beam/entities/BigQueryRow.java
@@ -0,0 +1,33 @@
+package org.antvoice.beam.entities;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+@DefaultCoder(AvroCoder.class)
+public class BigQueryRow {
+    private String dataset;
+    private String table;
+    private String row;
+
+    public BigQueryRow(){
+
+    }
+
+    public BigQueryRow(String dataset, String table, String row) {
+        this.dataset = dataset;
+        this.table = table;
+        this.row = row;
+    }
+
+    public String getDataset() {
+        return dataset;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public String getRow() {
+        return row;
+    }
+}
diff --git a/src/main/java/org/antvoice/beam/formatter/FormatterFactory.java b/src/main/java/org/antvoice/beam/formatter/FormatterFactory.java
index be93a7f..84f62f5 100644
--- a/src/main/java/org/antvoice/beam/formatter/FormatterFactory.java
+++ b/src/main/java/org/antvoice/beam/formatter/FormatterFactory.java
@@ -2,6 +2,7 @@
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.protobuf.ByteString;
+import org.antvoice.beam.entities.BigQueryRow;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
 import java.util.AbstractMap;
@@ -14,7 +15,7 @@ public FormatterFactory(String format) {
         _format = format;
     }
 
-    public SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> getFormatter() {
+    public SerializableFunction<String, TableRow> getFormatter() {
         // Only known format at the moment
         return new JsonRowFormatter();
     }
diff --git a/src/main/java/org/antvoice/beam/formatter/JsonRowFormatter.java b/src/main/java/org/antvoice/beam/formatter/JsonRowFormatter.java
index 0daea77..8b5e8fb 100644
--- a/src/main/java/org/antvoice/beam/formatter/JsonRowFormatter.java
+++ b/src/main/java/org/antvoice/beam/formatter/JsonRowFormatter.java
@@ -1,6 +1,7 @@
 package org.antvoice.beam.formatter;
 
 import com.google.api.services.bigquery.model.TableRow;
+import org.antvoice.beam.entities.BigQueryRow;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -13,7 +14,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class JsonRowFormatter implements SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> {
+public class JsonRowFormatter implements SerializableFunction<String, TableRow> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JsonRowFormatter.class);
 
@@ -56,9 +57,9 @@ private TableRow convertRow(JSONObject object) {
     }
 
     @Override
-    public TableRow apply(AbstractMap.SimpleImmutableEntry<String, String> input) {
+    public TableRow apply(String input) {
         try {
-            JSONObject obj = new JSONObject(input.getValue());
+            JSONObject obj = new JSONObject(input);
 
             TableRow row = convertRow(obj);
 
diff --git a/src/main/java/org/antvoice/beam/helper/Zip.java b/src/main/java/org/antvoice/beam/helper/Zip.java
new file mode 100644
index 0000000..746a6e1
--- /dev/null
+++ b/src/main/java/org/antvoice/beam/helper/Zip.java
@@ -0,0 +1,30 @@
+package org.antvoice.beam.helper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+
+public class Zip {
+
+    public static String Unzip(byte[] content) throws IOException {
+        String message;
+        ByteArrayInputStream bytein = new ByteArrayInputStream(content);
+        GZIPInputStream gzip = new GZIPInputStream(bytein);
+        ByteArrayOutputStream byteout = new ByteArrayOutputStream();
+
+        int res = 0;
+        byte buf[] = new byte[1024];
+        while (res >= 0) {
+            res = gzip.read(buf, 0, buf.length);
+            if (res > 0) {
+                byteout.write(buf, 0, res);
+            }
+        }
+
+        byte uncompressed[] = byteout.toByteArray();
+        message = new String(uncompressed, "UTF-8");
+        return message;
+    }
+
+}
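
Note (illustrative, not part of the patch): a minimal sketch of how the new pieces fit together, assuming a publisher that gzips its payload before setting the "compression" attribute on the Pub/Sub message. The dataset/table names "analytics" and "events" and the sample JSON are hypothetical. It compresses a payload, recovers it with Zip.Unzip, and wraps it in a BigQueryRow the way ExtractMessagesFn now does.

import org.antvoice.beam.entities.BigQueryRow;
import org.antvoice.beam.helper.Zip;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class ZipRoundTripExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload; real messages carry one JSON object per row.
        String json = "{\"userId\": 42, \"event\": \"click\"}";

        // Compress the payload the way a publisher would before setting the
        // "compression" attribute on the Pub/Sub message.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        }

        // Zip.Unzip restores the original string from the gzipped bytes.
        String restored = Zip.Unzip(compressed.toByteArray());

        // ExtractMessagesFn would then emit a BigQueryRow for routing;
        // "analytics" and "events" stand in for real dataset/table attributes.
        BigQueryRow row = new BigQueryRow("analytics", "events", restored);
        System.out.println(row.getDataset() + "." + row.getTable() + " -> " + row.getRow());
    }
}

Since Beam's PipelineOptionsFactory derives flag names from the StreamerOptions getters, the secondary output would presumably be selected with something like --dumpGoogleStorage=true --dumpLocation=gs://<bucket>/<path>; without those flags the pipeline keeps writing to BigQuery as before.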