158 lines
6.3 KiB
Python
158 lines
6.3 KiB
Python
|
import sys
|
||
|
import requests
|
||
|
from dbfread import DBF
|
||
|
from datetime import date, datetime
|
||
|
import csv
|
||
|
|
||
|
import os
|
||
|
from dotenv import load_dotenv
|
||
|
# Load configuration from .env; override=True makes .env values win over
# variables already set in the process environment.
load_dotenv(override=True)

# API root (e.g. "http://example.com") used to build endpoint URLs below.
host=os.getenv("HOST")

# Credentials used by get_auth_token() to request a bearer token.
login_user = os.getenv("USER")
login_pass = os.getenv("PASS")

# Full path to the source .dbf file: FILE_ROOT + USER_FILE.
# NOTE(review): os.getenv returns None for unset variables, which would make
# this concatenation raise TypeError — verify both are defined in .env.
src_file_usr = os.getenv("FILE_ROOT")+os.getenv("USER_FILE")

print("Reading files: ", src_file_usr)
|
||
|
|
||
|
def convert_dates_to_strings(data):
    """Return a copy of *data* with every date/datetime replaced by its
    ISO-8601 string, recursing through nested lists and dicts.

    Values that are neither containers nor dates are returned unchanged.
    """
    if isinstance(data, dict):
        return {key: convert_dates_to_strings(val) for key, val in data.items()}
    if isinstance(data, list):
        return [convert_dates_to_strings(elem) for elem in data]
    if isinstance(data, (date, datetime)):
        return data.isoformat()
    return data
|
||
|
|
||
|
def read_dbf_file(file_path, columns):
    """Read the .dbf file at *file_path* and return a list of cleaned records.

    Each record keeps only *columns*, with this cleanup applied:
      - ACNO and PHONE reduced to their digit characters,
      - NAME2 falling back to PHONE when empty (records missing both are
        skipped),
      - records whose PHONE is not exactly 10 digits skipped,
      - duplicate phone numbers skipped (first occurrence wins),
      - ADD1/ADD2/ADD3/PLACE/PIN merged into one '-'-joined 'ADDR' field
        and removed from the record.

    Errors are printed and whatever was collected so far is returned.
    """
    address_cols = ['ADD1', 'ADD2', 'ADD3', 'PLACE', 'PIN']
    data = []
    phone_numbers_seen = set()
    try:
        # Open the .dbf file
        with DBF(file_path) as dbf:
            # Iterate over each record in the .dbf file
            for record in dbf:
                # Keep only the requested columns (missing ones become None).
                filtered_record = {column: record.get(column) for column in columns}

                # Keep only digit characters; str() guards against DBF fields
                # decoded as numbers rather than text.
                if filtered_record.get('ACNO'):
                    filtered_record['ACNO'] = ''.join(
                        filter(str.isdigit, str(filtered_record['ACNO'])))
                if filtered_record.get('PHONE'):
                    filtered_record['PHONE'] = ''.join(
                        filter(str.isdigit, str(filtered_record['PHONE'])))

                # NAME2 falls back to the phone number; skip the record when
                # both are empty.
                if not filtered_record.get('NAME2'):
                    if filtered_record.get('PHONE'):
                        filtered_record['NAME2'] = filtered_record['PHONE']
                    else:
                        continue

                # Only accept 10-digit phone numbers.  `or ''` guards against
                # a present-but-None PHONE value: .get('PHONE', '') returns the
                # stored None (key exists), and len(None) previously raised
                # TypeError, aborting the whole read via the broad except.
                phone = filtered_record.get('PHONE') or ''
                if len(phone) != 10:
                    continue

                # Skip records with duplicate phone numbers.
                if phone in phone_numbers_seen:
                    continue
                phone_numbers_seen.add(phone)

                # Merge [ADD1, ADD2, ADD3, PLACE, PIN] into one "ADDR" column.
                # str() guards against numeric parts (e.g. a numeric PIN),
                # which '-'.join would otherwise reject.
                address_components = [record.get(col, '') for col in address_cols]
                filtered_record['ADDR'] = '-'.join(
                    str(part) for part in address_components if part)

                # Drop the merged source columns; pop() tolerates columns that
                # were not requested (del raised KeyError before).
                for col in address_cols:
                    filtered_record.pop(col, None)

                data.append(filtered_record)
    except Exception as e:
        # Best-effort reader: report and return what was collected so far.
        print("Error:", e)
    return data
|
||
|
|
||
|
def save_data_to_csv(data, file_path, columns):
    """Write *data* (a list of dicts) to *file_path* as CSV.

    The header is derived from *columns* with the address source columns
    (ADD1/ADD2/ADD3/PLACE/PIN) removed and "ADDR" appended, matching the
    records produced by read_dbf_file().  Unlike the previous version,
    *columns* itself is NOT mutated — the caller's list is left intact.
    Errors are printed rather than raised.
    """
    try:
        # Build the header on a local copy so the caller's list is untouched.
        fieldnames = [col for col in columns
                      if col not in ("ADD1", "ADD2", "ADD3", "PLACE", "PIN")]
        if "ADDR" not in fieldnames:
            fieldnames.append("ADDR")

        with open(file_path, 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        print(f"CSV saved to {file_path}")
    except Exception as e:
        print("Error saving CSV to file:", e)
|
||
|
|
||
|
def get_auth_token():
    """POST the configured credentials to {host}/auth/token/ and return the
    'access' token from the JSON response, or None on any request failure.

    Uses the module-level `host`, `login_user` and `login_pass` globals
    loaded from the environment.
    """
    url = host + "/auth/token/"
    print("Using URL: ", url)
    payload = {"phone_no": login_user, "password": login_pass}
    try:
        # Explicit timeout: requests has no default, so a dead API would
        # otherwise hang this script forever.
        response = requests.post(url, data=payload, timeout=30)
        response.raise_for_status()
        return response.json().get('access')
    except requests.exceptions.RequestException as e:
        print("Error obtaining auth token:", e)
        return None
|
||
|
|
||
|
def send_data_to_api(csv_file_path, token):
    """Upload the CSV at *csv_file_path* to {host}/api/v1/migrateUsers as
    multipart form-data, authenticating with the bearer *token*.

    Prints the server's 'message' field on success (HTTP 200) and the status
    code plus response body on failure; any other error is printed as well.
    """
    try:
        # API endpoint
        url = host + "/api/v1/migrateUsers"
        print("Using URL: ", url)

        # `with` guarantees the file handle is closed even if the POST
        # raises (the previous version opened it and never closed it).
        with open(csv_file_path, 'rb') as csv_file:
            files = {'file': csv_file}
            headers = {'Authorization': f'Bearer {token}'}
            # Explicit timeout keeps the upload from blocking indefinitely.
            response = requests.post(url, files=files, headers=headers,
                                     timeout=60)

        # Check response status
        if response.status_code == 200:
            response_data = response.json()
            print(f"Message from server: {response_data.get('message')}")
            print("CSV file sent successfully to the API")
        else:
            print("Failed to send CSV file to the API. Status code:", response.status_code)
            print("Response content:", response.content.decode('utf-8'))
    except Exception as e:
        # Catch-all so a bad upload never crashes the whole migration run.
        print("Error:", e)
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
# Check if the file path is provided as a command-line argument
|
||
|
# if len(sys.argv) < 2:
|
||
|
# print("Usage: python script.py DE2.DBF")
|
||
|
# else:
|
||
|
# Get the file path from the command-line arguments
|
||
|
dbf_file_path = src_file_usr
|
||
|
# Specify the columns to be extracted
|
||
|
columns_to_extract = ["ACTP", "ACNO", "NAME2", "ADD1", "ADD2", "ADD3", "PLACE", "PIN", "PHONE"]
|
||
|
# Call the function to read the .dbf file with specific columns
|
||
|
dbf_data = read_dbf_file(dbf_file_path, columns_to_extract)
|
||
|
# Save the data to a CSV file
|
||
|
save_data_to_csv(dbf_data, 'users.csv', columns_to_extract)
|
||
|
# Print the data as JSON array
|
||
|
print("Sending data to the API...")
|
||
|
|
||
|
# Obtain the authentication token
|
||
|
token = get_auth_token()
|
||
|
if token:
|
||
|
# Call the function to send data to the API with authentication
|
||
|
send_data_to_api('users.csv', token)
|
||
|
else:
|
||
|
print("Failed to obtain auth token, cannot send data to API.")
|