nutch icon indicating copy to clipboard operation
nutch copied to clipboard

NUTCH-2793 indexer-csv: make it work in distributed mode

Open pmezard opened this issue 5 years ago • 6 comments

Before the change, the output file name was hard-coded to "nutch.csv". When running in distributed mode, multiple reducers would clobber each other output.

After the change, the filename is taken from the first open(cfg, name) initialization call, where name is a unique file name generated by IndexerOutputFormat, derived from hadoop FileOutputFormat. The CSV files are now named like part-r-000xx.

pmezard avatar Jun 10 '20 12:06 pmezard

What are the backward compatibility requirements for nutch? Is it OK to just change the interface and implement what you suggest? Should it be best-effort to keep things BC? Or is it impossible to implement such a change at this point?

pmezard avatar Jun 10 '20 16:06 pmezard

Is it OK to just change the interface and implement what you suggest?

Yes, that's ok. We'll put a notice about a breaking change to the release notes, so that users having there own indexer plugin know they have to adapt it.

Should it be best-effort to keep things BC?

We could try to only extend the IndexWriter interface and provide default do-nothing implementations for newly added methods as most index writers do not write data to the filesystem.

sebastian-nagel avatar Jun 10 '20 16:06 sebastian-nagel

OK, there is a lot to unpack. Let me try to rephrase what was my naive understanding of the issue, how I intended to fix it and what is wrong about it.

What I saw is indexing to csv worked locally but failed in a distributed setup (with only 3 nodes). The reduce step emitted errors when writing data to GCS. At the end, there was something containing roughly a third of the expected dataset. I assumed I had 3 reducers overwriting each other with only one winner at the end (or a mix of winning output blocks). So I thought "if only I could map the CSVIndexWriter output file to a reducer to separate each reducer output, that would solve the issue".

What you are saying is:

  • In addition to distributed mode requiring the writers output to be separated, there is a lot of complexity involved with dealing with eventually consistent object stores (I will assume that GCS works roughly like S3). Ideally we would like reducers output to appear in the outpath only if the tasks or jobs succeed, which involves the commiter logic you referenced. But in an initial implementation we may not care about that. If the indexing fails, partial output will be left in outpath and such is life (I am OK with that).
  • I assumed that NutchAction writes in a given reducer are serialized. It it no clear to me if this is correct or not.
  • Exchanges introduce additional complexity in that a single NutchAction can be handled by more than one writer. I do not see what would be the issue with this assuming each writer output are separated. If I have 2 writers with an outpath set to "out1" and "out2", in a reducer generating a "part-r-0001", the actions would go either in "out1/part-r-0001" or "out2/part-r-0002" or both. I do not see overlapping writes there.
  • Same reasoning with there is also the open question how to allow two index writers writing output the filesystem:. Again I assume the writers have distinct output "directories" and the active reducer defines a unique output file name, so the combination of both should be unique.
  • About "name" was just an arbitrary name not a file name indicating a task-specific output path, maybe but does anything prevents it to be used that way? getUniqueFile seems suitable here.

With this current understanding, I would now implement it like:

  • Kill open(Configuration cfg, String name) method, if possible (I haven't checked the code yet).
  • Refactor open(IndexWriterParams params) into open(IndexWriterParams params, String name), where name would be the same thing passed to the other method.
  • In CSVIndexWriter, use name directly and drop the filename kludge I introduced.
  • Maybe implement a fallback of the previous method to the new one with a dummy argument.

How far am I?

pmezard avatar Jun 11 '20 15:06 pmezard

Thanks for the exhaustive listing. I have only a few points to add.

I assumed that NutchAction writes in a given reducer are serialized. It it no clear to me if this is correct or not.

The MapReduce framework takes care of data serialization and concurrency issues: the reduce() method is never called concurrently within one task - tasks run in parallel and that's why every task needs it's own output (part-r-nnnnn). The name of the output file (the number in n) is also determined by the framework - that's important if a task is restarted to avoid duplicated output.

writers have distinct output "directories" and the active reducer defines a unique output file name, so the combination of both should be unique.

I think we need 3 components:

  • the task-specific file or folder (part-r-nnnnn)
  • a unique folder per index writer (eg. the name or a path defined in index-writers.xml)
  • a job-specific output location - you do not want to change the index-writers.xml for that if you run another indexing job

In short, the path of a task output might look like: job-output/indexer-csv-1/part-r-00000.csv

getUniqueFile

You mean [ParseOutputFormat::getUniqueFile](https://github.com/apache/nutch/blob/59d0d9532abdac409e123ab103a506cfb0df790a/src/java/org/apache/nutch/parse/ParseOutputFormat.java#L120]? ParseOutputFormat or FetcherOutputFormat are good examples as they write output into multiple segment subdirectories. Hence, there are no plugins involved which determine whether there is output written to the filesystem or not.

Maybe implement a fallback of the previous method to the new one with a dummy argument

That could be done using default method implementations in Java 8 interfaces. Note: Nutch requires now Java 8 but it started with Java 1.4 and there is still a lot of code not using features of Java 8.

Also, to keep the indexer usable (because most index writers (solr, elasticsearch, etc.) do not write output to the filesystem): if nothing is written to the filesystem IndexingJob should not require an output location as command-line argument.

sebastian-nagel avatar Jun 12 '20 08:06 sebastian-nagel

Thank you for the details.

One thing I wonder is if it would not be possible to define the index-writers specific path as their identifier in index-writers.xml, at least by default. It would be unique by construction, which reduces a bit the amount of configuration. Drawbacks:

  • The identifier may be arbitrary and not compatible with FS/Object stores paths constraints. Not sure how hard it would be to detect that in practice, or if it is a real problem in practice.
  • Said identifiers are a bit ugly, like indexer_csv_1. Maybe we can change them. Or maybe that's not an issue.

pmezard avatar Jun 12 '20 12:06 pmezard

Yes, we could use the identifier but as we already have the param "outpath" - why not use it? The other constraints should be documented.

sebastian-nagel avatar Jun 15 '20 08:06 sebastian-nagel