source: gs3-extensions/maori-lang-detection/hdfs-instructions/patches/CCIndexWarcExport.java@ 33524

Last change on this file since 33524 was 33524, checked in by ak19, 5 years ago
  1. Further adjustments to the documentation of what we did to get things running on the Hadoop filesystem. 2. Added all the Hadoop-related git projects (with patches), a separate copy of the patches, the config modifications and missing jar files that we needed, and the scripts we created to run on the HDFS machine and its host machine.
File size: 8.6 KB
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.commoncrawl.spark.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.commoncrawl.spark.util.WarcFileOutputFormat;
import org.jets3t.service.S3Service;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.utils.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CCIndexWarcExport extends CCIndexExport {

    private static final Logger LOG = LoggerFactory.getLogger(CCIndexExport.class);

    protected static final String COMMON_CRAWL_BUCKET = "commoncrawl";

    protected int numRecordsPerWarcFile = 10000;
    protected String warcPrefix = "COMMON-CRAWL-EXPORT";
    protected String warcCreator;
    protected String warcOperator;
    protected String csvQueryResult;

    protected CCIndexWarcExport() {
        super();
    }

    @Override
    protected void addOptions() {
        options.getOption("query")
                .setDescription("SQL query to select rows. Note: the result is required to contain the columns `url', "
                        + "`warc_filename', `warc_record_offset' and `warc_record_length', make sure they're SELECTed.");
        options.addOption(new Option(null, "csv", true, "CSV file to load WARC records by filename, offset and length. "
                + "The CSV file must have column headers and the input columns `url', `warc_filename', "
                + "`warc_record_offset' and `warc_record_length' are mandatory, see also option --query."));

        options.addOption(
                new Option(null, "numOutputPartitions", true, "repartition data to have <n> output partitions"));
        options.addOption(new Option(null, "numRecordsPerWarcFile", true, "allow max. <n> records per WARC file. "
                + "This will repartition the data so that in average one partition contains not more than <n> rows. "
                + "Default is 10000, set to -1 to disable this option."
                + "\nNote: if both --numOutputPartitions and --numRecordsPerWarcFile are used, the former defines "
                + "the minimum number of partitions, the latter the maximum partition size."));

        options.addOption(new Option(null, "warcPrefix", true, "WARC filename prefix"));
        options.addOption(new Option(null, "warcCreator", true, "(WARC info record) creator of WARC export"));
        options.addOption(new Option(null, "warcOperator", true, "(WARC info record) operator of WARC export"));
    }

    protected int parseOptions(String[] args, List<String> arguments) {

        CommandLineParser parser = new PosixParser();
        CommandLine cli;

        try {
            cli = parser.parse(options, args);
            if (cli.hasOption("numRecordsPerWarcFile")) {
                numRecordsPerWarcFile = Integer.parseInt(cli.getOptionValue("numRecordsPerWarcFile"));
            }
            if (cli.hasOption("warcPrefix")) {
                warcPrefix = cli.getOptionValue("warcPrefix");
            }
            if (cli.hasOption("warcCreator")) {
                warcCreator = cli.getOptionValue("warcCreator");
            }
            if (cli.hasOption("warcOperator")) {
                warcOperator = cli.getOptionValue("warcOperator");
            }
            if (cli.hasOption("csv")) {
                if (cli.hasOption("query")) {
                    LOG.error("Options --csv and --query are mutually exclusive.");
                    return 1;
                }
                csvQueryResult = cli.getOptionValue("csv");
            }
        } catch (ParseException e) {
            // ignore, handled in call to super.parseOptions(...)
        }

        return super.parseOptions(args, arguments);
    }

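    /**
     * Fetches a single WARC record from the public Common Crawl bucket via a ranged
     * S3 GET covering bytes offset .. (offset + length - 1) of the given WARC file.
     *
     * @return the raw record bytes, or null if the fetch failed
     */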
    protected static byte[] getCCWarcRecord(S3Service s3, String filename, int offset, int length) {
        long from = offset;
        long to = offset + length - 1;
        S3Object s3obj = null;
        try {
            s3obj = s3.getObject(COMMON_CRAWL_BUCKET, filename, null, null, null, null, from, to);
            byte[] bytes = ServiceUtils.readInputStreamToBytes(s3obj.getDataInputStream());
            s3obj.closeDataInputStream();
            return bytes;
        } catch (IOException | ServiceException e) {
            LOG.error("Failed to fetch s3://{}/{} (bytes = {}-{}): {}", COMMON_CRAWL_BUCKET, filename, from, to, e);
        } finally {
            if (s3obj != null) {
                try {
                    s3obj.closeDataInputStream();
                } catch (IOException e) {
                }
            }
        }
        return null;
    }

    @Override
    protected int run(String tablePath, String outputPath) {

        Dataset<Row> sqlDF;
        if (csvQueryResult != null) {
            // ************* START GS TEAM CHANGES 1 ************ //
            //sqlDF = sparkSession.read().format("csv").option("header", true).option("inferSchema", true)
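            // GS change: the input CSV is read without a header row ("header" = false),
            // so Spark auto-names the columns _c0.._c3; the select() in GS TEAM CHANGES 2
            // below therefore uses those positional names instead of the original ones.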
            sqlDF = sparkSession.read().format("csv").option("header", false).option("inferSchema", true)
                    .load(csvQueryResult);
            // ************** END GS TEAM CHANGES 1 ************ //
        } else {
            loadTable(sparkSession, tablePath, tableName);
            sqlDF = executeQuery(sparkSession, sqlQuery);
        }
        sqlDF.persist();

        long numRows = sqlDF.count();
        LOG.info("Number of records/rows matched by query: {}", numRows);
        if (numRecordsPerWarcFile > 0) {
            int n = 1 + (int) (numRows / numRecordsPerWarcFile);
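            // e.g. 25,000 matched rows at the default 10,000 records per WARC file
            // gives n = 1 + (25000 / 10000) = 3 output partitions, i.e. 3 WARC files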
            if (n > numOutputPartitions) {
                numOutputPartitions = n;
                LOG.info("Distributing {} records to {} output partitions (max. {} records per WARC file)", numRows,
                        numOutputPartitions, numRecordsPerWarcFile);
            } else {
                // more output partitions requested
                LOG.info("Distributing {} records to {} output partitions", numRows, numOutputPartitions);
            }
        }
        if (numOutputPartitions > 0) {
            LOG.info("Repartitioning data to {} output partitions", numOutputPartitions);
            sqlDF = sqlDF.repartition(numOutputPartitions);
        }

        // ************* START GS TEAM CHANGES 2 ************ //
        //JavaRDD<Row> rdd = sqlDF.select("url", "warc_filename", "warc_record_offset", "warc_record_length").rdd()
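        // GS change: with the headerless CSV the four columns only carry Spark's positional
        // names, so _c0.._c3 are selected here; the code below assumes they hold url,
        // warc_filename, warc_record_offset and warc_record_length, in that order.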
        JavaRDD<Row> rdd = sqlDF.select("_c0", "_c1", "_c2", "_c3").rdd()
                .toJavaRDD();
        // ************** END GS TEAM CHANGES 2 ************* //

        // fetch WARC content from s3://commoncrawl/ and map to paired RDD
        // <Text url, byte[] warc_record>
        JavaPairRDD<Text, byte[]> res = rdd.mapPartitionsToPair((Iterator<Row> rows) -> {
            ArrayList<scala.Tuple2<Text, byte[]>> reslist = new ArrayList<>();
            S3Service s3 = new RestS3Service(null);
            while (rows.hasNext()) {
                Row row = rows.next();
                String url = row.getString(0);
                String filename = row.getString(1);
                int offset = row.getInt(2);
                int length = row.getInt(3);
                LOG.info("Fetching WARC record {} {} {} for {}", filename, offset, length, url);
                byte[] bytes = getCCWarcRecord(s3, filename, offset, length);
                if (bytes != null) {
                    reslist.add(new scala.Tuple2<Text, byte[]>(new Text(url), bytes));
                }
            }
            return reslist.iterator();
        }, false);

        // save data as WARC files
        Configuration conf = sparkSession.sparkContext().hadoopConfiguration();
        conf.set("warc.export.prefix", warcPrefix);
        if (warcCreator != null) {
            conf.set("warc.export.creator", warcCreator);
        }
        if (warcOperator != null) {
            conf.set("warc.export.operator", warcOperator);
        }
        conf.set("warc.export.software",
                getClass().getCanonicalName() + " (Spark " + sparkSession.sparkContext().version() + ")");
        conf.set("warc.export.description", "Common Crawl WARC export from " + tablePath + " for query: " + sqlQuery);

        res.saveAsNewAPIHadoopFile(outputPath, String.class, byte[].class, WarcFileOutputFormat.class, conf);
        LOG.info("Wrote {} WARC files with {} records total", numOutputPartitions, numRows);
        sparkStats.report();

        return 0;
    }

    public static void main(String[] args) throws IOException {
        CCIndexExport job = new CCIndexWarcExport();
        int success = job.run(args);
        System.exit(success);
    }

}
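// A minimal usage sketch, not taken from this repository: the jar name, paths, Spark
// settings and prefix value below are placeholder assumptions. The option flags are the
// ones defined in addOptions() above, and the two positional arguments are the table
// path and output path passed to run().
//
//   spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport \
//       --conf spark.executor.memory=4g \
//       cc-index-table.jar \
//       --csv hdfs:///user/me/cc-index-query-result.csv \
//       --numOutputPartitions 10 \
//       --warcPrefix MAORI-LANG-EXPORT \
//       s3a://commoncrawl/cc-index/table/cc-main/warc/ \
//       hdfs:///user/me/warc-output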