package cascading.flow.stream;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.StepCounters;
import cascading.tap.Tap;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import java.io.IOException;

/* loaded from: input_file:cascading/flow/stream/SinkStage.class */
public class SinkStage extends ElementStage<TupleEntry, Void> {
    private final Tap sink;
    private TupleEntryCollector collector;

    public SinkStage(FlowProcess flowProcess, Tap tap) {
        super(flowProcess, tap);
        this.sink = tap;
    }

    @Override // cascading.flow.stream.Duct
    public void bind(StreamGraph streamGraph) {
    }

    @Override // cascading.flow.stream.Duct
    public void prepare() {
        try {
            this.collector = this.sink.openForWrite(this.flowProcess, getOutput());
            if (this.sink.getSinkFields().isAll()) {
                this.collector.setFields(getIncomingScopes().get(0).getIncomingTapFields());
            }
        } catch (IOException e) {
            throw new DuctException("failed opening sink", e);
        }
    }

    protected Object getOutput() {
        return null;
    }

    @Override // cascading.flow.stream.Duct
    public void start(Duct duct) {
    }

    @Override // cascading.flow.stream.Stage, cascading.flow.stream.Duct
    public void receive(Duct duct, TupleEntry tupleEntry) {
        try {
            this.collector.add(tupleEntry);
            this.flowProcess.increment(StepCounters.Tuples_Written, 1L);
            this.flowProcess.increment(SliceCounters.Tuples_Written, 1L);
        } catch (CascadingException e) {
            handleException(e, tupleEntry);
        } catch (OutOfMemoryError e2) {
            handleReThrowableException("out of memory, try increasing task memory allocation", e2);
        } catch (Throwable th) {
            handleException(new DuctException("internal error: " + tupleEntry.getTuple().print(), th), tupleEntry);
        }
    }

    @Override // cascading.flow.stream.Duct
    public void complete(Duct duct) {
    }

    @Override // cascading.flow.stream.ElementStage, cascading.flow.stream.Duct
    public void cleanup() {
        try {
            if (this.collector != null) {
                this.collector.close();
            }
            this.collector = null;
            super.cleanup();
        } catch (Throwable th) {
            super.cleanup();
            throw th;
        }
    }
}
