/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.emr.fs.jfs.commit;

import com.aliyun.emr.fs.jfs.JindoBatchMetaHelper;
import com.aliyun.emr.fs.jfs.commit.AutoFlushingConcurrentLinkedQueue;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JindoBatchMetaCommitter
extends FileOutputCommitter {
    public static final String JINDO_FILE_COMMIT_PROTOCOL_MESSAGE = "jindo.file.commit.protocol.message.in.use";
    private static final String JINDO_BATCH_META_COMMITTER_CONF = "fs.jfs.committer.batch.meta.enabled";
    private static final boolean JINDO_BATCH_META_COMMITTER_DEFAULT = false;
    private static final String JOB_UUID_CONF = "spark.sql.sources.writeJobUUID";
    private static final String BATCH_META_TEMP_SUBDIR = "_jindo_batch";
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private static final Logger LOG = LoggerFactory.getLogger(JindoBatchMetaCommitter.class);
    private final Path outputPath;
    private final boolean skipCleanup;
    private final boolean ignoreCleanupFailures;
    private final ConcurrentHashMap<String, AutoFlushingConcurrentLinkedQueue> taskCreateFiles;

    public JindoBatchMetaCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
        this(outputPath, (JobContext)context);
    }

    public JindoBatchMetaCommitter(Path outputPath, JobContext context) throws IOException {
        super(outputPath, context);
        this.outputPath = outputPath;
        this.taskCreateFiles = new ConcurrentHashMap();
        Configuration conf = context.getConfiguration();
        this.skipCleanup = conf.getBoolean("mapreduce.fileoutputcommitter.cleanup.skipped", false);
        this.ignoreCleanupFailures = conf.getBoolean("mapreduce.fileoutputcommitter.cleanup-failures.ignored", false);
        LOG.info("Skip cleanup {} folders under output directory: {}, ignore cleanup failures: {}", new Object[]{BATCH_META_TEMP_SUBDIR, this.skipCleanup, this.ignoreCleanupFailures});
    }

    public static boolean useJindoBatchMetaCommitter(Path outputPath, Configuration conf) throws IOException {
        if (outputPath == null || conf == null) {
            return false;
        }
        if (!conf.getBoolean(JINDO_FILE_COMMIT_PROTOCOL_MESSAGE, false)) {
            return false;
        }
        if (!conf.getBoolean(JINDO_BATCH_META_COMMITTER_CONF, false)) {
            return false;
        }
        FileSystem fs = outputPath.getFileSystem(conf);
        return JindoBatchMetaHelper.isBatchMetaSupported(fs, outputPath);
    }

    public String getOptimizedPath(TaskAttemptContext context, @Nullable Path outputPath, String subDir, String filename, String defaultPath) throws IOException {
        Path finalPath = this.getFinalPath(outputPath, subDir, filename, defaultPath);
        String taskAttemptId = this.getTaskAttemptId(context);
        if (!this.taskCreateFiles.containsKey(taskAttemptId)) {
            this.taskCreateFiles.putIfAbsent(taskAttemptId, new AutoFlushingConcurrentLinkedQueue());
        }
        this.taskCreateFiles.get(taskAttemptId).add(finalPath.toString());
        FileSystem fs = finalPath.getFileSystem(context.getConfiguration());
        return JindoBatchMetaHelper.composeCreatePath(fs, finalPath).toString();
    }

    private Path getFinalPath(@Nullable Path outputPath, String subDir, String filename, String defaultPath) {
        Path dst = outputPath != null && !outputPath.toString().isEmpty() ? outputPath : (this.hasBMOutputPath() ? this.outputPath : new Path(defaultPath));
        Path finalPath = subDir == null || subDir.isEmpty() ? new Path(dst, filename) : new Path(new Path(dst, subDir), filename);
        return finalPath;
    }

    private boolean hasBMOutputPath() {
        return this.outputPath != null;
    }

    private String getTaskAttemptId(TaskAttemptContext context) {
        return String.valueOf(context.getTaskAttemptID());
    }

    private Path getBMTaskAttemptPath(TaskAttemptContext context) {
        String taskAttemptId = this.getTaskAttemptId(context);
        return new Path(this.getBMJobAttemptPath((JobContext)context), taskAttemptId);
    }

    private Path getBMJobAttemptPath(JobContext context) {
        if (context != null && this.hasBMOutputPath()) {
            String jobUuid = context.getConfiguration().get(JOB_UUID_CONF, null);
            Path jobUuidTempPath = jobUuid == null || jobUuid.isEmpty() ? this.getBMTempPath() : new Path(this.getBMTempPath(), jobUuid);
            return FileOutputCommitter.getJobAttemptPath((JobContext)context, (Path)jobUuidTempPath);
        }
        return null;
    }

    private Path getBMTempPath() {
        return new Path(this.outputPath, BATCH_META_TEMP_SUBDIR);
    }

    public void setupJob(JobContext context) throws IOException {
        if (this.hasBMOutputPath()) {
            Path jobAttemptPath = this.getBMJobAttemptPath(context);
            FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
            if (!fs.mkdirs(jobAttemptPath)) {
                LOG.error("Mkdirs failed to create " + jobAttemptPath);
            }
        } else {
            LOG.warn("Output Path is null in setupJob()");
        }
    }

    protected void commitJobInternal(JobContext context) throws IOException {
        if (this.hasBMOutputPath()) {
            LOG.info("Committing job ...");
            this.finalizeJob(context, false);
            if (this.skipCleanup) {
                LOG.info("Skip cleanup the {} folders under job's output directory in commitJob.", (Object)BATCH_META_TEMP_SUBDIR);
            } else {
                try {
                    this.cleanupJob(context);
                }
                catch (IOException e) {
                    if (this.ignoreCleanupFailures) {
                        LOG.error("Error in cleanup job, manually cleanup is needed.", (Throwable)e);
                    }
                    throw e;
                }
            }
            FileSystem fs = this.outputPath.getFileSystem(context.getConfiguration());
            if (context.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
                Path markerPath = new Path(this.outputPath, "_SUCCESS");
                if (this.isCommitJobRepeatable(context)) {
                    fs.create(markerPath, true).close();
                } else {
                    fs.create(markerPath).close();
                }
            }
        } else {
            LOG.warn("Output Path is null in commitJob()");
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Deprecated
    public void cleanupJob(JobContext context) throws IOException {
        if (this.hasBMOutputPath()) {
            LOG.info("Cleaning the batch meta temporary dir ...");
            Path batchMetaTempPath = this.getBMTempPath();
            FileSystem fs = batchMetaTempPath.getFileSystem(context.getConfiguration());
            try {
                fs.delete(batchMetaTempPath, true);
                return;
            }
            catch (FileNotFoundException e) {
                if (this.isCommitJobRepeatable(context)) return;
                throw e;
            }
        } else {
            LOG.warn("Output Path is null in cleanupJob()");
        }
    }

    public void abortJob(JobContext context, JobStatus.State state) throws IOException {
        if (this.hasBMOutputPath()) {
            LOG.info("Aborting job ...");
            this.finalizeJob(context, true);
        }
        this.cleanupJob(context);
    }

    private void finalizeJob(JobContext context, boolean abort) throws IOException {
        Path jobAttemptPath = this.getBMJobAttemptPath(context);
        FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
        for (FileStatus status : fs.listStatus(jobAttemptPath)) {
            Path committedTaskAttemptPath = fs.makeQualified(status.getPath());
            for (FileStatus files : fs.listStatus(committedTaskAttemptPath)) {
                ArrayList<String> pathWithStatus = new ArrayList<String>();
                try (BufferedReader br = new BufferedReader(new InputStreamReader((InputStream)fs.open(files.getPath()), CHARSET));){
                    String line;
                    while ((line = br.readLine()) != null) {
                        pathWithStatus.add(line);
                    }
                }
                try {
                    if (!fs.delete(files.getPath(), false)) {
                        LOG.warn("Temporary meta file may not be cleaned.");
                    }
                }
                catch (IOException ignore) {
                    LOG.warn("Temporary meta file may not be cleaned.");
                }
                JindoBatchMetaHelper.closeBatch(fs, pathWithStatus, abort);
            }
        }
        LOG.info("Job finalized.");
    }

    public void setupTask(TaskAttemptContext context) throws IOException {
        super.setupTask(context);
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        String attemptId = this.getTaskAttemptId(context);
        if (this.hasBMOutputPath()) {
            context.progress();
            AutoFlushingConcurrentLinkedQueue files = this.taskCreateFiles.getOrDefault(attemptId, null);
            if (files != null && files.nonEmpty()) {
                LOG.info("Committing task with attempt id {} ...", (Object)attemptId);
                Path taskAttemptPath = this.getBMTaskAttemptPath(context);
                files.foreach(BatchRecordFunction.create(taskAttemptPath, context.getConfiguration()));
                files.close();
            } else {
                LOG.warn("No Output found for " + attemptId);
            }
        } else {
            LOG.warn("Output Path is null in commitTask()");
        }
    }

    public void abortTask(TaskAttemptContext context) throws IOException {
        String attemptId = this.getTaskAttemptId(context);
        if (this.hasBMOutputPath()) {
            context.progress();
            AutoFlushingConcurrentLinkedQueue files = this.taskCreateFiles.getOrDefault(attemptId, null);
            if (files != null && files.nonEmpty()) {
                LOG.info("Aborting task with attempt id {} ...", (Object)attemptId);
                FileSystem fs = this.outputPath.getFileSystem(context.getConfiguration());
                files.foreach(new BatchCloseFunction(fs, true));
                files.close();
            } else {
                LOG.warn("No Output found for {} during task abortion.", (Object)attemptId);
            }
        } else {
            LOG.warn("Output Path is null in abortTask()");
        }
    }

    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        if (this.hasBMOutputPath()) {
            String attemptId = this.getTaskAttemptId(context);
            AutoFlushingConcurrentLinkedQueue files = this.taskCreateFiles.getOrDefault(attemptId, null);
            return files != null && files.nonEmpty();
        }
        return false;
    }

    @Deprecated
    public boolean isRecoverySupported() {
        return false;
    }

    public boolean isCommitJobRepeatable(JobContext context) throws IOException {
        return false;
    }

    public void recoverTask(TaskAttemptContext context) throws IOException {
        String attemptId = this.getTaskAttemptId(context);
        LOG.error("Task recovery is not supported. Task attempt id: " + attemptId);
        throw new IOException("Task recovery is not supported. Task attempt id: " + attemptId);
    }

    private static class BatchRecordFunction
    implements AutoFlushingConcurrentLinkedQueue.BatchFunction {
        private final FileSystem fs;
        private final Path dir;

        BatchRecordFunction(FileSystem fs, Path dir) {
            this.fs = fs;
            this.dir = dir;
        }

        @Override
        public void apply(long i, List<String> batch) throws IOException {
            if (batch == null || batch.isEmpty()) {
                return;
            }
            ArrayList<String> pathWithStatus = new ArrayList<String>();
            for (String path : batch) {
                Path encodedWithStatus = JindoBatchMetaHelper.composeClosePath(this.fs, new Path(path));
                pathWithStatus.add(encodedWithStatus.toString());
            }
            Path taskFile = new Path(this.dir, String.valueOf(i));
            try (FSDataOutputStream out = this.fs.create(taskFile, false);){
                for (String line : pathWithStatus) {
                    out.write(line.getBytes(CHARSET));
                    out.write(10);
                }
            }
        }

        static BatchRecordFunction create(Path dir, Configuration conf) throws IOException {
            FileSystem fs = dir.getFileSystem(conf);
            if (fs.exists(dir = fs.makeQualified(dir))) {
                throw new IOException("Cannot save status info to an existing directory.");
            }
            if (!fs.mkdirs(dir) || !fs.isDirectory(dir)) {
                throw new IOException("Cannot make base directory for saving status info.");
            }
            return new BatchRecordFunction(fs, dir);
        }
    }

    private static class BatchCloseFunction
    implements AutoFlushingConcurrentLinkedQueue.BatchFunction {
        private final FileSystem fs;
        private final boolean abort;

        private BatchCloseFunction(FileSystem fs, boolean abort) {
            this.fs = fs;
            this.abort = abort;
        }

        @Override
        public void apply(long i, List<String> batch) throws IOException {
            JindoBatchMetaHelper.closeBatch(this.fs, batch, this.abort);
        }
    }
}

