Reducer.java

127 lines | 5.554 kB Blame History Raw Download
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package br.ufrgs.inf.prosoft.cache.tools;

import br.ufrgs.inf.prosoft.cache.CacheEvent;
import br.ufrgs.inf.prosoft.cache.EventType;
import com.google.gson.Gson;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

/**
 *
 * @author romulo
 */
public class Reducer {

    private static final Logger LOGGER = Logger.getLogger(Reducer.class.getName());

    public static void reduce(String eventsPath, String reducePath, String prefix) {
        try (Stream<String> lines = Files.lines(Paths.get(eventsPath))) {
            Gson gson = new Gson();
            Map<String, Integer> objectHasHits = new HashMap<>();
            lines.forEach(line -> {
                CacheEvent event = gson.fromJson(line, CacheEvent.class);
                if (event.getType().equals(EventType.HIT) || event.getType().equals(EventType.ADDITION) || event.getType().equals(EventType.MISS)) {
                    String identifier = event.getName() + "," + event.getIdentifier() + "," + event.getType().name().toLowerCase();
                    try {
                        objectHasHits.put(identifier, objectHasHits.get(identifier) + 1);
                    } catch (Exception ex) {
                        objectHasHits.put(identifier, 1);
                    }
                }
            });

            try (FileWriter fileWriter = new FileWriter(reducePath, true)) {
                objectHasHits.entrySet().stream().forEach(entry -> {
                    try {
                        fileWriter.write(prefix + entry.getKey() + "," + entry.getValue() + "\n");
                    } catch (IOException ex) {
                        LOGGER.log(Level.SEVERE, "IOException {0}", ex);
                    }
                });
            }
        } catch (IOException ex) {
            LOGGER.log(Level.SEVERE, "file not found {0}", eventsPath);
        }
    }

    public static void size(String eventsPath, String reducePath, String prefix) {
        try (Stream<String> lines = Files.lines(Paths.get(eventsPath))) {
            Gson gson = new Gson();
            Map<String, TreeMap<Long, Integer>> cacheHasSizeAlongTime = new HashMap<>();
            lines.forEach(line -> {
                CacheEvent event = gson.fromJson(line, CacheEvent.class);
                if (!(event.getType().equals(EventType.ADDITION) || event.getType().equals(EventType.INVALIDATION))) {
                    return;
                }
                cacheHasSizeAlongTime.compute(event.getName(), (key, previous) -> {
                    if (previous == null) {
                        previous = new TreeMap<>();
                    }
                    previous.compute(event.getTime() / 1000, (innerKey, innerPrevious) -> {
                        if (innerPrevious == null) {
                            innerPrevious = 0;
                        }
                        if (event.getType().equals(EventType.ADDITION)) {
                            return innerPrevious + 1;
                        }
                        return innerPrevious - 1;
                    });
                    return previous;
                });
            });

            cacheHasSizeAlongTime.forEach((name, sizeAlongTime) -> {
                Iterator<Map.Entry<Long, Integer>> iterator = sizeAlongTime.entrySet().iterator();
                int accumulated = iterator.next().getValue();
                while (iterator.hasNext()) {
                    Map.Entry<Long, Integer> entry = iterator.next();
                    accumulated += entry.getValue();
                    if (accumulated < 0) {
                        accumulated = 0;
                    }
                    entry.setValue(accumulated);
                }
            });

            final Long baseTime = cacheHasSizeAlongTime.values().stream().map(map -> map.firstKey()).min(Long::compare).orElse(0L);
            final Long adjustedMaxTime = cacheHasSizeAlongTime.values().stream().map(map -> map.lastKey()).max(Long::compare).orElse(0L) - baseTime;

            try (FileWriter fileWriter = new FileWriter(reducePath, true)) {
                cacheHasSizeAlongTime.forEach((name, sizeAlongTime) -> {
                    try {
                        Map.Entry<Long, Integer> pollFirstEntry = sizeAlongTime.pollFirstEntry();
                        for (long i = 0; i < pollFirstEntry.getKey() - baseTime; i++) {
                            fileWriter.write(prefix + name + "," + i + "," + 0 + "\n");
                        }
                        for (long i = pollFirstEntry.getKey() - baseTime; i <= adjustedMaxTime; i++) {
                            if (sizeAlongTime.size() > 1 && i > pollFirstEntry.getKey() - baseTime) {
                                pollFirstEntry = sizeAlongTime.pollFirstEntry();
                            }
                            fileWriter.write(prefix + name + "," + i + "," + pollFirstEntry.getValue() + "\n");
                        }
                    } catch (IOException ex) {
                        LOGGER.log(Level.SEVERE, "file not found {0}", reducePath);
                    }
                });
            }
        } catch (IOException ex) {
            LOGGER.log(Level.SEVERE, "file not found {0}", eventsPath);
        }
    }

}