/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.emr.fs.s3.commit.magic;

import com.aliyun.emr.fs.s3.commit.magic.PathCommitException;
import com.aliyun.emr.fs.s3.commit.magic.PendingSet;
import com.aliyun.emr.fs.s3.commit.magic.S3ClientUtils;
import com.aliyun.emr.fs.s3.commit.magic.S3DirectClient;
import com.aliyun.emr.fs.s3.commit.magic.SinglePendingCommit;
import com.aliyun.emr.fs.s3.commit.magic.SuccessData;
import com.aliyun.emr.fs.s3.commit.magic.Tasks;
import com.aliyun.emr.fs.s3.common.AbstractJindoShimsFileSystem;
import com.aliyun.emr.fs.s3.internal.s3native.S3NativeStore;
import com.aliyun.emr.shade.google_guava.base.Preconditions;
import com.aliyun.emr.shade.google_guava.util.concurrent.ThreadFactoryBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitOperations {
    private static final Logger LOG = LoggerFactory.getLogger(CommitOperations.class);
    private final AbstractJindoShimsFileSystem fs;
    private final S3DirectClient s3Client;
    public static final PathFilter PENDINGSET_FILTER = path -> path.toString().endsWith(".pendingset");
    public static final PathFilter PENDING_FILTER = path -> path.toString().endsWith(".pending");
    private ExecutorService threadPool;

    public CommitOperations(AbstractJindoShimsFileSystem fs) {
        Preconditions.checkArgument(fs != null, "null fs");
        Preconditions.checkArgument(fs.getStore() instanceof S3NativeStore, "should be s3 store");
        this.fs = fs;
        this.s3Client = new S3DirectClient(((S3NativeStore)fs.getStore()).getS3FileletSystem(), ((S3NativeStore)fs.getStore()).getS3Context());
    }

    public String toString() {
        return "CommitOperations{" + this.fs.getUri() + '}';
    }

    public void commitOrFail(SinglePendingCommit commit) throws IOException {
        this.commit(commit, commit.getFilename()).maybeRethrow();
    }

    public MaybeIOE commit(SinglePendingCommit commit, String origin) {
        MaybeIOE outcome;
        LOG.info("Committing single commit {}", (Object)commit);
        String destKey = "unknown destination";
        try {
            commit.validate();
            destKey = commit.getDestinationKey();
            long l = this.innerCommit(commit);
            LOG.debug("Successful commit of file length {}", (Object)l);
            outcome = MaybeIOE.NONE;
        }
        catch (IOException e) {
            String msg = String.format("Failed to commit upload against %s: %s", destKey, e);
            LOG.warn(msg, (Throwable)e);
            outcome = new MaybeIOE(e);
        }
        catch (Exception e) {
            String msg = String.format("Failed to commit upload against %s, described in %s: %s", destKey, origin, e);
            LOG.warn(msg, (Throwable)e);
            outcome = new MaybeIOE((IOException)((Object)new PathCommitException(origin, msg, e)));
        }
        return outcome;
    }

    private long innerCommit(SinglePendingCommit commit) throws IOException {
        this.s3Client.completeMultipartUpload(commit);
        return commit.getLength();
    }

    public List<LocatedFileStatus> locateAllSinglePendingCommits(Path pendingDir, boolean recursive) throws IOException {
        return S3ClientUtils.listAndFilter(this.fs, pendingDir, recursive, PENDING_FILTER);
    }

    public Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loadSinglePendingCommits(Path pendingDir, boolean recursive) throws IOException {
        List<LocatedFileStatus> statusList = this.locateAllSinglePendingCommits(pendingDir, recursive);
        PendingSet commits = new PendingSet(statusList.size());
        ArrayList<Pair> failures = new ArrayList<Pair>(1);
        for (LocatedFileStatus status : statusList) {
            try {
                commits.add(SinglePendingCommit.load(this.fs, status.getPath()));
            }
            catch (IOException e) {
                LOG.warn("Failed to load commit file {}", (Object)status.getPath(), (Object)e);
                failures.add(new Pair((Object)status, (Object)e));
            }
        }
        return new Pair((Object)commits, failures);
    }

    protected final synchronized ExecutorService buildThreadPool() {
        if (this.threadPool == null) {
            int numThreads = 8;
            this.threadPool = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("s3-open-download-pool-%d").build());
        }
        return this.threadPool;
    }

    public Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> distCploadSinglePendingCommits(Path pendingDir, boolean recursive) throws IOException {
        List<LocatedFileStatus> statusList = this.locateAllSinglePendingCommits(pendingDir, recursive);
        PendingSet commits = new PendingSet(statusList.size());
        ArrayList failures = new ArrayList(1);
        Tasks.foreach(statusList).stopOnFailure().executeWith(this.buildThreadPool()).onFailure((status, exception) -> failures.add(new Pair(status, (Object)new IOException(exception)))).run(status -> commits.add(SinglePendingCommit.load(this.fs, status.getPath())));
        return new Pair((Object)commits, failures);
    }

    public IOException makeIOE(String key, Exception ex) {
        return ex instanceof IOException ? (IOException)ex : new PathCommitException(key, ex.toString(), ex);
    }

    public void abortSingleCommit(SinglePendingCommit commit) throws IOException {
        String destKey = commit.getDestinationKey();
        String origin = commit.getFilename() != null ? " defined in " + commit.getFilename() : "";
        String uploadId = commit.getUploadId();
        LOG.debug("Aborting commit to object {}{}", (Object)destKey, (Object)origin);
        this.abortMultipartCommit(destKey, uploadId);
    }

    public void abortMultipartCommit(String destKey, String uploadId) {
        try {
            this.s3Client.abortMultipartUpload(destKey, uploadId);
        }
        catch (IOException e) {
            LOG.info("Can not abort upload " + uploadId + ", don't worry, it should be already aborted at before.", (Object)e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public MaybeIOE abortAllSinglePendingCommits(Path pendingDir, boolean recursive) throws IOException {
        RemoteIterator<LocatedFileStatus> pendingFiles;
        Preconditions.checkArgument(pendingDir != null, "null pendingDir");
        LOG.debug("Aborting all pending commit filess under {} (recursive={}", (Object)pendingDir, (Object)recursive);
        try {
            pendingFiles = this.ls(pendingDir, recursive);
        }
        catch (FileNotFoundException fnfe) {
            LOG.info("No directory to abort {}", (Object)pendingDir);
            return MaybeIOE.NONE;
        }
        MaybeIOE outcome = MaybeIOE.NONE;
        if (!pendingFiles.hasNext()) {
            LOG.debug("No files to abort under {}", (Object)pendingDir);
        }
        while (pendingFiles.hasNext()) {
            Path pendingFile = ((LocatedFileStatus)pendingFiles.next()).getPath();
            if (!pendingFile.getName().endsWith(".pending")) continue;
            try {
                this.abortSingleCommit(SinglePendingCommit.load(this.fs, pendingFile));
            }
            catch (FileNotFoundException e) {
                LOG.debug("listed file already deleted: {}", (Object)pendingFile);
            }
            catch (IOException | IllegalArgumentException e) {
                if (!MaybeIOE.NONE.equals(outcome)) continue;
                outcome = new MaybeIOE(this.makeIOE(pendingFile.toString(), e));
            }
            finally {
                S3ClientUtils.deleteQuietly(this.fs, pendingFile, false);
            }
        }
        return outcome;
    }

    protected RemoteIterator<LocatedFileStatus> ls(Path path, boolean recursive) throws IOException {
        return this.fs.listFiles(path, recursive);
    }

    public List<Pair<String, String>> listPendingUploadsUnderPath(Path dest) throws IOException {
        return this.s3Client.listPendingUploadUnderPath(dest.toString());
    }

    public void deleteSuccessMarker(Path outputPath) throws IOException {
        this.fs.delete(new Path(outputPath, "_SUCCESS"), false);
    }

    public void createSuccessMarker(Path outputPath, SuccessData successData, boolean addMetrics) throws IOException {
        Preconditions.checkArgument(outputPath != null, "null outputPath");
        Configuration conf = this.fs.getConf();
        successData.addDiagnostic("fs.s3.committer.magic.enabled", conf.getTrimmed("fs.s3.committer.magic.enabled", "false"));
        Path markerPath = new Path(outputPath, "_SUCCESS");
        LOG.debug("Touching success marker for job {}: {}", (Object)markerPath, (Object)successData);
        successData.save(this.fs, markerPath, true);
    }

    public void revertCommit(SinglePendingCommit commit) throws IOException {
        LOG.warn("Revert {}", (Object)commit);
        this.fs.delete(commit.destinationPath(), false);
    }

    public static class MaybeIOE {
        private final IOException exception;
        public static final MaybeIOE NONE = new MaybeIOE(null);

        public MaybeIOE(IOException exception) {
            this.exception = exception;
        }

        public IOException getException() {
            return this.exception;
        }

        public boolean hasException() {
            return this.exception != null;
        }

        public void maybeRethrow() throws IOException {
            if (this.exception != null) {
                throw this.exception;
            }
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("MaybeIOE{");
            sb.append(this.hasException() ? this.exception : "");
            sb.append('}');
            return sb.toString();
        }

        public static MaybeIOE of(IOException ex) {
            return ex != null ? new MaybeIOE(ex) : NONE;
        }
    }
}

