import sys  # only used by the optional CLI check under __main__
import os
import glob
import csv
from datetime import datetime, timedelta

import requests
from dbfread import DBF
from dotenv import load_dotenv

# override=True lets .env values win over variables inherited from the shell
# (the OS usually pre-sets USER, which would otherwise shadow the .env entry).
load_dotenv(override=True)

host = os.getenv("HOST")
login_user = os.getenv("USER")
login_pass = os.getenv("PASS")

# FILE_ROOT and TXN_DIR must both be set, or this concatenation raises TypeError.
src_file_trxn = os.getenv("FILE_ROOT") + os.getenv("TXN_DIR")
print("Reading files: ", src_file_trxn)

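# A minimal .env for this script might look like the following; the variable
# names come from the getenv() calls above, every value is hypothetical:
#
#   HOST=https://api.example.com
#   USER=0712345678
#   PASS=secret
#   FILE_ROOT=/data
#   TXN_DIR=/transactions
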
def get_auth_token():
    """Request a JWT access token from the API using the .env credentials."""
    url = host + "/auth/token/"
    print("Using URL: ", url)
    payload = {"phone_no": login_user, "password": login_pass}
    try:
        response = requests.post(url, data=payload)
        response.raise_for_status()
        return response.json().get('access')
    except requests.exceptions.RequestException as e:
        print("Error obtaining auth token:", e)
        return None

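# The token endpoint is assumed to return JSON with an 'access' key, e.g.
# (hypothetical shape): {"access": "eyJhbGciOi..."}. A missing key or a
# non-2xx response yields None, which aborts the run in the caller.
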
def read_dbf_file(file_path, columns):
    """Read a DBF file and return the requested columns as stripped strings."""
    data = []
    try:
        # dbfread's DBF is a lazy iterable, not a context manager: it opens
        # and closes the underlying file on each pass, so no cleanup is needed.
        # 'latin1' handles non-ASCII characters in legacy DBF data.
        dbf = DBF(file_path, ignore_missing_memofile=True, encoding='latin1')
        for record in dbf:
            filtered_record = {column: str(record.get(column)).strip() for column in columns}
            data.append(filtered_record)
    except Exception as e:
        print("Error:", e)
    return data

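# Each returned record is a plain dict of stripped strings, e.g. with
# hypothetical values:
#   {'VOTP': 'DEP', 'VOAC': '001234', 'VOAMT': '500.00',
#    'VONAR': 'Cash deposit', 'TM': '0930'}
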
def process_data(data, date_str):
    """Drop GL rows, fold each PEN row's amount into the transaction it
    references, and stamp every emitted record with a transaction_at datetime
    built from the file date and the TM field."""
    processed_data = []
    transaction_map = {}

    # Parse the date from the file name (two-digit year, e.g. '240131').
    try:
        file_date = datetime.strptime(date_str, '%y%m%d')
    except ValueError:
        print(f"Error parsing date from {date_str}. Skipping file.")
        return []

    for record in data:
        if record.get('VOTP') == 'GL':
            continue

        if record.get('VOTP') == 'PEN':
            # A PEN row points at its parent transaction via a trailing
            # 'TYPE/ACCOUNT' token in the narration field.
            vonar_parts = record.get('VONAR', '').split(' ')
            identifier = vonar_parts[-1] if vonar_parts else ''
            parts = identifier.split('/')
            if len(parts) == 2:
                totp, voac = parts
                key = (totp, voac)
                if key in transaction_map:
                    transaction_map[key]['PEN'] = record.get('VOAMT')
        else:
            key = (record.get('VOTP'), record.get('VOAC'))
            record['PEN'] = '0'

            # Combine date and time for the transaction_at column.
            # TM may be 'HH:MM' or 'HHMM'; anything unparsable means midnight.
            transaction_time = record.get('TM', '0000')
            if transaction_time and transaction_time != '0000':
                if ':' in transaction_time:
                    try:
                        transaction_hour, transaction_minute = map(int, transaction_time.split(':'))
                    except ValueError:
                        transaction_hour, transaction_minute = 0, 0
                else:
                    try:
                        transaction_hour = int(transaction_time[:2])
                        transaction_minute = int(transaction_time[2:])
                    except ValueError:
                        transaction_hour, transaction_minute = 0, 0
            else:
                transaction_hour, transaction_minute = 0, 0

            transaction_datetime = file_date + timedelta(hours=transaction_hour, minutes=transaction_minute)
            record['transaction_at'] = transaction_datetime

            transaction_map[key] = record
            processed_data.append(record)

    return processed_data

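# Worked example of the merge above, with hypothetical rows and date_str
# '240131': a DEP row for account 001234 at TM '0930' is keyed ('DEP',
# '001234') and emitted with PEN='0' and transaction_at=2024-01-31 09:30;
# a later PEN row whose VONAR ends in 'DEP/001234' is not emitted itself but
# overwrites that record's PEN with its own VOAMT. Note that a PEN row only
# lands if its parent appears earlier in the same file; otherwise it is
# silently dropped.
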
def save_data_to_csv(data, file_path, columns):
    """Write the processed records to CSV, adding the two derived columns."""
    try:
        # Copy before appending so the caller's column list is not mutated:
        # the original appended to `columns` itself, which leaked 'PEN' and
        # 'transaction_at' into the DBF extraction list on later files.
        fieldnames = list(columns)
        if 'PEN' not in fieldnames:
            fieldnames.append('PEN')
        if 'transaction_at' not in fieldnames:
            fieldnames.append('transaction_at')

        with open(file_path, 'w', newline='', encoding='utf-8') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        print(f"CSV saved to {file_path}")
    except Exception as e:
        print("Error saving CSV to file:", e)

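# With the columns used in process_files_in_folder, the CSV header comes out as:
#   VOTP,VOAC,VOAMT,VONAR,TM,PEN,transaction_at
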
def send_data_to_api(csv_file_path, token):
    """Upload one CSV to the migration endpoint, re-authenticating on 401."""
    url = host + "/api/v1/migrateTransactions"
    print("Using URL: " + url)
    headers = {'Authorization': f'Bearer {token}'}
    try:
        # Open inside a with-block so the handle is closed even on errors
        # (the original open() call was never closed).
        with open(csv_file_path, 'rb') as csv_file:
            files = {'file': csv_file}
            response = requests.post(url, files=files, headers=headers)

        if response.status_code == 201:
            response_data = response.json()
            print(f"Message from server: {response_data.get('message')}")
            print("CSV file sent successfully to the API")
        elif response.status_code == 401:
            print("Token expired. Re-authenticating...")
            new_token = get_auth_token()
            if new_token:
                send_data_to_api(csv_file_path, new_token)
            else:
                print("Failed to re-authenticate. Cannot send data to API.")
        else:
            print("Failed to send CSV file to the API. Status code:", response.status_code)
            print("Response content:", response.content.decode('utf-8'))
    except Exception as e:
        print("Error:", e)

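# Standalone usage sketch (hypothetical path):
#   token = get_auth_token()
#   if token:
#       send_data_to_api('/data/transactions/DT240131.csv', token)
# Caveat: the 401 branch retries recursively, so a token that is freshly
# issued yet still rejected would recurse until the interpreter's limit.
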
def process_files_in_folder(folder_path):
    """Convert every DT*.DBF file in folder_path to CSV and upload it."""
    files = glob.glob(os.path.join(folder_path, 'DT*.DBF'))
    if not files:
        print("No matching files found.")
        return

    token = get_auth_token()
    if not token:
        print("Failed to obtain auth token, cannot send data to API.")
        return

    columns_to_extract = ["VOTP", "VOAC", "VOAMT", "VONAR", "TM"]

    for dbf_file_path in files:
        base_filename = os.path.basename(dbf_file_path)
        date_str = base_filename[2:8]  # e.g. 'DT240131.DBF' -> '240131'
        dbf_data = read_dbf_file(dbf_file_path, columns_to_extract)
        processed_data = process_data(dbf_data, date_str)
        if processed_data:
            csv_file_path = dbf_file_path.replace('.DBF', '.csv')
            save_data_to_csv(processed_data, csv_file_path, columns_to_extract)
            send_data_to_api(csv_file_path, token)

if __name__ == "__main__":
    # if len(sys.argv) < 2:
    #     print("Usage: python script.py <folder_path>")
    # else:
    folder_path = src_file_trxn
    process_files_in_folder(folder_path)