source: gs3-extensions/maori-lang-detection/hdfs-cc-work/patches/GZRangeClient.java@ 33541

Last change on this file since 33541 was 33541, checked in by ak19, 5 years ago
  1. hdfs-cc-work/GS_README.txt now contains the complete instructions to use Autistici crawl to download a website (as WARC file) as well as now also the instructions to convert those WARCs to WET. 2. Moved the first part out of MoreReading/crawling-Nutch.txt. 3. Adding patched WARC-to-WET files for the gitprojects ia-web-commons and ia-hadoop-tools to successfully do the WARC-to-WET processing on WARC files generated by Austistici crawl. (Worked on Dr Bainbridge's home page site as a test. Not tried any other site yet, as I wanted to get the work flow from crawl to WET working.)
File size: 12.2 KB
Line 
1package org.archive.server;
2import java.io.ByteArrayInputStream;
3import java.io.ByteArrayOutputStream;
4import java.io.File;
5import java.io.FileOutputStream;
6import java.io.IOException;
7import java.io.InputStream;
8import java.net.URL;
9import java.net.URLConnection;
10import java.nio.charset.Charset;
11import java.text.ParseException;
12import java.util.List;
13import java.util.UUID;
14import java.util.logging.Logger;
15import java.net.URI;
16import java.net.URISyntaxException;
17
18
19import org.apache.hadoop.conf.Configuration;
20import org.apache.hadoop.fs.FSDataInputStream;
21import org.apache.hadoop.fs.FileSystem;
22import org.apache.hadoop.fs.Path;
23import org.archive.util.HMACSigner;
24import org.archive.format.gzip.GZIPFormatException;
25import org.archive.format.gzip.GZIPMemberSeries;
26import org.archive.format.gzip.GZIPMemberWriter;
27import org.archive.format.gzip.GZIPSeriesMember;
28import org.archive.streamcontext.SimpleStream;
29import org.archive.util.IAUtils;
30import org.archive.util.DateUtils;
31import org.archive.util.FileNameSpec;
32import com.google.common.io.ByteStreams;
33
34public class GZRangeClient {
35
36 private final static Logger LOGGER =
37 Logger.getLogger(GZRangeClient.class.getName());
38
39 private static final Charset UTF8 = Charset.forName("UTF-8");
40 private static int CR = 13;
41 private static int LF = 10;
42 private static final long DEFAULT_MAX_ARC_SIZE = 1024 * 1024 * 100;
43 private static final long DEFAULT_MAX_WARC_SIZE = 1024 * 1024 * 1024;
44
45 private File targetDir;
46 private long maxArcSize;
47 private long maxWarcSize;
48 private String timestamp14;
49 private String timestampZ;
50 private FileNameSpec warcNamer;
51 private FileNameSpec arcNamer;
52 private File currentArc;
53 private File currentArcTmp;
54
55 private String hmacName = "";
56 private String hmacSignature = "";
57
58 protected static FileSystem hdfsSys = null;
59
60 private File currentWarc;
61 private File currentWarcTmp;
62
63 private FileOutputStream currentArcOS;
64 private long currentArcSize = 0;
65 private FileOutputStream currentWarcOS;
66 private long currentWarcSize = 0;
67 private byte[] warcHeaderContents;
68 private boolean exitOnError = false;
69
70 private final static String ARC_PATTERN =
71 "filedesc://%s 0.0.0.0 %s text/plain 76\n" +
72 "1 0 InternetArchive\n" +
73 "URL IP-address Archive-date Content-type Archive-length\n\n";
74
75 private final static String WARC_PATTERN =
76 "WARC/1.0\r\n" +
77 "Content-Type: application/warc-fields\r\n" +
78 "WARC-Type: warcinfo\r\n" +
79 "WARC-Warcinfo-ID: <urn:uuid:%s>\r\n" +
80 "Content-Length: %d\r\n\r\n" +
81 "WARC-Record-ID: <urn:uuid:%s>\r\n" +
82 "WARC-Date: %s\r\n";
83
84 /*
85filedesc://IQ-125-20061126082604-03075-crawling08.us.archive.org.arc 0.0.0.0 20061126082604 text/plain 1447
861 1 InternetArchive
87URL IP-address Archive-date Content-type Archive-length
88
89
90
91WARC-Date: 2009-10-10T21:33:10Z
92WARC-Filename: LOC-WEEKLY-008-20091010213310-06162-crawling110.us.archive.org.warc.gz
93WARC-Record-ID: <urn:uuid:776a760b-f456-48f1-97e3-1d29967c75d2>
94Content-Type: application/warc-fields
95Content-Length: 599
96
97software: Heritrix/1.15.4 http://crawler.archive.org
98ip: 207.241.235.29
99hostname: crawling110.us.archive.org
100format: WARC File Format 0.17
101conformsTo: http://crawler.archive.org/warc/0.17/WARC0.17ISO.doc
102operator: Vinay Goel
103publisher: Internet Archive
104audience: Library of Congress
105isPartOf: LOC-WEEKLY-008-RECOVER
106created: 2009-09-30T12:21:49Z
107description: Library of Congress Monthly Harvest
108robots: ignore
109http-header-user-agent: Mozilla/5.0 (compatible; archive.org_bot/1.5.0 +http://www.loc.gov/minerva/crawl.html)
110http-header-from: [email protected]
111
112 */
113
114 private static String DEFAULT_WARC_PATTERN = "Software: crawl/1.0\r\n" +
115 "Format: WARC File Format 1.0\r\n" +
116 "Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n\r\n";
117 // +
118 //"publisher: Internet Archive\r\n" +
119 //"created: %s\r\n\r\n";
120
121 private static final String defaultWarcHeaderString = String.format(
122 DEFAULT_WARC_PATTERN,
123 IAUtils.COMMONS_VERSION,
124 DateUtils.getLog17Date(System.currentTimeMillis()));
125
126 private static final byte[] DEFAULT_WARC_HEADER_BYTES =
127 defaultWarcHeaderString.getBytes(UTF8);
128
129 public GZRangeClient(File targetDir, String prefix, String timestamp14)
130 throws ParseException {
131
132 this.targetDir = targetDir;
133 arcNamer = new FileNameSpec(prefix, ".arc.gz");
134 warcNamer = new FileNameSpec(prefix, ".warc.gz");
135 this.timestamp14 = timestamp14;
136 long msse = DateUtils.parse14DigitDate(timestamp14).getTime();
137 this.timestampZ = DateUtils.getLog17Date(msse);
138 maxArcSize = DEFAULT_MAX_ARC_SIZE;
139 maxWarcSize = DEFAULT_MAX_WARC_SIZE;
140 warcHeaderContents = DEFAULT_WARC_HEADER_BYTES;
141 }
142
143 public GZRangeClient(File targetDir, String prefix, String timestamp14, String hmacName, String hmacSignature)
144 throws ParseException {
145
146 this.targetDir = targetDir;
147 arcNamer = new FileNameSpec(prefix, ".arc.gz");
148 warcNamer = new FileNameSpec(prefix, ".warc.gz");
149 this.timestamp14 = timestamp14;
150 long msse = DateUtils.parse14DigitDate(timestamp14).getTime();
151 this.timestampZ = DateUtils.getLog17Date(msse);
152 maxArcSize = DEFAULT_MAX_ARC_SIZE;
153 maxWarcSize = DEFAULT_MAX_WARC_SIZE;
154 warcHeaderContents = DEFAULT_WARC_HEADER_BYTES;
155 this.hmacName = hmacName;
156 this.hmacSignature = hmacSignature;
157 }
158
159 public void finish() throws IOException {
160 closeArc();
161 closeWarc();
162 }
163
164 private long getGZLength(InputStream is)
165 throws IOException, GZIPFormatException {
166
167 SimpleStream s = new SimpleStream(is);
168 GZIPMemberSeries gzs = new GZIPMemberSeries(s,"range",0,true);
169 GZIPSeriesMember m = gzs.getNextMember();
170 m.skipMember();
171 return m.getCompressedBytesRead();
172 }
173
174 public void append(long offset, List<String> urls) throws IOException {
175 boolean isArc = false;
176 String first = urls.get(0);
177 if(first.endsWith(".arc.gz")) {
178 isArc = true;
179 } else if(first.endsWith(".warc.gz")) {
180
181 } else {
182 throw new IOException("URL (" + first +
183 ") must end with '.arc.gz' or '.warc.gz'");
184 }
185
186 HMACSigner signer = null;
187 if(hmacName != null && hmacSignature != null && !hmacName.isEmpty() && !hmacSignature.isEmpty())
188 signer = new HMACSigner(hmacSignature, hmacName);
189
190 for(String url : urls) {
191 FileBackedInputStream fbis = null;
192 InputStream is = null;
193 try {
194 if(url.startsWith("http://")) {
195 URL u = new URL(url);
196 URLConnection conn = u.openConnection();
197 conn.setRequestProperty("Range", String.format("bytes=%d-", offset));
198 if(signer != null)
199 conn.setRequestProperty("Cookie", signer.getHMacCookieStr(1000));
200 LOGGER.info(String.format("Attempting(%d) from(%s)",offset,url));
201 conn.connect();
202 is = conn.getInputStream();
203 } else if(url.startsWith("hdfs://")){
204 URI u = new URI(url);
205 //only initialize the FS once
206 if (hdfsSys == null) {
207 Configuration conf = new Configuration();
208 URI defaultURI = new URI(u.getScheme() + "://" + u.getHost() + ":"+ u.getPort() + "/");
209 hdfsSys = FileSystem.get(defaultURI, conf);
210 }
211 Path path = new Path(u.getPath());
212 FSDataInputStream fis = hdfsSys.open(path);
213 fis.seek(offset);
214 is = fis;
215 }
216 fbis = new FileBackedInputStream(is);
217 long length = getGZLength(fbis);
218 InputStream orig = fbis.getInputStream();
219 if(isArc) {
220 writeARCRecord(orig, length);
221 } else {
222 writeWARCRecord(orig, length);
223 }
224 LOGGER.info(String.format("Wrote record(%d) from(%s)",
225 offset,url));
226 return;
227 } catch (IOException e) {
228 LOGGER.warning("FAILED URL-OFFSET("+url+")(" + offset+")");
229 } catch (URISyntaxException e) {
230 LOGGER.warning("FAILED URL-OFFSET("+url+")(" + offset+")");
231 } finally {
232 if(is != null) {
233 is.close();
234 }
235 if(fbis != null) {
236 fbis.resetBacker();
237 }
238 }
239 }
240 StringBuilder sb = new StringBuilder();
241 for(String u : urls) {
242 if(sb.length() != 0) {
243 sb.append(",");
244 }
245 sb.append(u);
246 }
247 String errMsg = String.format("Unable to get offset(%d) from (%s)",
248 offset,sb.toString());
249 if(exitOnError) {
250 throw new IOException(errMsg);
251 } else {
252 LOGGER.severe(errMsg);
253 }
254 }
255
256
257
258 private String getWARCRecordID() {
259 return "urn:uuid:" + UUID.randomUUID().toString();
260 }
261 private byte[] getARCHeader(String name) {
262 return String.format(ARC_PATTERN,name,timestamp14).getBytes(UTF8);
263 }
264 private byte[] getWARCHeader(String name) {
265 String t = String.format(WARC_PATTERN,
266 timestampZ,name,getWARCRecordID(),warcHeaderContents.length + 4);
267 byte[] b = t.getBytes(UTF8);
268 ByteArrayOutputStream baos = new ByteArrayOutputStream();
269 try {
270 baos.write(b);
271 baos.write(warcHeaderContents);
272 } catch(IOException e) {
273 // not gonna happen
274 }
275 baos.write(CR);
276 baos.write(LF);
277 baos.write(CR);
278 baos.write(LF);
279 return baos.toByteArray();
280 }
281
282 private void writeWARCRecord(InputStream is, long length) throws IOException {
283 if(currentWarcSize == 0) {
284 nextWarc();
285 }
286 InputStream lis = ByteStreams.limit(is, length);
287 ByteStreams.copy(lis, currentWarcOS);
288 currentWarcSize += length;
289 if(currentWarcSize > maxWarcSize) {
290 closeWarc();
291 }
292 }
293 private void writeARCRecord(InputStream is, long length) throws IOException {
294 if(currentArcSize == 0) {
295 nextArc();
296 }
297 InputStream lis = ByteStreams.limit(is, length);
298 ByteStreams.copy(lis, currentArcOS);
299 currentArcSize += length;
300 if(currentArcSize > maxArcSize) {
301 closeArc();
302 }
303 }
304
305 private void closeArc() throws IOException {
306 if(currentArcOS == null) {
307 return;
308 }
309 currentArcOS.close();
310 if(!currentArcTmp.renameTo(currentArc)) {
311 throw new IOException(String.format("Failed rename(%s) to (%s)",
312 currentArcTmp.getAbsolutePath(),
313 currentArc.getAbsolutePath()));
314 }
315 currentArcOS = null;
316 currentArcSize = 0;
317 LOGGER.info(String.format("Closed(%s)",currentArc.getAbsolutePath()));
318 }
319 private void closeWarc() throws IOException {
320 if(currentWarcOS == null) {
321 return;
322 }
323 currentWarcOS.close();
324 if(!currentWarcTmp.renameTo(currentWarc)) {
325 throw new IOException(String.format("Failed rename(%s) to (%s)",
326 currentWarcTmp.getAbsolutePath(),
327 currentWarc.getAbsolutePath()));
328 }
329 currentWarcOS = null;
330 currentWarcSize = 0;
331 LOGGER.info(String.format("Closed(%s)",currentWarc.getAbsolutePath()));
332 }
333 private void nextArc() throws IOException {
334 String newArcName = arcNamer.getNextName();
335 currentArc = new File(targetDir,newArcName);
336 String tmpArcName = newArcName + ".OPEN";
337 currentArcTmp = new File(targetDir,tmpArcName);
338 currentArcOS = new FileOutputStream(currentArcTmp);
339 byte[] header = getARCHeader(newArcName);
340 GZIPMemberWriter w = new GZIPMemberWriter(currentArcOS);
341 w.write(new ByteArrayInputStream(header));
342 currentArcSize = w.getBytesWritten();
343 LOGGER.info(String.format("Openned(%s)",currentArc.getAbsolutePath()));
344 }
345
346 private void nextWarc() throws IOException {
347 String newWarcName = warcNamer.getNextName();
348 currentWarc = new File(targetDir,newWarcName);
349 String tmpWarcName = newWarcName + ".OPEN";
350 currentWarcTmp = new File(targetDir,tmpWarcName);
351 currentWarcOS = new FileOutputStream(currentWarcTmp);
352
353 byte[] header = getWARCHeader(newWarcName);
354 GZIPMemberWriter w = new GZIPMemberWriter(currentWarcOS);
355 w.write(new ByteArrayInputStream(header));
356 currentWarcSize = w.getBytesWritten();
357 LOGGER.info(String.format("Openned(%s)",currentWarc.getAbsolutePath()));
358 }
359
360 /**
361 * @return the warcHeaderContents
362 */
363 public byte[] getWarcHeaderContents() {
364 return warcHeaderContents;
365 }
366
367 /**
368 * @param warcHeaderContents the warcHeaderContents to set
369 */
370 public void setWarcHeaderContents(byte[] warcHeaderContents) {
371 this.warcHeaderContents = warcHeaderContents;
372 }
373
374 /**
375 * @return the maxArcSize
376 */
377 public long getMaxArcSize() {
378 return maxArcSize;
379 }
380
381 /**
382 * @param maxArcSize the maxArcSize to set
383 */
384 public void setMaxArcSize(long maxArcSize) {
385 this.maxArcSize = maxArcSize;
386 }
387
388 /**
389 * @return the maxWarcSize
390 */
391 public long getMaxWarcSize() {
392 return maxWarcSize;
393 }
394
395 /**
396 * @param maxWarcSize the maxWarcSize to set
397 */
398 public void setMaxWarcSize(long maxWarcSize) {
399 this.maxWarcSize = maxWarcSize;
400 }
401
402
403 /**
404 * @return the exitOnError
405 */
406 public boolean isExitOnError() {
407 return exitOnError;
408 }
409
410
411 /**
412 * @param exitOnError the exitOnError to set
413 */
414 public void setExitOnError(boolean exitOnError) {
415 this.exitOnError = exitOnError;
416 }
417}
Note: See TracBrowser for help on using the repository browser.