134 lines
5.0 KiB
Python
134 lines
5.0 KiB
Python
|
import sys
|
||
|
import requests
|
||
|
from dbfread import DBF
|
||
|
from datetime import date, datetime
|
||
|
import csv
|
||
|
|
||
|
import os
|
||
|
from dotenv import load_dotenv
|
||
|
# Load settings from the .env file before any of them are read below.
load_dotenv(override=True)

# Base URL of the API and the credentials used to request a token.
host = os.getenv("HOST")
login_user = os.getenv("USER")
login_pass = os.getenv("PASS")

# Source DBF paths. FILE_ROOT is joined to each file name by plain string
# concatenation, so it has to carry its own trailing path separator.
file_root = os.getenv("FILE_ROOT")
src_file_grp = file_root + os.getenv("GROUP_FILE")
src_file_pa = file_root + os.getenv("PEN_FILE")
print("Reading files: ", src_file_grp, src_file_pa)
|
||
|
def convert_dates_to_strings(data):
    """
    Replace every date/datetime found in ``data`` with its ISO-format string.

    Lists and dicts are walked recursively and rebuilt (dict keys are kept
    as they are); any other kind of value is handed back unchanged.
    """
    if isinstance(data, (date, datetime)):
        return data.isoformat()
    if isinstance(data, dict):
        return {name: convert_dates_to_strings(entry) for name, entry in data.items()}
    if isinstance(data, list):
        return [convert_dates_to_strings(element) for element in data]
    return data
|
||
|
|
||
|
def read_dbf_file(file_path, columns, encoding='latin1', unique_column='ACTP'):
    """
    Read a DBF table, keeping the first record seen for each distinct key.

    Args:
        file_path: Path of the DBF file to open.
        columns: Field names copied into each returned dict; a field that a
            record does not have is stored as None.
        encoding: Character encoding passed to ``DBF``.
        unique_column: Field whose value identifies duplicates. Only the
            first record carrying a given value is kept.

    Returns:
        A list of dicts in file order. If reading fails, the error is printed
        and the records collected up to that point are returned.
    """
    data = []
    seen_keys = set()
    try:
        with DBF(file_path, encoding=encoding) as dbf:
            for record in dbf:
                # Take the key from the raw record so de-duplication still
                # works when unique_column is not among the extracted columns.
                key = record.get(unique_column)
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                data.append({column: record.get(column) for column in columns})
    except Exception as e:
        # Best effort: report the problem and hand back what was read so far.
        print("Error reading DBF file:", e)
    return data
|
||
|
|
||
|
def read_pa_dbf_file(file_path, columns, filter_column, filter_value, encoding='latin1'):
    """
    Collect the records of a DBF table whose ``filter_column`` equals ``filter_value``.

    Each matching record is reduced to a dict holding only ``columns``
    (fields the record lacks come back as None). If reading fails, the
    error is printed and the matches gathered so far are returned.
    """
    matches = []
    try:
        with DBF(file_path, encoding=encoding) as table:
            for row in table:
                if row.get(filter_column) != filter_value:
                    continue
                selected = {}
                for name in columns:
                    selected[name] = row.get(name)
                matches.append(selected)
    except Exception as err:
        print("Error reading PA DBF file:", err)
    return matches
|
||
|
|
||
|
def save_data_to_csv(data, file_path, columns):
    """
    Write ``data`` (an iterable of dicts) to ``file_path`` as UTF-8 CSV.

    ``columns`` supplies both the header row and the field order. Nothing is
    returned: success is reported with a printed confirmation, and any
    exception is caught and printed instead of being raised.
    """
    try:
        with open(file_path, 'w', newline='', encoding='utf-8') as out:
            csv_writer = csv.DictWriter(out, fieldnames=columns)
            csv_writer.writeheader()
            for row in data:
                csv_writer.writerow(row)
        print(f"CSV saved to {file_path}")
    except Exception as err:
        print("Error saving CSV to file:", err)
|
||
|
|
||
|
def get_auth_token():
    """
    Log in to the API and return the access token.

    Posts the configured ``login_user`` / ``login_pass`` as form fields
    ``phone_no`` and ``password`` to ``<host>/auth/token/``.

    Returns:
        The value of the ``access`` field of the JSON response (None if the
        field is absent), or None when the request fails, the server answers
        with an error status, or the body is not a JSON object. Failures are
        reported by printing; nothing is raised for them.
    """
    url = host+"/auth/token/"
    print("Using URL: "+url)
    payload = {"phone_no": login_user, "password": login_pass}
    try:
        # Without a timeout, requests waits indefinitely on a silent server.
        response = requests.post(url, data=payload, timeout=30)
        response.raise_for_status()
        body = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers older requests releases, where a non-JSON body
        # raises a plain JSON decode error instead of a RequestException.
        print("Error obtaining auth token:", e)
        return None
    if not isinstance(body, dict):
        print("Error obtaining auth token:", "unexpected response body")
        return None
    return body.get('access')
|
||
|
|
||
|
def send_data_to_api(csv_file_path, token):
    """
    Upload a CSV file to ``<host>/api/v1/migrateGroups``.

    The file is sent as the multipart field ``file`` with ``token`` in a
    Bearer ``Authorization`` header.

    Args:
        csv_file_path: Path of the CSV file to upload.
        token: Access token placed in the Authorization header.

    Returns:
        None. On HTTP 200 the server's ``message`` field is printed; on any
        other status the status code and response body are printed. Any
        exception is caught and printed rather than raised.
    """
    try:
        url = host+"/api/v1/migrateGroups"
        print("Using URL: "+url)
        headers = {'Authorization': f'Bearer {token}'}
        # Open the upload inside a context manager so the handle is closed
        # even when the request raises.
        # NOTE(review): no timeout is set, so this call can block
        # indefinitely if the server never responds.
        with open(csv_file_path, 'rb') as csv_file:
            response = requests.post(url, files={'file': csv_file}, headers=headers)
        if response.status_code == 200:
            response_data = response.json()
            print(f"Message from server: {response_data.get('message')}")
            print("CSV file sent successfully to the API")
        else:
            print("Failed to send CSV file to the API. Status code:", response.status_code)
            print("Response content:", response.content.decode('utf-8'))
    except Exception as e:
        print("Error:", e)
|
||
|
|
||
|
if __name__ == "__main__":
    # Script flow: read the group table, attach the matching PA_NA value to
    # each group, write groups.csv, then upload it to the API.
    dbf_file_path = src_file_grp
    pa_dbf_file_path = src_file_pa

    # Columns to extract from the main DBF file
    columns_to_extract = ["ACTP", "MINBAL", "RDATE", "NOFI"]
    dbf_data = read_dbf_file(dbf_file_path, columns_to_extract)
    if not dbf_data:
        # read_dbf_file reports failures by printing and returning what it
        # has; without this check an empty result would be written and
        # uploaded as a header-only CSV.
        sys.exit("No group records were read; nothing to send to the API.")

    # Columns to extract from the PA DBF file
    pa_columns_to_extract = ["PA_TP", "PA_CD", "PA_NA"]
    pa_data = read_pa_dbf_file(pa_dbf_file_path, pa_columns_to_extract, "PA_CD", "InstPenalAmt")

    # Map PA_TP to PA_NA for the rows where PA_CD = "InstPenalAmt"
    pa_dict = {record['PA_TP']: record['PA_NA'] for record in pa_data}

    # Add the PA_NA value to each group; groups without a match get ''
    for record in dbf_data:
        record['PA_NA'] = pa_dict.get(record.get('ACTP'), '')

    # Save the merged data to groups.csv
    csv_columns = columns_to_extract + ["PA_NA"]
    save_data_to_csv(dbf_data, 'groups.csv', csv_columns)

    print("Sending groups data to the API...")

    # Obtain the authentication token
    token = get_auth_token()
    if token:
        # Send the CSV to the API with authentication
        send_data_to_api('groups.csv', token)
    else:
        print("Failed to obtain auth token, cannot send data to API.")
|