P_chandra_migrator/m_user.py

57 lines
1.9 KiB
Python
Raw Permalink Normal View History

2024-10-03 12:17:31 +00:00
import requests
import os
from dotenv import load_dotenv
from datetime import datetime
load_dotenv(override=True)
# Environment Variables
host = os.getenv("HOST")
login_user = os.getenv("USER")
login_pass = os.getenv("PASS")
def get_dynamic_path():
"""Get today's date in the format YYYYMMDD and return the dynamic path for 'users.csv'."""
today_date = datetime.now().strftime("%Y%m%d")
path = os.path.join("C:\\Humbingo\\to_humbingo", today_date, "users.csv")
return path
def get_auth_token():
"""Obtain an authentication token from the API."""
url = f"{host}/auth/token/"
payload = {"phone_no": login_user, "password": login_pass}
try:
response = requests.post(url, data=payload)
response.raise_for_status()
return response.json().get('access')
except requests.exceptions.RequestException as e:
print(f"Error obtaining auth token: {e}")
return None
def send_data_to_api(csv_file_path, token):
"""Send the CSV file to the API endpoint."""
url = f"{host}/api/v1/migrateUsers"
try:
files = {'file': open(csv_file_path, 'rb')}
headers = {'Authorization': f'Bearer {token}'}
response = requests.post(url, files=files, headers=headers)
if response.status_code == 200:
response_data = response.json()
print(f"Message from server: {response_data.get('message')}")
print("CSV file sent successfully to the API")
else:
print(f"Failed to send CSV file to the API. Status code: {response.status_code}")
print(f"Response content: {response.content.decode('utf-8')}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
csv_path = get_dynamic_path()
print("Sending data to the API...")
token = get_auth_token()
if token:
send_data_to_api(csv_path, token)
else:
print("Failed to obtain auth token, cannot send data to API.")