Add UDF for computing MD5 checksum
Inspired by the recent UDF on image processing in Warcbase. If we had a UDF for computing the MD5 checksum of arbitrary data, we could apply it to all images and find, for example, the most popular GIFs in GeoCities.
PR: https://github.com/lintool/warcbase/pull/237
UDF for computing the MD5 checksum => ComputeChecksum.get(url: String, timeoutVal: Int = 5000, removeIconImage: Boolean = false, minWidth: Int = 30, minHeight: Int = 30)
ComputeChecksum.get("https://avatars1.githubusercontent.com/u/7608739?v=3&s=96")
Most popular images => ExtractPopularImages(records: RDD[ArchiveRecord], limit: Int, timeoutVal: Int = 5000)
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val recs = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))
  .persist()
ExtractPopularImages(recs, 1500) // show 1500 most popular
Example result: https://github.com/yb1/warcbase/blob/k_means2/src/main/resources/top1500.md
@yb1 Sorry for not being clear... your current implementation fetches the images from IA, which requires web access and thus is slow. We already have the images in our archives, so we should work from those.
I envision something like:
RecordLoader.loadArchives(...)
.keepImages()
.map( img => (img.getUrl, ComputeMD5(img.getRawBytes), ComputeSize(img.getRawBytes)) )
After that, we can filter by size as we wish, group by, etc.
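Fleshed out a bit, the end-to-end flow might look like the following. This is a rough sketch only: it assumes the envisioned keepImages() filter and getRawBytes accessor from above, and stands in a plain java.security.MessageDigest call for the proposed ComputeMD5 UDF.
import java.security.MessageDigest
import org.warcbase.spark.matchbox.RecordLoader
import org.warcbase.spark.rdd.RecordRDD._

// Plain-MessageDigest stand-in for the proposed ComputeMD5 UDF.
def md5Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString

// keepImages() and getRawBytes are the envisioned API, not yet implemented.
val images = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepImages()
  .map(img => (md5Hex(img.getRawBytes), img.getUrl))

// Count duplicates per checksum, keep one representative URL, most frequent first.
val popular = images
  .groupByKey()
  .map { case (md5, urls) => (urls.size, md5, urls.head) }
  .sortBy(-_._1)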
Just chiming in to quickly note that this is going to be great. I already have a use case in mind (comparing two collections), especially once we can get the icons filtered out.
As noted in e-mail, happy to test this, but can you send me a test script? I haven't been able to get it working, but I'm probably having a brainfart here.
If so, I will test it on our CPP collection, both on the cluster and on the rho server.
This was implemented in ab72ae4.
Example usage:
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val r = RecordLoader.loadArc("/mnt/vol1/data_sets/cpp/cpp_arcs_accession_01/ARCHIVEIT-227-UOFTORONTO-CANPOLPINT-20090202064118-00089-crawling04.us.archive.org.arc.gz", sc).persist()
ExtractPopularImages(r, 100) // show 100 most popular
In this example, it took the ARC file, found the 100 most popular images (based on MD5 checksums), and output them as an array.
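To take a quick look at the result in the shell, something along these lines should work (the UDF returns an array at this point, so standard collection methods apply):
// Assign the result and print the first ten entries as a sanity check.
val top = ExtractPopularImages(r, 100)
top.take(10).foreach(println)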
I'm going to run this on a larger collection and provide an example script and output.
Struggling with the most efficient way to send the output to a text file rather than just to the shell. @yb1, can you work on the following script so that it saves the results out?
something like:
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz", sc).persist()
ExtractPopularImages(r, 2000)
  .saveAsTextFile("2000-Popular-Images-Geocities/")
For now, please use:
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz", sc).persist()
val arr = ExtractPopularImages(r, 2000)
sc.parallelize(arr.map(x => x._2._2 + "\t" + x._2._3), 1).saveAsTextFile("2000-Popular-Images-Geocities13")
Thanks.
Thanks @yb1. Is there a way to bake the sc.parallelize(arr.map(x => x._2._2 + "\t" + x._2._3), 1) step into the underlying UDF? It will be a bit intimidating for our user base as it stands (even if they don't need to fully understand it).
And what would be the best way to do subsets, i.e. applying .keepUrlPatterns(Set("(?i)http://geocities.com/EnchantedForest/.*".r)) as well?
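For subsets, I assume we could just chain keepUrlPatterns before the extraction; something like this (an untested sketch on my end):
// Untested sketch: restrict to the EnchantedForest subset before extracting.
val subset = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz", sc)
  .keepUrlPatterns(Set("(?i)http://geocities.com/EnchantedForest/.*".r))
  .persist()
val arr = ExtractPopularImages(subset, 2000)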
Right now, testing with your script above:
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz", sc).persist()
val arr = ExtractPopularImages(r, 2000)
sc.parallelize(arr.map(x => x._2._2 + "\t" + x._2._3), 1).saveAsTextFile("2000-Popular-Images-Geocities13")
The output type has been changed to an RDD (https://github.com/lintool/warcbase/pull/241).
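Roughly speaking, the parallelize step now lives inside the UDF, which takes the SparkContext and hands back an RDD[String]. A sketch of the shape only (rankByMD5 is a hypothetical stand-in for the existing ranking logic, and the package path for ArchiveRecord is assumed):
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.warcbase.spark.archive.io.ArchiveRecord

object ExtractPopularImages {
  def apply(records: RDD[ArchiveRecord], limit: Int, sc: SparkContext): RDD[String] = {
    // rankByMD5: hypothetical helper standing in for the existing array-producing logic.
    val top = rankByMD5(records, limit)
    // Format each entry as a tab-separated line, as in the shell workaround above.
    sc.parallelize(top.map(x => x._2._2 + "\t" + x._2._3), 1)
  }
}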
The new script will be:
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz", sc).persist()
ExtractPopularImages(r, 2000, sc).saveAsTextFile("2000-Popular-Images-Geocities14")
And for subsets, I'll get back to you soon.
Thanks!
Great, @yb1. Did you successfully get a run of this done on GeoCities? If so, what memory settings did you use? (Just so we don't reinvent the wheel: I ran into memory crashes with spark-shell --jars /cliphomes/i2millig/warcbase/warcbase-core/target/warcbase-core-0.1.0-SNAPSHOT-fatjar.jar --num-executors 75 --executor-cores 5 --executor-memory 10G --driver-memory 10G.)
Works like a charm.
Ran with spark-shell --jars /cliphomes/i2millig/warcbase/warcbase-core/target/warcbase-core-0.1.0-SNAPSHOT-fatjar.jar --num-executors 50 --executor-cores 5 --executor-memory 20G --driver-memory 10G --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.driver.maxResultSize=2048.
Script:
import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz", sc).persist()
ExtractPopularImages(r, 2000, sc).saveAsTextFile("2000-Popular-Images-Geocities14")
Since this works, I will document it and then close the issue.