package org.embulk.filter.crawler;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.Exec;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.type.Types;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import edu.uci.ics.crawler4j.crawler.CrawlConfig;
import edu.uci.ics.crawler4j.crawler.CrawlController;
import edu.uci.ics.crawler4j.fetcher.PageFetcher;
import edu.uci.ics.crawler4j.robotstxt.RobotstxtConfig;
import edu.uci.ics.crawler4j.robotstxt.RobotstxtServer;

public class CrawlerFilterPlugin
        implements FilterPlugin
{
    public interface PluginTask
            extends Task
    {
        @Config("max_depth_of_crawling")
        @ConfigDefault("null")
        public Optional<Integer> getMaxDepthOfCrawling();

        @Config("number_of_crawlers")
        @ConfigDefault("1")
        public int getNumberOfCrawlers();

        @Config("max_pages_to_fetch")
        @ConfigDefault("-1")
        public int getMaxPagesToFetch();

        @Config("target_key")
        public String getTargetKey();

        @Config("crawl_storage_folder")
        public String getCrawlStorageFolder();

        @Config("politeness_delay")
        @ConfigDefault("null")
        public Optional<Integer> getPolitenessDelay();

        @Config("user_agent_string")
        @ConfigDefault("null")
        public Optional<String> getUserAgentString();

        @Config("keep_input")
        @ConfigDefault("true")
        public boolean getKeepInput();

        @Config("output_prefix")
        @ConfigDefault("\"\"")
        public String getOutputPrefix();

        @Config("should_not_visit_pattern")
        @ConfigDefault("null")
        public Optional<String> getShouldNotVisitPattern();

        @Config("connection_timeout")
        @ConfigDefault("null")
        public Optional<Integer> getConnectionTimeout();

        @Config("socket_timeout")
        @ConfigDefault("null")
        public Optional<Integer> getSocketTimeout();
    }

    @Override
    public void transaction(ConfigSource config, Schema inputSchema,
            FilterPlugin.Control control)
    {
        PluginTask task = config.loadConfig(PluginTask.class);
        ImmutableList.Builder<Column> builder = ImmutableList.builder();

        int i = 0;
        builder.addAll(getOutputColumns(i, task.getOutputPrefix()));

        Schema outputSchema = new Schema(builder.build());
        control.run(task.dump(), outputSchema);
    }

    /**
     * @param i
     * @param outputPrefix
     * @return
     */
    private List<Column> getOutputColumns(int i, String outputPrefix) {
        List<Column> list = Lists.newArrayList();
        list.add(new Column(i++, outputPrefix + Constants.URL, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.DOMAIN, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.SUBDOMAIN, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.PATH, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.ANCHOR, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.PARENT_URL, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.CONTENT_CHARSET, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.REDIRECT_TO_URL, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.LANGUAGE, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.STATUS_CODE, Types.LONG));
        list.add(new Column(i++, outputPrefix + Constants.TITLE, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.TEXT, Types.STRING));
        list.add(new Column(i++, outputPrefix + Constants.HTML, Types.STRING));
        return list;
    }

    @Override
    public PageOutput open(TaskSource taskSource, final Schema inputSchema,
            final Schema outputSchema, final PageOutput output)
    {
        final PluginTask task = taskSource.loadTask(PluginTask.class);
        final Column keyNameColumn = inputSchema.lookupColumn(task.getTargetKey());

        return new PageOutput() {
            private PageReader reader = new PageReader(inputSchema);
            private PageBuilder builder = new PageBuilder(Exec.getBufferAllocator(), outputSchema, output);
            private CrawlController controller = getController();

            @Override
            public void finish()
            {
                for (Object object : controller.getCrawlersLocalData()) {
                    CrawlStat crawlStat = (CrawlStat) object;
                    for (Map<String, Object> map : crawlStat.getPages()) {
                        for (Column outputColumn : outputSchema.getColumns()) {
                            final Object value = map.get(outputColumn.getName());
                            setValue(value, outputColumn);
                        }
                        builder.addRecord();
                    }
                }
                builder.finish();
            }

            @Override
            public void close()
            {
                builder.close();
            }

            @Override
            public void add(Page page)
            {
                reader.setPage(page);
                while (reader.nextRecord()) {
                    controller.addSeed(reader.getString(keyNameColumn));
                }
                Map<String, Object> customData = Maps.newHashMap();
                customData.put("output_prefix", task.getOutputPrefix());
                if (task.getShouldNotVisitPattern().isPresent()) {
                    customData.put("should_not_visit_pattern", task.getShouldNotVisitPattern().get());
                }
                controller.setCustomData(customData);
                controller.start(EmbulkCrawler.class, task.getNumberOfCrawlers());
            }

            /**
             * @param seeds
             */
            private void setValue(Object value, Column column)
            {
                if (value == null) {
                    builder.setNull(column);
                }
                else if (column.getType().equals(Types.STRING)) {
                    builder.setString(column, (String) value);
                }
                else if (column.getType().equals(Types.LONG)) {
                    builder.setLong(column, (Integer) value);
                }
            }

            /**
             * @param seeds
             * @return
             */
            private CrawlController getController()
            {
                CrawlConfig config = new CrawlConfig();
                String directoryPath = String.format(task.getCrawlStorageFolder(), UUID.randomUUID());
                File dir = new File(directoryPath);
                dir.mkdirs();

                config.setCrawlStorageFolder(directoryPath);
                if (task.getMaxDepthOfCrawling().isPresent()) {
                    config.setMaxDepthOfCrawling(task.getMaxDepthOfCrawling().get());
                }

                config.setMaxPagesToFetch(task.getMaxPagesToFetch());
                if (task.getPolitenessDelay().isPresent()) {
                    config.setPolitenessDelay(task.getPolitenessDelay().get());
                }
                if (task.getUserAgentString().isPresent()) {
                    config.setUserAgentString(task.getUserAgentString().get());
                }
                if (task.getSocketTimeout().isPresent()) {
                    config.setSocketTimeout(task.getSocketTimeout().get());
                }
                if (task.getConnectionTimeout().isPresent()) {
                    config.setConnectionTimeout(task.getConnectionTimeout().get());
                }

                PageFetcher pageFetcher = new PageFetcher(config);
                RobotstxtConfig robotstxtConfig = new RobotstxtConfig();
                RobotstxtServer robotstxtServer = new RobotstxtServer(robotstxtConfig, pageFetcher);
                CrawlController crawlController = null;
                try {
                    crawlController = new CrawlController(config, pageFetcher, robotstxtServer);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }

                return crawlController;
            }
        };
    }
}