diff --git a/OpenCand.API/Repository/CandidatoRepository.cs b/OpenCand.API/Repository/CandidatoRepository.cs index 5983831..4b1a383 100644 --- a/OpenCand.API/Repository/CandidatoRepository.cs +++ b/OpenCand.API/Repository/CandidatoRepository.cs @@ -15,18 +15,22 @@ namespace OpenCand.Repository using (var connection = new NpgsqlConnection(ConnectionString)) { return (await connection.QueryAsync(@" - SELECT * + SELECT *, CASE - WHEN lower(nome) = lower(@query) THEN 0 -- Exact match (case-insensitive) - WHEN lower(nome) LIKE lower(@query) || '%' THEN 1 -- Starts with match (case-insensitive) - WHEN lower(nome) LIKE '%' || lower(@query) THEN 2 -- Contains anywhere match (case-insensitive) - WHEN cpf = @query THEN 0 -- Exact match for CPF - WHEN cpf LIKE @query || '%' THEN 1 -- Starts with match for CPF - WHEN cpf LIKE '%' || @query THEN 2 -- Contains anywhere match for CPF + WHEN lower(apelido) = lower(@query) THEN 0 -- apelido Exact match (case-insensitive) + WHEN lower(apelido) LIKE lower(@query) || '%' THEN 1 -- apelido Starts with match (case-insensitive) + WHEN lower(apelido) LIKE '%' || lower(@query) THEN 2 -- apelido Contains anywhere match (case-insensitive) + WHEN lower(nome) = lower(@query) THEN 0 -- nome Exact match (case-insensitive) + WHEN lower(nome) LIKE lower(@query) || '%' THEN 1 -- nome Starts with match (case-insensitive) + WHEN lower(nome) LIKE '%' || lower(@query) THEN 2 -- nome Contains anywhere match (case-insensitive) + WHEN cpf = @query THEN 0 -- cpf Exact match for CPF + WHEN cpf LIKE @query || '%' THEN 1 -- cpf Starts with match for CPF + WHEN cpf LIKE '%' || @query THEN 2 -- cpf Contains anywhere match for CPF ELSE 3 END AS name_rank FROM candidato - WHERE nome ILIKE '%' || @query || '%' OR + WHERE apelido ILIKE '%' || @query || '%' OR + nome ILIKE '%' || @query || '%' OR cpf ILIKE '%' || @query || '%' ORDER BY name_rank, length(nome) ASC diff --git a/OpenCand.ETL/Contracts/IParserService.cs b/OpenCand.ETL/Contracts/IParserService.cs new file mode 100644 index 0000000..1103ced --- /dev/null +++ b/OpenCand.ETL/Contracts/IParserService.cs @@ -0,0 +1,7 @@ +namespace OpenCand.ETL.Contracts +{ + public interface IParserService + { + Task ParseObject(CsvObj record); + } +} diff --git a/OpenCand.ETL/Parser/Services/CsvFixerService.cs b/OpenCand.ETL/Parser/CsvServices/CsvFixerService.cs similarity index 97% rename from OpenCand.ETL/Parser/Services/CsvFixerService.cs rename to OpenCand.ETL/Parser/CsvServices/CsvFixerService.cs index a932aef..13a178b 100644 --- a/OpenCand.ETL/Parser/Services/CsvFixerService.cs +++ b/OpenCand.ETL/Parser/CsvServices/CsvFixerService.cs @@ -1,15 +1,14 @@ using System.Text; using Microsoft.Extensions.Logging; -using OpenCand.Repository; namespace OpenCand.Parser.Services { public class CsvFixerService { - private readonly ILogger logger; + private readonly ILogger logger; public CsvFixerService( - ILogger logger) + ILogger logger) { this.logger = logger; } diff --git a/OpenCand.ETL/Parser/CsvServices/CsvParserService.cs b/OpenCand.ETL/Parser/CsvServices/CsvParserService.cs new file mode 100644 index 0000000..0118a62 --- /dev/null +++ b/OpenCand.ETL/Parser/CsvServices/CsvParserService.cs @@ -0,0 +1,150 @@ +using System.Globalization; +using CsvHelper; +using CsvHelper.Configuration; +using Microsoft.Extensions.Logging; +using OpenCand.ETL.Contracts; + +namespace OpenCand.Parser.Services +{ + public class CsvParserService : IDisposable + { + private readonly ILogger> logger; + private readonly CsvFixerService csvFixerService; + private readonly IParserService parserService; + + private readonly CsvConfiguration parserConfig; + + // Progress tracking fields + private long processedCount; + private long totalCount; + private string currentTask = string.Empty; + private Timer? progressTimer; + private readonly object progressLock = new object(); + + public CsvParserService( + ILogger> logger, + IParserService parserService, + CsvFixerService csvFixerService) + { + this.logger = logger; + this.csvFixerService = csvFixerService; + this.parserService = parserService; + + parserConfig = new CsvConfiguration(CultureInfo.InvariantCulture) + { + Delimiter = ";", + HasHeaderRecord = true, + PrepareHeaderForMatch = args => args.Header.ToLower(), + MissingFieldFound = null, + TrimOptions = TrimOptions.Trim, + Encoding = System.Text.Encoding.UTF8 + }; + } + + public async Task ParseFolderAsync(string filePath) + { + logger.LogInformation($"ParseFolderAsync - Starting to parse '{filePath}'"); + + filePath = csvFixerService.FixCsvFile(filePath); + + // Fix the CSV file if necessary + if (string.IsNullOrEmpty(filePath)) + { + logger.LogError($"ParseFolderAsync - Failed to fix CSV file at '{filePath}'"); + throw new InvalidOperationException($"Failed to fix CSV file at '{filePath}'"); + } + + try + { + using var reader = new StreamReader(filePath); + using var csv = new CsvReader(reader, parserConfig); + var po = new ParallelOptions + { + MaxDegreeOfParallelism = 40 + }; + + //csv.Context.RegisterClassMap>(); // optional for advanced mapping, not needed + var records = csv.GetRecords().ToList(); + + StartProgressTracking($"Parsing {nameof(CsvObj)} - {Path.GetFileName(filePath)}", records.Count); + + await Parallel.ForEachAsync(records, po, async (record, ct) => + { + try + { + await parserService.ParseObject(record); + + // Increment progress + IncrementProgress(); + } + catch (Exception ex) + { + logger.LogError(ex, $"ParseFolderAsync - Error processing:"); + IncrementProgress(); + } + }); + + StopProgressTracking(); + + logger.LogInformation($"ParseFolderAsync - Finished parsing from {filePath}"); + } + catch (Exception ex) + { + logger.LogError(ex, $"ParseFolderAsync - Error parsing file {filePath}"); + throw; + } + } + + // Progress tracking methods + private void StartProgressTracking(string taskName, long total) + { + lock (progressLock) + { + currentTask = taskName; + processedCount = 0; + totalCount = total; + + progressTimer?.Dispose(); + progressTimer = new Timer(LogProgress, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)); + + logger.LogInformation("Progress - Task: {Task}, Total: {Total}", currentTask, totalCount); + } + } + + private void IncrementProgress() + { + Interlocked.Increment(ref processedCount); + } + + private void StopProgressTracking() + { + lock (progressLock) + { + progressTimer?.Dispose(); + progressTimer = null; + + // Log final progress + var percentage = totalCount > 0 ? (double)processedCount / totalCount * 100 : 0; + logger.LogInformation("Progress - Task: {Task}, Processed: {Processed}, Total: {Total}, Progress: {Percentage:F2}%", + currentTask, processedCount, totalCount, percentage); + } + } + + private void LogProgress(object? state) + { + lock (progressLock) + { + if (string.IsNullOrEmpty(currentTask)) return; + + var percentage = totalCount > 0 ? (double)processedCount / totalCount * 100 : 0; + logger.LogInformation("Progress - Task: {Task}, Processed: {Processed}, Total: {Total}, Progress: {Percentage:F2}%", + currentTask, processedCount, totalCount, percentage); + } + } + + public void Dispose() + { + progressTimer?.Dispose(); + } + } +} diff --git a/OpenCand.ETL/Parser/ParserManager.cs b/OpenCand.ETL/Parser/ParserManager.cs index 839166a..1b20fc7 100644 --- a/OpenCand.ETL/Parser/ParserManager.cs +++ b/OpenCand.ETL/Parser/ParserManager.cs @@ -2,116 +2,81 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using OpenCand.Config; +using OpenCand.Parser.Models; using OpenCand.Parser.Services; namespace OpenCand.Parser { public class ParserManager { - private readonly CsvParserService csvParserService; private readonly ILogger logger; private readonly CsvSettings csvSettings; - private readonly IConfiguration configuration; + + private readonly CsvParserService candidatoParserService; + private readonly CsvParserService bemCandidatoParserService; + private readonly CsvParserService redeSocialParserService; + + private readonly string BasePath; public ParserManager( - CsvParserService csvParserService, IOptions csvSettings, ILogger logger, - IConfiguration configuration) + IConfiguration configuration, + CsvParserService candidatoParserService, + CsvParserService bemCandidatoParserService, + CsvParserService redeSocialParserService) { - this.csvParserService = csvParserService; this.logger = logger; this.csvSettings = csvSettings.Value; - this.configuration = configuration; + + this.candidatoParserService = candidatoParserService; + this.bemCandidatoParserService = bemCandidatoParserService; + this.redeSocialParserService = redeSocialParserService; + + // Get the base path from either SampleFolder in csvSettings or the BasePath in configuration + BasePath = configuration.GetValue("BasePath") ?? string.Empty; + if (string.IsNullOrEmpty(BasePath)) + { + throw new Exception("ParseFullDataAsync - BasePath is not configured in appsettings.json or CsvSettings.SampleFolder"); + } } public async Task ParseFullDataAsync() { logger.LogInformation("ParseFullDataAsync - Starting parsing"); + logger.LogInformation("ParseFullDataAsync - Processing will happen with BasePath: {BasePath}", BasePath); - // Get the base path from either SampleFolder in csvSettings or the BasePath in configuration - var basePath = configuration.GetValue("BasePath"); + var candidatosDirectory = Path.Combine(BasePath, csvSettings.CandidatosFolder); + var bensCandidatosDirectory = Path.Combine(BasePath, csvSettings.BensCandidatosFolder); + var redesSociaisDirectory = Path.Combine(BasePath, csvSettings.RedesSociaisFolder); - if (string.IsNullOrEmpty(basePath)) + //await ParseFolder(candidatosDirectory, candidatoParserService); + await ParseFolder(bensCandidatosDirectory, bemCandidatoParserService); + await ParseFolder(redesSociaisDirectory, redeSocialParserService); + + logger.LogInformation("ParseFullDataAsync - Full data parsing completed!"); + } + + private async Task ParseFolder(string csvDirectory, CsvParserService csvParserService) + { + if (Directory.Exists(csvDirectory)) { - logger.LogError("ParseFullDataAsync - BasePath is not configured in appsettings.json or CsvSettings.SampleFolder"); - return; + foreach (var filePath in Directory.GetFiles(csvDirectory, "*.csv")) + { + // Check if filePath contains "fix_" prefix + if (filePath.Contains("fix_")) + { + logger.LogInformation("ParseFolder - Skipping already fixed file: {FilePath}", filePath); + continue; + } + + logger.LogInformation("ParseFolder - Parsing data from {FilePath}", filePath); + await csvParserService.ParseFolderAsync(filePath); + } } - - logger.LogInformation("ParseFullDataAsync - Processing will happen with BasePath: {BasePath}", basePath); - - try + else { - var candidatosDirectory = Path.Combine(basePath, csvSettings.CandidatosFolder); - var bensCandidatosDirectory = Path.Combine(basePath, csvSettings.BensCandidatosFolder); - var redesSociaisDirectory = Path.Combine(basePath, csvSettings.RedesSociaisFolder); - - if (Directory.Exists(candidatosDirectory)) - { - foreach (var filePath in Directory.GetFiles(candidatosDirectory, "*.csv")) - { - // Check if filePath contains "fix_" prefix - if (filePath.Contains("fix_")) - { - logger.LogInformation("ParseFullDataAsync - Skipping already fixed file: {FilePath}", filePath); - continue; - } - - logger.LogInformation("ParseFullDataAsync - Parsing candidatos data from {FilePath}", filePath); - await csvParserService.ParseCandidatosAsync(filePath); - } - } - else - { - logger.LogWarning("ParseFullDataAsync - 'Candidatos' directory not found at {Directory}", candidatosDirectory); - } - - if (Directory.Exists(bensCandidatosDirectory)) - { - foreach (var filePath in Directory.GetFiles(bensCandidatosDirectory, "*.csv")) - { - // Check if filePath contains "fix_" prefix - if (filePath.Contains("fix_")) - { - logger.LogInformation("ParseFullDataAsync - Skipping already fixed file: {FilePath}", filePath); - continue; - } - - logger.LogInformation("ParseFullDataAsync - Parsing bens candidatos data from {FilePath}", filePath); - await csvParserService.ParseBensCandidatosAsync(filePath); - } - } - else - { - logger.LogWarning("ParseFullDataAsync - 'Bens candidatos' directory not found at {Directory}", bensCandidatosDirectory); - } - - if (Directory.Exists(redesSociaisDirectory)) - { - foreach (var filePath in Directory.GetFiles(redesSociaisDirectory, "*.csv")) - { - // Check if filePath contains "fix_" prefix - if (filePath.Contains("fix_")) - { - logger.LogInformation("ParseFullDataAsync - Skipping already fixed file: {FilePath}", filePath); - continue; - } - - logger.LogInformation("ParseFullDataAsync - Parsing redes sociais data from {FilePath}", filePath); - await csvParserService.ParseRedeSocialAsync(filePath); - } - } - else - { - logger.LogWarning("ParseFullDataAsync - 'Redes sociais' directory not found at {Directory}", redesSociaisDirectory); - } - - logger.LogInformation("ParseFullDataAsync - Full data parsing completed!"); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseFullDataAsync - Error parsing full data set"); - throw; + logger.LogWarning("ParseFolder - Directory not found at {Directory}", csvDirectory); } } } diff --git a/OpenCand.ETL/Parser/ParserServices/BemCandidatoParserService.cs b/OpenCand.ETL/Parser/ParserServices/BemCandidatoParserService.cs new file mode 100644 index 0000000..02cf6a3 --- /dev/null +++ b/OpenCand.ETL/Parser/ParserServices/BemCandidatoParserService.cs @@ -0,0 +1,52 @@ +using Microsoft.Extensions.Logging; +using OpenCand.Core.Models; +using System.Globalization; +using OpenCand.ETL.Contracts; +using OpenCand.Parser.Models; +using OpenCand.Services; +using OpenCand.Parser.Services; + +namespace OpenCand.ETL.Parser.ParserServices +{ + public class BemCandidatoParserService : IParserService + { + private readonly ILogger logger; + private readonly BemCandidatoService bemCandidatoService; + + public BemCandidatoParserService( + ILogger logger, + BemCandidatoService bemCandidatoService) + { + this.logger = logger; + this.bemCandidatoService = bemCandidatoService; + } + + public async Task ParseObject(BemCandidatoCSV record) + { + // Parse decimal value + decimal? valor = null; + if (!string.IsNullOrEmpty(record.ValorBemCandidato)) + { + string normalizedValue = record.ValorBemCandidato.Replace(".", "").Replace(",", "."); + if (decimal.TryParse(normalizedValue, NumberStyles.Any, CultureInfo.InvariantCulture, out var parsedValue)) + { + valor = parsedValue; + } + } + + var bemCandidato = new BemCandidato + { + SqCandidato = record.SequencialCandidato, + Ano = record.AnoEleicao, + SiglaUF = record.SiglaUF, + NomeUE = record.NomeUE, + OrdemBem = record.NumeroOrdemBemCandidato, + TipoBem = record.DescricaoTipoBemCandidato, + Descricao = record.DescricaoBemCandidato, + Valor = valor + }; + + await bemCandidatoService.AddBemCandidatoAsync(bemCandidato); + } + } +} diff --git a/OpenCand.ETL/Parser/ParserServices/CandidatoParserService.cs b/OpenCand.ETL/Parser/ParserServices/CandidatoParserService.cs new file mode 100644 index 0000000..64e7519 --- /dev/null +++ b/OpenCand.ETL/Parser/ParserServices/CandidatoParserService.cs @@ -0,0 +1,100 @@ +using System.Globalization; +using Microsoft.Extensions.Logging; +using OpenCand.Core.Models; +using OpenCand.ETL.Contracts; +using OpenCand.Parser.Models; +using OpenCand.Services; + +namespace OpenCand.ETL.Parser.ParserServices +{ + public class CandidatoParserService : IParserService + { + private readonly ILogger logger; + private readonly CandidatoService candidatoService; + + public CandidatoParserService( + ILogger logger, + CandidatoService candidatoService) + { + this.logger = logger; + this.candidatoService = candidatoService; + } + + public async Task ParseObject(CandidatoCSV record) + { + if (string.IsNullOrWhiteSpace(record.CPFCandidato) || record.CPFCandidato.Length <= 3) + { + record.CPFCandidato = null; // Handle null/empty/whitespace CPF + } + + if (record.NomeCandidato == "NÃO DIVULGÁVEL" || + string.IsNullOrEmpty(record.NomeCandidato) || + record.NomeCandidato == "#NULO") + { + logger.LogCritical($"ParseCandidatosAsync - Candidate with id {record.SequencialCandidato} with invalid name, skipping..."); + return; // Skip candidates with invalid name + } + + if (string.IsNullOrWhiteSpace(record.Apelido) || + record.Apelido.Contains("#NUL") || + record.Apelido.Contains("NULO#") || + record.Apelido.Contains("#NE")) + { + record.Apelido = null; + } + + var candidato = new Candidato + { + Cpf = record.CPFCandidato, + SqCandidato = record.SequencialCandidato, + Nome = record.NomeCandidato, + Apelido = record.Apelido, + Email = record.Email.Contains("@") ? record.Email : null, + Sexo = record.Genero, + EstadoCivil = record.EstadoCivil, + Escolaridade = record.GrauInstrucao, + Ocupacao = record.Ocupacao, + Eleicoes = new List() + { + new CandidatoMapping + { + Cpf = record.CPFCandidato, + Nome = record.NomeCandidato, + Apelido = record.Apelido, + SqCandidato = record.SequencialCandidato, + Ano = record.AnoEleicao, + TipoEleicao = record.TipoAbrangencia, + NomeUE = record.NomeUE, + SiglaUF = record.SiglaUF, + Cargo = record.DescricaoCargo, + NrCandidato = record.NumeroCandidato, + Resultado = record.SituacaoTurno, + Partido = new Partido + { + Sigla = record.SiglaPartido, + Nome = record.NomePartido, + Numero = record.NumeroPartido, + } + } + } + }; + + if (!string.IsNullOrEmpty(record.DataNascimento) && + record.DataNascimento != "#NULO") + { + if (DateTime.TryParseExact(record.DataNascimento, "dd/MM/yyyy", + CultureInfo.InvariantCulture, DateTimeStyles.AssumeLocal, out var dataNascimento)) + { + // Convert to UTC DateTime to work with PostgreSQL timestamp with time zone + candidato.DataNascimento = DateTime.SpecifyKind(dataNascimento, DateTimeKind.Utc); + } + } + else + { + candidato.DataNascimento = null; // Handle null/empty/whitespace date + } + + await candidatoService.AddCandidatoAsync(candidato); + } + } +} diff --git a/OpenCand.ETL/Parser/ParserServices/RedeSocialParserService.cs b/OpenCand.ETL/Parser/ParserServices/RedeSocialParserService.cs new file mode 100644 index 0000000..a1d0a33 --- /dev/null +++ b/OpenCand.ETL/Parser/ParserServices/RedeSocialParserService.cs @@ -0,0 +1,37 @@ +using Microsoft.Extensions.Logging; +using OpenCand.Core.Models; +using OpenCand.ETL.Contracts; +using OpenCand.Parser.Models; +using OpenCand.Parser.Services; +using OpenCand.Services; + +namespace OpenCand.ETL.Parser.ParserServices +{ + public class RedeSocialParserService : IParserService + { + private readonly ILogger logger; + private readonly RedeSocialService redeSocialService; + + public RedeSocialParserService( + ILogger logger, + RedeSocialService redeSocialService) + { + this.logger = logger; + this.redeSocialService = redeSocialService; + } + + public async Task ParseObject(RedeSocialCSV record) + { + var redeSocial = new RedeSocial + { + SqCandidato = record.SequencialCandidato, + Ano = record.DataEleicao, + SiglaUF = record.SiglaUF, + Link = record.Url, + Rede = string.Empty + }; + + await redeSocialService.AddRedeSocialAsync(redeSocial); + } + } +} diff --git a/OpenCand.ETL/Parser/Services/CsvParserService.cs b/OpenCand.ETL/Parser/Services/CsvParserService.cs deleted file mode 100644 index da28db1..0000000 --- a/OpenCand.ETL/Parser/Services/CsvParserService.cs +++ /dev/null @@ -1,276 +0,0 @@ -using System.Globalization; -using CsvHelper; -using CsvHelper.Configuration; -using Microsoft.Extensions.Logging; -using OpenCand.Core.Models; -using OpenCand.ETL.Parser.CsvMappers; -using OpenCand.Parser.CsvMappers; -using OpenCand.Parser.Models; -using OpenCand.Services; - -namespace OpenCand.Parser.Services -{ - public class CsvParserService - { - private readonly ILogger logger; - private readonly CandidatoService candidatoService; - private readonly BemCandidatoService bemCandidatoService; - private readonly RedeSocialService redeSocialService; - private readonly CsvFixerService csvFixerService; - - private readonly CsvConfiguration parserConfig; - - public CsvParserService( - ILogger logger, - CandidatoService candidatoService, - BemCandidatoService bemCandidatoService, - RedeSocialService redeSocialService, - CsvFixerService csvFixerService) - { - this.logger = logger; - this.candidatoService = candidatoService; - this.bemCandidatoService = bemCandidatoService; - this.redeSocialService = redeSocialService; - this.csvFixerService = csvFixerService; - - parserConfig = new CsvConfiguration(CultureInfo.InvariantCulture) - { - Delimiter = ";", - HasHeaderRecord = true, - PrepareHeaderForMatch = args => args.Header.ToLower(), - MissingFieldFound = null, - TrimOptions = TrimOptions.Trim, - Encoding = System.Text.Encoding.UTF8 - }; - } - - public async Task ParseCandidatosAsync(string filePath) - { - logger.LogInformation($"ParseCandidatosAsync - Starting to parse 'candidatos' from '{filePath}'"); - - filePath = csvFixerService.FixCsvFile(filePath); - - // Fix the CSV file if necessary - if (string.IsNullOrEmpty(filePath)) - { - logger.LogError($"ParseCandidatosAsync - Failed to fix CSV file at '{filePath}'"); - throw new InvalidOperationException($"Failed to fix CSV file at '{filePath}'"); - } - - try - { - using var reader = new StreamReader(filePath); - using var csv = new CsvReader(reader, parserConfig); - var po = new ParallelOptions - { - MaxDegreeOfParallelism = 25 - }; - - csv.Context.RegisterClassMap(); - - var records = csv.GetRecords(); - - await Parallel.ForEachAsync(records, po, async (record, ct) => - { - try - { - if (string.IsNullOrWhiteSpace(record.CPFCandidato) || record.CPFCandidato.Length <= 3) - { - record.CPFCandidato = null; // Handle null/empty/whitespace CPF - } - - if (record.NomeCandidato == "NÃO DIVULGÁVEL" || - string.IsNullOrEmpty(record.NomeCandidato) || - record.NomeCandidato == "#NULO") - { - logger.LogCritical($"ParseCandidatosAsync - Candidate with id {record.SequencialCandidato} with invalid name, skipping..."); - return; // Skip candidates with invalid name - } - - var candidato = new Candidato - { - Cpf = record.CPFCandidato, - SqCandidato = record.SequencialCandidato, - Nome = record.NomeCandidato, - Apelido = record.Apelido, - Email = record.Email.Contains("@") ? record.Email : null, - Sexo = record.Genero, - EstadoCivil = record.EstadoCivil, - Escolaridade = record.GrauInstrucao, - Ocupacao = record.Ocupacao, - Eleicoes = new List() - { - new CandidatoMapping - { - Cpf = record.CPFCandidato, - Nome = record.NomeCandidato, - Apelido = record.Apelido, - SqCandidato = record.SequencialCandidato, - Ano = record.AnoEleicao, - TipoEleicao = record.TipoAbrangencia, - NomeUE = record.NomeUE, - SiglaUF = record.SiglaUF, - Cargo = record.DescricaoCargo, - NrCandidato = record.NumeroCandidato, - Resultado = record.SituacaoTurno, - Partido = new Partido - { - Sigla = record.SiglaPartido, - Nome = record.NomePartido, - Numero = record.NumeroPartido, - } - } - } - }; - - if (!string.IsNullOrEmpty(record.DataNascimento) && - record.DataNascimento != "#NULO") - { - if (DateTime.TryParseExact(record.DataNascimento, "dd/MM/yyyy", - CultureInfo.InvariantCulture, DateTimeStyles.AssumeLocal, out var dataNascimento)) - { - // Convert to UTC DateTime to work with PostgreSQL timestamp with time zone - candidato.DataNascimento = DateTime.SpecifyKind(dataNascimento, DateTimeKind.Utc); - } - } - else - { - candidato.DataNascimento = null; // Handle null/empty/whitespace date - } - - await candidatoService.AddCandidatoAsync(candidato); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseCandidatosAsync - Error processing candidate with id {CandidatoId}", record.SequencialCandidato); - } - }); - - logger.LogInformation("ParseCandidatosAsync - Finished parsing candidatos from {FilePath}", filePath); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseCandidatosAsync - Error parsing candidatos file {FilePath}", filePath); - throw; - } - } - - public async Task ParseBensCandidatosAsync(string filePath) - { - logger.LogInformation($"ParseBensCandidatosAsync - Starting to parse bens candidatos from '{filePath}'"); - - filePath = csvFixerService.FixCsvFile(filePath); - - // Fix the CSV file if necessary - if (string.IsNullOrEmpty(filePath)) - { - logger.LogError($"ParseBensCandidatosAsync - Failed to fix CSV file at '{filePath}'"); - throw new InvalidOperationException($"Failed to fix CSV file at '{filePath}'"); - } - - try - { - using var reader = new StreamReader(filePath); - using var csv = new CsvReader(reader, parserConfig); - csv.Context.RegisterClassMap(); - - var records = csv.GetRecords(); - - foreach (var record in records) - { - try - { - // Parse decimal value - decimal? valor = null; - if (!string.IsNullOrEmpty(record.ValorBemCandidato)) - { - string normalizedValue = record.ValorBemCandidato.Replace(".", "").Replace(",", "."); - if (decimal.TryParse(normalizedValue, NumberStyles.Any, CultureInfo.InvariantCulture, out var parsedValue)) - { - valor = parsedValue; - } - } - - var bemCandidato = new BemCandidato - { - SqCandidato = record.SequencialCandidato, - Ano = record.AnoEleicao, - SiglaUF = record.SiglaUF, - NomeUE = record.NomeUE, - OrdemBem = record.NumeroOrdemBemCandidato, - TipoBem = record.DescricaoTipoBemCandidato, - Descricao = record.DescricaoBemCandidato, - Valor = valor - }; - - await bemCandidatoService.AddBemCandidatoAsync(bemCandidato); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseBensCandidatosAsync - Error processing bem candidato with id {CandidatoId} and ordem {OrdemBem}", - record.SequencialCandidato, record.NumeroOrdemBemCandidato); - } - } - - logger.LogInformation("ParseBensCandidatosAsync - Finished parsing bens candidatos from {FilePath}", filePath); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseBensCandidatosAsync - Error parsing bens candidatos file {FilePath}", filePath); - throw; - } - } - - public async Task ParseRedeSocialAsync(string filePath) - { - logger.LogInformation($"ParseRedeSocialAsync - Starting to parse redes sociais from '{filePath}'"); - - filePath = csvFixerService.FixCsvFile(filePath); - - // Fix the CSV file if necessary - if (string.IsNullOrEmpty(filePath)) - { - logger.LogError($"ParseRedeSocialAsync - Failed to fix CSV file at '{filePath}'"); - throw new InvalidOperationException($"Failed to fix CSV file at '{filePath}'"); - } - - try - { - using var reader = new StreamReader(filePath); - using var csv = new CsvReader(reader, parserConfig); - csv.Context.RegisterClassMap(); - - var records = csv.GetRecords(); - - foreach (var record in records) - { - try - { - var redeSocial = new RedeSocial - { - SqCandidato = record.SequencialCandidato, - Ano = record.DataEleicao, - SiglaUF = record.SiglaUF, - Link = record.Url, - Rede = string.Empty - }; - - await redeSocialService.AddRedeSocialAsync(redeSocial); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseRedeSocialAsync - Error processing redes sociais with id {SequencialCandidato} and link {Url}", - record.SequencialCandidato, record.Url); - } - } - - logger.LogInformation("ParseRedeSocialAsync - Finished parsing redes sociais from {FilePath}", filePath); - } - catch (Exception ex) - { - logger.LogError(ex, "ParseRedeSocialAsync - Error parsing redes sociais file {FilePath}", filePath); - throw; - } - } - } -} diff --git a/OpenCand.ETL/Program.cs b/OpenCand.ETL/Program.cs index c24f25e..8f54b2d 100644 --- a/OpenCand.ETL/Program.cs +++ b/OpenCand.ETL/Program.cs @@ -3,8 +3,11 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using OpenCand.Config; +using OpenCand.ETL.Contracts; +using OpenCand.ETL.Parser.ParserServices; using OpenCand.ETL.Repository; using OpenCand.Parser; +using OpenCand.Parser.Models; using OpenCand.Parser.Services; using OpenCand.Repository; using OpenCand.Services; @@ -52,9 +55,14 @@ namespace OpenCand { // Configuration services.Configure(hostContext.Configuration.GetSection("CsvSettings")); - + // Services - services.AddTransient(); + services.AddTransient, CandidatoParserService>(); + services.AddTransient, BemCandidatoParserService>(); + services.AddTransient, RedeSocialParserService>(); + services.AddTransient>(); + services.AddTransient>(); + services.AddTransient>(); services.AddTransient(); services.AddTransient(); services.AddTransient(); diff --git a/OpenCand.ETL/Repository/CandidatoRepository.cs b/OpenCand.ETL/Repository/CandidatoRepository.cs index bc12a02..7919fc4 100644 --- a/OpenCand.ETL/Repository/CandidatoRepository.cs +++ b/OpenCand.ETL/Repository/CandidatoRepository.cs @@ -50,7 +50,8 @@ namespace OpenCand.Repository { await connection.ExecuteAsync(@" INSERT INTO candidato_mapping (idcandidato, cpf, nome, apelido, sqcandidato, ano, tipoeleicao, siglauf, nomeue, cargo, nrcandidato, sgpartido, resultado) - VALUES (@idcandidato, @cpf, @nome, @apelido, @sqcandidato, @ano, @tipoeleicao, @siglauf, @nomeue, @cargo, @nrcandidato, @sgpartido, @resultado);", + VALUES (@idcandidato, @cpf, @nome, @apelido, @sqcandidato, @ano, @tipoeleicao, @siglauf, @nomeue, @cargo, @nrcandidato, @sgpartido, @resultado) + ON CONFLICT DO NOTHING;", new { idcandidato = candidatoMapping.IdCandidato, @@ -70,27 +71,45 @@ namespace OpenCand.Repository } } - public async Task?> GetCandidatoMappingByCpf(string cpf) + public async Task GetCandidatoByCpf(string cpf) { using (var connection = new NpgsqlConnection(ConnectionString)) { var query = @" - SELECT idcandidato, cpf, nome, sqcandidato, ano, tipoeleicao, siglauf, nomeue, cargo, nrcandidato, resultado - FROM candidato_mapping + SELECT * + FROM candidato WHERE cpf = @cpf"; - return (await connection.QueryAsync(query, new { cpf })).AsList(); + return await connection.QueryFirstOrDefaultAsync(query, new { cpf }); } } - public async Task?> GetCandidatoMappingByNome(string nome, string sqcandidato, int ano, string siglauf, string nomeue, string nrcandidato) + public async Task GetCandidatoByNome(string nome, DateTime datanascimento) { using (var connection = new NpgsqlConnection(ConnectionString)) { var query = @" - SELECT idcandidato, cpf, nome, sqcandidato, ano, tipoeleicao, siglauf, nomeue, cargo, nrcandidato, resultado + SELECT * + FROM candidato + WHERE nome = @nome AND datanascimento = @datanascimento"; + return await connection.QueryFirstOrDefaultAsync(query, new { nome, datanascimento }); + } + } + + public async Task GetCandidatoMappingByDetails(string nome, int ano, string cargo, string siglauf, string nomeue, string nrcandidato, string resultado) + { + using (var connection = new NpgsqlConnection(ConnectionString)) + { + var query = @" + SELECT * FROM candidato_mapping - WHERE nome = @nome AND sqcandidato = @sqcandidato AND ano = @ano AND siglauf = @siglauf AND nomeue = @nomeue AND nrcandidato = @nrcandidato"; - return (await connection.QueryAsync(query, new { nome, sqcandidato, ano, siglauf, nomeue, nrcandidato })).AsList(); + WHERE nome = @nome AND + ano = @ano AND + cargo = @cargo AND + siglauf = @siglauf AND + nomeue = @nomeue AND + nrcandidato = @nrcandidato AND + resultado = @resultado"; + return await connection.QueryFirstOrDefaultAsync(query, new { nome, ano, cargo, siglauf, nomeue, nrcandidato, resultado }); } } diff --git a/OpenCand.ETL/Services/CandidatoService.cs b/OpenCand.ETL/Services/CandidatoService.cs index 5f332ee..c0b1635 100644 --- a/OpenCand.ETL/Services/CandidatoService.cs +++ b/OpenCand.ETL/Services/CandidatoService.cs @@ -35,57 +35,59 @@ namespace OpenCand.Services await partidoRepository.AddPartidoAsync(candidatoMapping.Partido); } - List? mappings = null; - CandidatoMapping? existingMapping = null; + // Check if the candidate already exists in the database + Candidato? existingCandidato; if (candidato.Cpf == null || candidato.Cpf.Length != 11) { - // If CPF is not provided or invalid, we STRICTLY search by name and other properties - mappings = await candidatoRepository.GetCandidatoMappingByNome(candidato.Nome, - candidato.SqCandidato, - candidatoMapping.Ano, - candidatoMapping.SiglaUF, - candidatoMapping.NomeUE, - candidatoMapping.NrCandidato); + existingCandidato = await candidatoRepository.GetCandidatoByNome(candidato.Nome, candidato.DataNascimento.GetValueOrDefault()); } else { - mappings = await candidatoRepository.GetCandidatoMappingByCpf(candidato.Cpf); + existingCandidato = await candidatoRepository.GetCandidatoByCpf(candidato.Cpf); } + // If the candidate already exists, we can update the mappings + if (existingCandidato != null) + { + candidato.IdCandidato = existingCandidato.IdCandidato; + candidato.Cpf = GetNonEmptyString(existingCandidato.Cpf, candidato.Cpf); + candidato.Email = GetNonEmptyString(existingCandidato.Email, candidato.Email); + candidato.EstadoCivil = GetNonEmptyString(existingCandidato.EstadoCivil, candidato.EstadoCivil); + candidato.Apelido = GetNonEmptyString(existingCandidato.Apelido, candidato.Apelido); + candidato.Escolaridade = GetNonEmptyString(existingCandidato.Escolaridade, candidato.Escolaridade); + candidato.Ocupacao = GetNonEmptyString(existingCandidato.Ocupacao, candidato.Ocupacao); + candidato.Sexo = GetNonEmptyString(existingCandidato.Sexo, candidato.Sexo); + + candidatoMapping.IdCandidato = candidato.IdCandidato; + candidatoMapping.Cpf = GetNonEmptyString(candidato.Cpf, candidatoMapping.Cpf); + + // Update the entries for the existing candidate + await candidatoRepository.AddCandidatoAsync(candidato); + await candidatoRepository.AddCandidatoMappingAsync(candidatoMapping); + return; + } + + // If the candidate does not exist, we create a new one + CandidatoMapping? existingMapping = await candidatoRepository.GetCandidatoMappingByDetails(candidato.Nome, + candidatoMapping.Ano, + candidatoMapping.Cargo, + candidatoMapping.SiglaUF, + candidatoMapping.NomeUE, + candidatoMapping.NrCandidato, + candidatoMapping.Resultado); // Check if exists - if (mappings != null && mappings.Count > 0) + if (existingMapping != null) { - existingMapping = mappings.FirstOrDefault(m => m.Ano == candidatoMapping.Ano && - m.Cargo == candidatoMapping.Cargo && - m.SiglaUF == candidatoMapping.SiglaUF && - m.NomeUE == candidatoMapping.NomeUE && - m.NrCandidato == candidatoMapping.NrCandidato && - m.Resultado == candidatoMapping.Resultado); + candidato.IdCandidato = existingMapping.IdCandidato; + candidato.Cpf = GetNonEmptyString(existingMapping.Cpf, candidato.Cpf); + candidato.Apelido = GetNonEmptyString(existingMapping.Apelido, candidato.Apelido); + await candidatoRepository.AddCandidatoAsync(candidato); - // Already exists one for the current election - if (existingMapping != null) - { - candidato.IdCandidato = existingMapping.IdCandidato; - candidato.Cpf = existingMapping.Cpf; + return; + } - await candidatoRepository.AddCandidatoAsync(candidato); - return; - } - // If exists (but not for the current election), we take the existing idcandidato - // and create a new mapping for the current election - else - { - existingMapping = mappings.First(); - candidato.IdCandidato = existingMapping.IdCandidato; - candidato.Cpf = existingMapping.Cpf; - } - } - else - { - // No current mapping, we create a new one - // and create a new mapping for the current election - candidato.IdCandidato = Guid.NewGuid(); - } + // No current mapping, we create a new one and create a new mapping for the current election + candidato.IdCandidato = Guid.NewGuid(); // Set the mapping properties candidatoMapping.IdCandidato = candidato.IdCandidato; @@ -96,5 +98,18 @@ namespace OpenCand.Services await candidatoRepository.AddCandidatoMappingAsync(candidatoMapping); } + public string GetNonEmptyString(string? value1, string? value2) + { + if (!string.IsNullOrWhiteSpace(value1)) + { + return value1; + } + else if (!string.IsNullOrWhiteSpace(value2)) + { + return value2; + } + return string.Empty; + } + } } diff --git a/db/db.sql b/db/db.sql index 995f380..7ee678a 100644 --- a/db/db.sql +++ b/db/db.sql @@ -31,7 +31,7 @@ CREATE TABLE candidato_mapping ( siglauf VARCHAR(2), nomeue VARCHAR(100), cargo VARCHAR(50), - sgpartido VARCHAR(10), + sgpartido VARCHAR(50), nrcandidato VARCHAR(20), resultado VARCHAR(50), CONSTRAINT pk_candidato_mapping PRIMARY KEY (idcandidato, ano, siglauf, nomeue, cargo, nrcandidato, resultado), @@ -71,7 +71,7 @@ CREATE INDEX idx_rede_social_idcandidato ON rede_social (idcandidato); ---- Table for storing party information CREATE TABLE partido ( - sigla VARCHAR(10) NOT NULL PRIMARY KEY, + sigla VARCHAR(50) NOT NULL PRIMARY KEY, nome VARCHAR(255) NOT NULL, numero INT NOT NULL );