package cascading.flow.hadoop;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

/* loaded from: input_file:cascading/flow/hadoop/HadoopFlowStep.class */
/**
 * Hadoop-specific {@link BaseFlowStep} whose configuration type is {@link JobConf}.
 * <p>
 * Builds the {@link JobConf} for a planned step:
 * <ul>
 *   <li>sources, sink and trap initialization;</li>
 *   <li>partitioner and comparator selection for the step's grouping, if any;</li>
 *   <li>the serialized step state.</li>
 * </ul>
 * Trap taps are kept in two separate maps, one for the mapper side and one for the reducer side.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf> {

    /**
     * Serialized step state shorter than this is stored directly under {@code cascading.flow.step}.
     * Longer state is written via {@link HadoopUtil#writeStateToDistCache} instead, except in
     * local mode where it is always stored inline.
     */
    private static final int MAX_INLINE_STEP_STATE_LENGTH = Short.MAX_VALUE;

    /** Trap taps registered for the mapper side, keyed by the name used in {@link #getTrap}. */
    private final Map<String, Tap> mapperTraps;

    /** Trap taps registered for the reducer side, keyed by the name used in {@link #getTrap}. */
    private final Map<String, Tap> reducerTraps;

    /**
     * Creates a step with no traps registered.
     *
     * @param str name passed through to {@link BaseFlowStep}
     * @param i   ordinal passed through to {@link BaseFlowStep} (presumably the step number)
     */
    public HadoopFlowStep(String str, int i) {
        super(str, i);
        this.mapperTraps = new HashMap<String, Tap>();
        this.reducerTraps = new HashMap<String, Tap>();
    }

    /**
     * Builds the {@link JobConf} used to run this step.
     * <p>
     * Works on a copy of {@code jobConf}, or on a fresh {@link JobConf} when it is {@code null}. The copy:
     * <ul>
     *   <li>is configured with Cascading's mapper/reducer and tuple serializations;</li>
     *   <li>is initialized from the step's sources, sink, traps and process config defs;</li>
     *   <li>gets partitioner/comparator classes chosen from the step's grouping;</li>
     *   <li>receives the serialized step state.</li>
     * </ul>
     *
     * @param flowProcess the current flow process, handed to the tap initializers
     * @param jobConf     parent configuration to copy, may be {@code null}
     * @return the fully initialized job configuration
     */
    @Override // cascading.flow.planner.BaseFlowStep
    public JobConf getInitializedConfig(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        JobConf conf = jobConf == null ? new JobConf() : HadoopUtil.copyJobConf(jobConf);

        conf.setBoolean("mapred.used.genericoptionsparser", true);
        conf.setJobName(getStepDisplayName(conf.getInt("cascading.step.display.id.truncate", Util.ID_LENGTH)));
        conf.setOutputKeyClass(Tuple.class);
        conf.setOutputValueClass(Tuple.class);
        conf.setMapRunnerClass(FlowMapper.class);
        conf.setReducerClass(FlowReducer.class);

        TupleSerialization.setSerializations(conf);

        initFromSources(flowProcess, conf);
        initFromSink(flowProcess, conf);
        initFromTraps(flowProcess, conf);
        initFromProcessConfigDef(conf);

        // A non-zero part count on the sink scheme sets the task count:
        // reducers when the step groups, mappers otherwise.
        int numSinkParts = getSink().getScheme().getNumSinkParts();
        if (numSinkParts != 0) {
            if (getGroup() != null) {
                conf.setNumReduceTasks(numSinkParts);
            } else {
                conf.setNumMapTasks(numSinkParts);
            }
        }

        // Default key ordering; replaced below for reversed, cogrouped or sorted groupings.
        conf.setOutputKeyComparatorClass(TupleComparator.class);

        if (getGroup() == null) {
            // No grouping in this step: run as a map-only job.
            conf.setNumReduceTasks(0);
        } else {
            conf.setMapOutputKeyClass(Tuple.class);
            conf.setMapOutputValueClass(Tuple.class);
            conf.setPartitionerClass(GroupingPartitioner.class);

            if (getGroup().isSortReversed()) {
                conf.setOutputKeyComparatorClass(ReverseTupleComparator.class);
            }

            addComparators(conf, "cascading.group.comparator", getGroup().getKeySelectors());

            if (getGroup().isGroupBy()) {
                addComparators(conf, "cascading.sort.comparator", getGroup().getSortingSelectors());
            } else {
                // Non-GroupBy grouping: map output keys and values are IndexTuple instances.
                conf.setPartitionerClass(CoGroupingPartitioner.class);
                conf.setMapOutputKeyClass(IndexTuple.class);
                conf.setMapOutputValueClass(IndexTuple.class);
                conf.setOutputKeyComparatorClass(IndexTupleCoGroupingComparator.class);
                conf.setOutputValueGroupingComparator(CoGroupingComparator.class);
            }

            if (getGroup().isSorted()) {
                // Sorted grouping: map output keys become TuplePair.
                // Ordering and grouping use the grouping/sorting comparators.
                conf.setPartitionerClass(GroupingSortingPartitioner.class);
                conf.setMapOutputKeyClass(TuplePair.class);
                if (getGroup().isSortReversed()) {
                    conf.setOutputKeyComparatorClass(ReverseGroupingSortingComparator.class);
                } else {
                    conf.setOutputKeyComparatorClass(GroupingSortingComparator.class);
                }
                conf.setOutputValueGroupingComparator(GroupingComparator.class);
            }
        }

        String release = Version.getRelease();
        if (release != null) {
            conf.set("cascading.version", release);
        }

        conf.set(FlowStep.CASCADING_FLOW_STEP_ID, getID());
        conf.set("cascading.flow.step.num", Integer.toString(getStepNum()));

        // Small (or local-mode) step state is stored inline.
        // Otherwise HadoopUtil.writeStateToDistCache writes it out and only the returned path is stored.
        String stepState = pack(this, conf);
        if (isHadoopLocalMode(conf) || stepState.length() < MAX_INLINE_STEP_STATE_LENGTH) {
            conf.set("cascading.flow.step", stepState);
        } else {
            conf.set("cascading.flow.step.path", HadoopUtil.writeStateToDistCache(conf, getID(), stepState));
        }

        return conf;
    }

    /**
     * Reports whether {@code jobConf} describes a local Hadoop run.
     *
     * @param jobConf configuration to inspect
     * @return the result of {@link HadoopUtil#isLocal(JobConf)}
     */
    public boolean isHadoopLocalMode(JobConf jobConf) {
        return HadoopUtil.isLocal(jobConf);
    }

    /**
     * Serializes {@code obj} to a Base64 string via {@link HadoopUtil#serializeBase64}.
     *
     * @throws FlowException wrapping the {@link IOException} if serialization fails
     */
    private String pack(Object obj, JobConf jobConf) {
        try {
            return HadoopUtil.serializeBase64(obj, jobConf, true);
        } catch (IOException exception) {
            throw new FlowException("unable to pack object: " + obj.getClass().getCanonicalName(), exception);
        }
    }

    /**
     * Builds the initialized configuration, records it via {@code setConf}, and wraps this step in a
     * {@link HadoopFlowStepJob}.
     * <p>
     * This method is {@code public} here although the decompiler reported a {@code protected} origin.
     * It is kept public so existing callers keep compiling.
     */
    @Override // cascading.flow.planner.BaseFlowStep
    public FlowStepJob<JobConf> createFlowStepJob(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        JobConf initializedConfig = getInitializedConfig(flowProcess, jobConf);
        setConf(initializedConfig);
        return new HadoopFlowStepJob(createClientState(flowProcess), this, initializedConfig);
    }

    /**
     * Best-effort cleanup after the step has run. Each failure is logged as a warning; none is
     * rethrown. In order, this:
     * <ol>
     *   <li>removes any step-state file stored under {@code cascading.flow.step.path};</li>
     *   <li>deletes the temporary sink, if one was created;</li>
     *   <li>deletes a temporary step sink when the flow succeeded or has no run id, and otherwise
     *       cleans its tap meta data;</li>
     *   <li>cleans tap meta data for every mapper and reducer trap.</li>
     * </ol>
     *
     * @param jobConf configuration used to locate and delete the resources
     */
    @Override // cascading.flow.planner.BaseFlowStep
    public void clean(JobConf jobConf) {
        String stepStatePath = jobConf.get("cascading.flow.step.path");
        if (stepStatePath != null) {
            try {
                HadoopUtil.removeStateFromDistCache(jobConf, stepStatePath);
            } catch (IOException exception) {
                logWarn("unable to remove step state file: " + stepStatePath, exception);
            }
        }

        if (this.tempSink != null) {
            try {
                // Tap#deleteResource takes the configuration object directly.
                this.tempSink.deleteResource(jobConf);
            } catch (Exception exception) {
                logWarn("unable to remove temporary file: " + this.tempSink, exception);
            }
        }

        if (getSink().isTemporary() && (getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null)) {
            try {
                getSink().deleteResource(jobConf);
            } catch (Exception exception) {
                logWarn("unable to remove temporary file: " + getSink(), exception);
            }
        } else {
            cleanTapMetaData(jobConf, getSink());
        }

        for (Tap trap : getMapperTraps().values()) {
            cleanTapMetaData(jobConf, trap);
        }

        for (Tap trap : getReducerTraps().values()) {
            cleanTapMetaData(jobConf, trap);
        }
    }

    /**
     * Runs {@link Hadoop18TapUtil#cleanupTapMetaData} for {@code tap}.
     * <p>
     * Failures are non-fatal: they are logged as a warning, not rethrown.
     */
    private void cleanTapMetaData(JobConf jobConf, Tap tap) {
        try {
            Hadoop18TapUtil.cleanupTapMetaData(jobConf, tap);
        } catch (IOException exception) {
            logWarn("unable to clean tap meta data: " + tap, exception);
        }
    }

    /**
     * Records comparator information for the first {@link Fields} in {@code map} under {@code str}.
     * <ul>
     *   <li>If those fields carry comparators, the packed fields are stored under {@code str}.</li>
     *   <li>Otherwise, when the first previous scope of the group has a non-empty out-values field
     *       set, its size is stored under {@code str + ".size"}.</li>
     * </ul>
     * Nothing is written when {@code map} is empty.
     */
    private void addComparators(JobConf jobConf, String str, Map<String, Fields> map) {
        Iterator<Fields> fieldsIterator = map.values().iterator();
        if (!fieldsIterator.hasNext()) {
            return;
        }

        Fields fields = fieldsIterator.next();
        if (fields.hasComparators()) {
            jobConf.set(str, pack(fields, jobConf));
            return;
        }

        Fields outValuesFields = getPreviousScopes(getGroup()).iterator().next().getOutValuesFields();
        if (outValuesFields.size() != 0) {
            jobConf.setInt(str + ".size", outValuesFields.size());
        }
    }

    /**
     * Calls {@code sinkConfInit} for every trap in {@code map} against a throwaway copy of
     * {@code jobConf}, so nothing a trap sets is written back into {@code jobConf}.
     * NOTE(review): presumably this lets traps initialize or validate without clobbering the step
     * sink's settings; confirm.
     */
    private void initFromTraps(FlowProcess<JobConf> flowProcess, JobConf jobConf, Map<String, Tap> map) {
        if (map.isEmpty()) {
            return;
        }

        JobConf trapConf = HadoopUtil.copyJobConf(jobConf);
        for (Tap trap : map.values()) {
            trap.sinkConfInit(flowProcess, trapConf);
        }
    }

    /**
     * Initializes {@code jobConf} from the step's sources.
     * <ul>
     *   <li><b>Streamed sources</b> (sources that are not accumulated) each get their own config
     *       copy. The copy is tagged with {@code cascading.step.source} and initialized by the tap,
     *       and all copies are handed to {@link MultiInputFormat#addInputFormat}.</li>
     *   <li><b>Accumulated sources</b> are each initialized on a copy. The difference from
     *       {@code jobConf} is packed into {@code cascading.step.accumulated.source.conf.<id>}, and
     *       the copy's distributed-cache files are set on {@code jobConf}.</li>
     * </ul>
     *
     * @throws IllegalStateException if a streamed source has a {@code null} identifier
     * @throws CascadingException    wrapping an {@link IOException} from the distributed cache
     */
    protected void initFromSources(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        Set<Tap> streamedSources = getUniqueStreamedSources();
        JobConf[] streamedConfs = new JobConf[streamedSources.size()];

        int index = 0;
        for (Tap tap : streamedSources) {
            if (tap.getIdentifier() == null) {
                throw new IllegalStateException("tap may not have null identifier: " + tap.toString());
            }
            JobConf sourceConf = flowProcess.copyConfig(jobConf);
            sourceConf.set("cascading.step.source", Tap.id(tap));
            tap.sourceConfInit(flowProcess, sourceConf);
            streamedConfs[index++] = sourceConf;
        }

        for (Tap tap : getAllAccumulatedSources()) {
            JobConf accumulatedConf = flowProcess.copyConfig(jobConf);
            tap.sourceConfInit(flowProcess, accumulatedConf);
            jobConf.set("cascading.step.accumulated.source.conf." + Tap.id(tap),
                pack(flowProcess.diffConfigIntoMap(jobConf, accumulatedConf), jobConf));

            try {
                // The copy already holds any cache files set on jobConf by earlier iterations,
                // so replacing the list here does not drop them.
                URI[] cacheFiles = DistributedCache.getCacheFiles(accumulatedConf);
                if (cacheFiles != null) {
                    DistributedCache.setCacheFiles(cacheFiles, jobConf);
                }
            } catch (IOException exception) {
                throw new CascadingException(exception);
            }
        }

        MultiInputFormat.addInputFormat(jobConf, streamedConfs);
    }

    /**
     * Finds the tap in {@code set} whose {@link Tap#id(Tap)} equals {@code str}.
     *
     * @return the first matching tap, or {@code null} if none matches
     */
    public Tap getTapForID(Set<Tap> set, String str) {
        for (Tap tap : set) {
            if (Tap.id(tap).equals(str)) {
                return tap;
            }
        }
        return null;
    }

    /** Applies the process-level {@link ConfigDef} settings to {@code jobConf}. */
    private void initFromProcessConfigDef(JobConf jobConf) {
        initConfFromProcessConfigDef(getSetterFor(jobConf));
    }

    /**
     * Adapts {@code jobConf} to the {@link ConfigDef.Setter} interface.
     * <ul>
     *   <li>{@code get} treats empty values as absent ({@code null}).</li>
     *   <li>{@code set} overwrites the value.</li>
     *   <li>{@code update} appends the value to the comma-separated list unless it is already
     *       present as a whole entry.</li>
     * </ul>
     * All three return the previous value, or {@code null}.
     */
    private ConfigDef.Setter getSetterFor(final JobConf jobConf) {
        return new ConfigDef.Setter() {
            @Override // cascading.property.ConfigDef.Setter
            public String set(String key, String value) {
                String previous = get(key);
                jobConf.set(key, value);
                return previous;
            }

            @Override // cascading.property.ConfigDef.Setter
            public String update(String key, String value) {
                String previous = get(key);
                if (previous == null) {
                    jobConf.set(key, value);
                } else if (!containsListEntry(previous, value)) {
                    // Compare whole comma-separated entries: a plain substring test would treat
                    // "a.Foo" as already present in "a.FooBar".
                    jobConf.set(key, previous + "," + value);
                }
                return previous;
            }

            @Override // cascading.property.ConfigDef.Setter
            public String get(String key) {
                String value = jobConf.get(key);
                if (value == null || value.isEmpty()) {
                    return null;
                }
                return value;
            }
        };
    }

    /**
     * Returns {@code true} when {@code entry} appears in the comma-separated {@code list} as whole,
     * contiguous element(s).
     */
    private static boolean containsListEntry(String list, String entry) {
        return ("," + list + ",").contains("," + entry + ",");
    }

    /** Returns the step's source taps minus all accumulated sources, as a new mutable set. */
    private Set<Tap> getUniqueStreamedSources() {
        Set<Tap> streamed = new HashSet<Tap>(this.sources.keySet());
        streamed.removeAll(getAllAccumulatedSources());
        return streamed;
    }

    /**
     * Initializes {@code jobConf} from the step sink.
     * <p>
     * If the sink leaves no {@link FileOutputFormat} output path, a {@link TempHfs} is created and
     * stored in {@code tempSink}; it is derived from the sink identifier's path. Any temporary sink
     * is then initialized against {@code jobConf}.
     * <p>
     * NOTE(review): the {@code getSink() != null} guard is not applied to the temporary-sink branch,
     * which still dereferences {@code getSink()}.
     */
    protected void initFromSink(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        if (getSink() != null) {
            getSink().sinkConfInit(flowProcess, jobConf);
        }

        if (FileOutputFormat.getOutputPath(jobConf) == null) {
            String sinkPath = new Path(getSink().getIdentifier()).toUri().getPath();
            this.tempSink = new TempHfs(jobConf, "tmp:/" + sinkPath, true);
        }

        if (this.tempSink != null) {
            this.tempSink.sinkConfInit(flowProcess, jobConf);
        }
    }

    /** Runs trap sink initialization for both mapper and reducer traps; see the private overload. */
    protected void initFromTraps(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        initFromTraps(flowProcess, jobConf, getMapperTraps());
        initFromTraps(flowProcess, jobConf, getReducerTraps());
    }

    /** Returns an unmodifiable snapshot of all mapper and reducer traps (duplicates collapsed). */
    @Override // cascading.flow.FlowStep
    public Set<Tap> getTraps() {
        Set<Tap> traps = new HashSet<Tap>();
        traps.addAll(this.mapperTraps.values());
        traps.addAll(this.reducerTraps.values());
        return Collections.unmodifiableSet(traps);
    }

    /**
     * Looks up a trap by name, checking mapper traps before reducer traps.
     *
     * @return the trap, or {@code null} if neither map has one under {@code str}
     */
    @Override // cascading.flow.FlowStep
    public Tap getTrap(String str) {
        Tap trap = getMapperTrap(str);
        return trap != null ? trap : getReducerTrap(str);
    }

    /** Returns the live, mutable map of mapper-side traps. */
    public Map<String, Tap> getMapperTraps() {
        return this.mapperTraps;
    }

    /** Returns the live, mutable map of reducer-side traps. */
    public Map<String, Tap> getReducerTraps() {
        return this.reducerTraps;
    }

    /** Returns the mapper-side trap registered under {@code str}, or {@code null}. */
    public Tap getMapperTrap(String str) {
        return getMapperTraps().get(str);
    }

    /** Returns the reducer-side trap registered under {@code str}, or {@code null}. */
    public Tap getReducerTrap(String str) {
        return getReducerTraps().get(str);
    }
}
