/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.commoncrawl.spark.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.commoncrawl.spark.util.WarcFileOutputFormat;
import org.jets3t.service.S3Service;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.utils.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

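/**
 * Export WARC records into new WARC files. The records to export are selected either by a
 * SQL query on the columnar URL index table or read from a CSV file listing WARC file name,
 * record offset and record length. Each record is fetched from the `commoncrawl' S3 bucket
 * by a ranged request and written out via {@link WarcFileOutputFormat}.
 */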
public class CCIndexWarcExport extends CCIndexExport {

    private static final Logger LOG = LoggerFactory.getLogger(CCIndexWarcExport.class);

    protected static final String COMMON_CRAWL_BUCKET = "commoncrawl";

    protected int numRecordsPerWarcFile = 10000;
    protected String warcPrefix = "COMMON-CRAWL-EXPORT";
    protected String warcCreator;
    protected String warcOperator;
    protected String csvQueryResult;

    protected CCIndexWarcExport() {
        super();
    }

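    /**
     * Define the WARC-export specific command-line options (--csv, --numOutputPartitions,
     * --numRecordsPerWarcFile, --warcPrefix, --warcCreator, --warcOperator) and adjust the
     * description of the inherited --query option.
     */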
    @Override
    protected void addOptions() {
        options.getOption("query")
                .setDescription("SQL query to select rows. Note: the result is required to contain the columns `url', "
                        + "`warc_filename', `warc_record_offset' and `warc_record_length'; make sure they're SELECTed.");
        options.addOption(new Option(null, "csv", true, "CSV file to load WARC records by filename, offset and length. "
                + "The CSV file must have column headers and the input columns `url', `warc_filename', "
                + "`warc_record_offset' and `warc_record_length' are mandatory, see also option --query."));

        options.addOption(
                new Option(null, "numOutputPartitions", true, "repartition data to have <n> output partitions"));
        options.addOption(new Option(null, "numRecordsPerWarcFile", true, "allow max. <n> records per WARC file. "
                + "This will repartition the data so that on average one partition contains not more than <n> rows. "
                + "Default is 10000; set to -1 to disable this option."
                + "\nNote: if both --numOutputPartitions and --numRecordsPerWarcFile are used, the former defines "
                + "the minimum number of partitions, the latter the maximum partition size."));

        options.addOption(new Option(null, "warcPrefix", true, "WARC filename prefix"));
        options.addOption(new Option(null, "warcCreator", true, "(WARC info record) creator of WARC export"));
        options.addOption(new Option(null, "warcOperator", true, "(WARC info record) operator of WARC export"));
    }

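    /**
     * Read the WARC-export specific options; all shared options and the remaining
     * arguments are handled by the parent's parseOptions().
     */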
    @Override
    protected int parseOptions(String[] args, List<String> arguments) {

        CommandLineParser parser = new PosixParser();
        CommandLine cli;

        try {
            cli = parser.parse(options, args);
            if (cli.hasOption("numRecordsPerWarcFile")) {
                numRecordsPerWarcFile = Integer.parseInt(cli.getOptionValue("numRecordsPerWarcFile"));
            }
            if (cli.hasOption("warcPrefix")) {
                warcPrefix = cli.getOptionValue("warcPrefix");
            }
            if (cli.hasOption("warcCreator")) {
                warcCreator = cli.getOptionValue("warcCreator");
            }
            if (cli.hasOption("warcOperator")) {
                warcOperator = cli.getOptionValue("warcOperator");
            }
            if (cli.hasOption("csv")) {
                if (cli.hasOption("query")) {
                    LOG.error("Options --csv and --query are mutually exclusive.");
                    return 1;
                }
                csvQueryResult = cli.getOptionValue("csv");
            }
        } catch (ParseException e) {
            // ignore, handled in call to super.parseOptions(...)
        }

        return super.parseOptions(args, arguments);
    }

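    /**
     * Fetch a single WARC record from the Common Crawl bucket via a ranged S3 request,
     * using the record offset and length taken from the URL index.
     *
     * @return the raw bytes of the WARC record, or null if the fetch failed
     */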
    protected static byte[] getCCWarcRecord(S3Service s3, String filename, int offset, int length) {
        long from = offset;
        long to = offset + length - 1;
        S3Object s3obj = null;
        try {
            s3obj = s3.getObject(COMMON_CRAWL_BUCKET, filename, null, null, null, null, from, to);
            byte[] bytes = ServiceUtils.readInputStreamToBytes(s3obj.getDataInputStream());
            s3obj.closeDataInputStream();
            return bytes;
        } catch (IOException | ServiceException e) {
            LOG.error("Failed to fetch s3://{}/{} (bytes = {}-{}): {}", COMMON_CRAWL_BUCKET, filename, from, to, e);
        } finally {
            if (s3obj != null) {
                try {
                    s3obj.closeDataInputStream();
                } catch (IOException e) {
                    // ignore errors while closing the stream
                }
            }
        }
        return null;
    }

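    /**
     * Run the export: select the rows to export (from a CSV file or by SQL query on the
     * index table), repartition them, fetch every selected WARC record from S3 and write
     * the records as WARC files below the output path.
     */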
    @Override
    protected int run(String tablePath, String outputPath) {

        Dataset<Row> sqlDF;
        if (csvQueryResult != null) {
            // ************* START GS TEAM CHANGES 1 ************ //
            // Read the CSV without a header row and with inferred column types. Note: this
            // deviates from the --csv option description, which expects a header line; the
            // columns are addressed positionally (_c0 = url, _c1 = warc_filename,
            // _c2 = warc_record_offset, _c3 = warc_record_length).
            //sqlDF = sparkSession.read().format("csv").option("header", true).option("inferSchema", true)
            sqlDF = sparkSession.read().format("csv").option("header", false).option("inferSchema", true)
                    .load(csvQueryResult);
            // ************** END GS TEAM CHANGES 1 ************ //
        } else {
            loadTable(sparkSession, tablePath, tableName);
            sqlDF = executeQuery(sparkSession, sqlQuery);
        }
        sqlDF.persist();

        long numRows = sqlDF.count();
        LOG.info("Number of records/rows matched by query: {}", numRows);
        if (numRecordsPerWarcFile > 0) {
            // number of partitions needed so that one partition (= one WARC file)
            // holds at most numRecordsPerWarcFile records
            int n = 1 + (int) (numRows / numRecordsPerWarcFile);
            if (n > numOutputPartitions) {
                numOutputPartitions = n;
                LOG.info("Distributing {} records to {} output partitions (max. {} records per WARC file)", numRows,
                        numOutputPartitions, numRecordsPerWarcFile);
            } else {
                // more output partitions requested
                LOG.info("Distributing {} records to {} output partitions", numRows, numOutputPartitions);
            }
        }
        if (numOutputPartitions > 0) {
            LOG.info("Repartitioning data to {} output partitions", numOutputPartitions);
            sqlDF = sqlDF.repartition(numOutputPartitions);
        }

        // ************* START GS TEAM CHANGES 2 ************ //
        // A headerless CSV yields positional column names (_c0 ... _c3), whereas the
        // result of a --query carries the original column names (see option description).
        JavaRDD<Row> rdd = (csvQueryResult != null
                ? sqlDF.select("_c0", "_c1", "_c2", "_c3")
                : sqlDF.select("url", "warc_filename", "warc_record_offset", "warc_record_length"))
                .rdd().toJavaRDD();
        // ************** END GS TEAM CHANGES 2 ************* //

        // fetch WARC content from s3://commoncrawl/ and map to paired RDD
        // <Text url, byte[] warc_record>
        JavaPairRDD<Text,byte[]> res = rdd.mapPartitionsToPair((Iterator<Row> rows) -> {
            ArrayList<scala.Tuple2<Text, byte[]>> reslist = new ArrayList<>();
            // one S3 client per partition, reused for all records of the partition
            S3Service s3 = new RestS3Service(null);
            while (rows.hasNext()) {
                Row row = rows.next();
                String url = row.getString(0);
                String filename = row.getString(1);
                int offset = row.getInt(2);
                int length = row.getInt(3);
                LOG.info("Fetching WARC record {} {} {} for {}", filename, offset, length, url);
                byte[] bytes = getCCWarcRecord(s3, filename, offset, length);
                if (bytes != null) {
                    reslist.add(new scala.Tuple2<Text, byte[]>(new Text(url), bytes));
                }
            }
            return reslist.iterator();
        }, false);

        // save data as WARC files
        Configuration conf = sparkSession.sparkContext().hadoopConfiguration();
        conf.set("warc.export.prefix", warcPrefix);
        if (warcCreator != null) {
            conf.set("warc.export.creator", warcCreator);
        }
        if (warcOperator != null) {
            conf.set("warc.export.operator", warcOperator);
        }
        conf.set("warc.export.software",
                getClass().getCanonicalName() + " (Spark " + sparkSession.sparkContext().version() + ")");
        conf.set("warc.export.description", "Common Crawl WARC export from " + tablePath + " for query: " + sqlQuery);

        res.saveAsNewAPIHadoopFile(outputPath, String.class, byte[].class, WarcFileOutputFormat.class, conf);
        LOG.info("Wrote {} WARC files with {} records total", numOutputPartitions, numRows);
        sparkStats.report();

        return 0;
    }

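    /**
     * Entry point. A sketch of a possible invocation (assuming, as for the other
     * CCIndexExport jobs, that the two remaining positional arguments are the index
     * table path and the output directory; adjust paths and jar name to your setup):
     *
     * <pre>
     * spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR \
     *     --csv selected_records.csv --warcPrefix MY-EXPORT \
     *     &lt;tablePath&gt; &lt;outputDirectory&gt;
     * </pre>
     */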
    public static void main(String[] args) throws IOException {
        CCIndexExport job = new CCIndexWarcExport();
        int success = job.run(args);
        System.exit(success);
    }

}