Add transform.py

هذا الالتزام موجود في:
2026-04-28 17:28:20 +00:00
التزام b42a70a3a4

755
transform.py Normal file
عرض الملف

@@ -0,0 +1,755 @@
# بسم الله الرحمن الرحيم
# بسم الله الرحمن الرحيم
# بسم الله الرحمن الرحيم
import base64
import json
import requests
import os
import pandas as pd
from datetime import datetime
import io
import numpy as np
import re
def sanitize_text(value):
    """Return ``value`` as a string that can always be encoded as UTF-8.

    Args:
        value: Any cell value (scalar, string, or arbitrary object).

    Returns:
        ``""`` for missing scalars (``None``, ``NaN``, ``NaT``, ``pd.NA``);
        ``str(value)`` for numbers; the input with unencodable code points
        (for example lone surrogates) removed for strings; ``str(value)``
        for anything else, or ``""`` if that conversion itself fails.
    """
    # pd.isna() returns an array for list-like input, which cannot be used
    # as a truth value; only a scalar (0-d) result is a "missing" marker.
    missing = pd.isna(value)
    if np.ndim(missing) == 0 and bool(missing):
        return ""

    if isinstance(value, (int, float, np.integer, np.floating)):
        return str(value)

    if isinstance(value, str):
        # errors='ignore' drops code points UTF-8 cannot represent instead
        # of raising, so no exception handler is needed around this call.
        return value.encode('utf-8', errors='ignore').decode('utf-8')

    try:
        return str(value)
    except Exception:
        # A misbehaving __str__ on one cell must not abort the whole export.
        return ""
def clean_site_name(name):
    """Standardise a site name to one of the canonical site labels.

    The name is trimmed and lower-cased, then matched by substring against
    a list of keywords in priority order; the first keyword found decides
    the canonical label.

    Args:
        name: Raw site name (any scalar; non-strings are stringified).

    Returns:
        ``"Unknown"`` for missing, empty, or whitespace-only input; the
        canonical label (e.g. ``"Main Site"``) when a keyword is found;
        otherwise the trimmed name in title case.
    """
    if pd.isna(name):
        return "Unknown"

    normalized = str(name).strip().lower()
    if not normalized:
        # Covers "" as well as whitespace-only values such as "   ".
        return "Unknown"

    # Matching is by substring, so longer variants such as 'main store' or
    # 'northstore' are already covered by the short keyword they contain.
    # Order matters: the first keyword found wins.
    # NOTE(review): substring matching also captures unrelated names that
    # merely contain a keyword (e.g. "domain" contains "main") - confirm
    # this is acceptable for the data set.
    site_keywords = (
        ('main', 'Main Site'),
        ('north', 'North Site'),
        ('south', 'South Site'),
        ('east', 'East Site'),
        ('west', 'West Site'),
        ('central', 'Central Site'),
    )
    for keyword, canonical in site_keywords:
        if keyword in normalized:
            return canonical
    return normalized.title()
def clean_brand(brand):
    """Standardise a brand name, folding known misspellings together.

    The value is trimmed and lower-cased, then matched by substring against
    known spellings in priority order; the first spelling found decides the
    canonical brand.

    Args:
        brand: Raw brand value (any scalar; non-strings are stringified).

    Returns:
        ``"Unknown"`` for missing, empty, or whitespace-only input; the
        canonical brand (e.g. ``"Nike"``) when a known spelling is found;
        otherwise the trimmed value in title case.
    """
    if pd.isna(brand):
        return "Unknown"

    normalized = str(brand).strip().lower()
    if not normalized:
        # Covers "" as well as whitespace-only values such as "   ".
        return "Unknown"

    # Matching is by substring and the first hit wins, so the order of this
    # table is significant.  Variants with a trailing space are not listed:
    # any text containing them also contains the plain spelling.
    brand_variants = (
        ('nike', 'Nike'),
        ('nik e', 'Nike'),
        ('ni ke', 'Nike'),
        ('adidas', 'Adidas'),
        ('addidas', 'Adidas'),
        ('puma', 'Puma'),
        ('pum a', 'Puma'),
        ('reebok', 'Reebok'),
        ('reeb ok', 'Reebok'),
        ('gucci', 'Gucci'),
        ('gucc i', 'Gucci'),
        ('chanel', 'Chanel'),
        ('chan el', 'Chanel'),
    )
    for variant, canonical in brand_variants:
        if variant in normalized:
            return canonical
    return normalized.title()
def calculate_age_from_dob(dob_value):
    """Convert a date of birth into an age in whole years.

    Args:
        dob_value: Date of birth as a string or date-like scalar.  It is
            stringified before parsing; values containing ``-`` are parsed
            from their first whitespace-separated token, values containing
            ``/`` are parsed whole.

    Returns:
        The age as an ``int``, or the string ``"Unknown"`` when the value is
        missing, is the ``1900-01-01`` placeholder, has no ``-`` or ``/``
        separator, cannot be parsed, or yields an age outside 0-120.
    """
    if pd.isna(dob_value):
        return "Unknown"

    dob_str = str(dob_value).strip()
    # 1900-01-01 is the placeholder the source data uses for "no DOB".
    if not dob_str or dob_str.startswith(('1900-01-01', '1900/01/01')):
        return "Unknown"

    try:
        if '-' in dob_str:
            # Keep only the date token so "YYYY-MM-DD HH:MM:SS" parses too.
            dob = pd.to_datetime(dob_str.split()[0])
        elif '/' in dob_str:
            dob = pd.to_datetime(dob_str)
        else:
            return "Unknown"
    except (ValueError, TypeError, OverflowError):
        # Unparseable or out-of-range dates are treated as unknown.
        return "Unknown"

    if pd.isna(dob):
        # A NaT result has no usable year/month/day.
        return "Unknown"

    today = datetime.now()
    # Subtract one year when this year's birthday has not happened yet.
    had_no_birthday_yet = (today.month, today.day) < (dob.month, dob.day)
    age = today.year - dob.year - had_no_birthday_yet
    if age < 0 or age > 120:  # Sanity check
        return "Unknown"
    return age
def _contact_flag_is_set(value):
    """Return True when a contact-preference cell means "yes"."""
    try:
        if bool(value == 1):
            return True
    except (TypeError, ValueError):
        # e.g. pd.NA, whose comparison result has no truth value.
        pass
    # Text flags are compared case-insensitively and ignoring padding; the
    # string "1" is accepted for parity with the numeric 1 handled above.
    return str(value).strip().lower() in ('true', 'yes', '1')


def merge_contact_methods(row):
    """Merge the four ``ContactBy*`` flags of a row into one label.

    Args:
        row: Mapping-like object with a ``.get`` method (a ``pd.Series`` row
            or a ``dict``) that may contain ``ContactByEmail``,
            ``ContactBySMS``, ``ContactByMail`` and ``ContactByPhone``.

    Returns:
        The enabled methods joined by commas in the fixed order
        Email, SMS, Mail, Phone (e.g. ``"Email,Phone"``), or ``"NoContact"``
        when none is enabled.
    """
    flag_columns = (
        ('ContactByEmail', 'Email'),
        ('ContactBySMS', 'SMS'),
        ('ContactByMail', 'Mail'),
        ('ContactByPhone', 'Phone'),
    )
    contact_methods = [
        label
        for column, label in flag_columns
        if _contact_flag_is_set(row.get(column, ''))
    ]
    if not contact_methods:
        return 'NoContact'
    return ','.join(contact_methods)
def extract_date_components(date_value, column_name):
    """Split a date into year, month name, part of month and weekday name.

    Args:
        date_value: Date as a string or date-like scalar.  It is stringified
            before parsing; values containing ``-`` are parsed from their
            first whitespace-separated token, values containing ``/`` are
            parsed whole.
        column_name: Prefix for the keys of the returned dict.

    Returns:
        A dict with the keys ``<column_name>_Year``, ``<column_name>_Month``,
        ``<column_name>_TimeOfMonth`` and ``<column_name>_Day`` (in that
        order).  Every value is ``"Unknown"`` when the date is missing, has
        no ``-`` or ``/`` separator, or cannot be parsed.
    """
    parts = ('Year', 'Month', 'TimeOfMonth', 'Day')
    unknown = {f'{column_name}_{part}': "Unknown" for part in parts}

    if pd.isna(date_value):
        return unknown

    date_str = str(date_value).strip()
    try:
        if '-' in date_str:
            # Keep only the date token so "YYYY-MM-DD HH:MM:SS" parses too.
            date_obj = pd.to_datetime(date_str.split()[0])
        elif '/' in date_str:
            date_obj = pd.to_datetime(date_str)
        else:
            return unknown
    except (ValueError, TypeError, OverflowError):
        # Unparseable or out-of-range dates are reported as unknown.
        return unknown

    if pd.isna(date_obj):
        # A NaT result has no usable year/month/day.
        return unknown

    # Fixed English names keep the output independent of the system locale.
    month_names = ('January', 'February', 'March', 'April', 'May', 'June',
                   'July', 'August', 'September', 'October', 'November',
                   'December')
    day_names = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
                 'Saturday', 'Sunday')

    day_num = date_obj.day
    if day_num <= 10:
        time_of_month = "Beginning (1-10)"
    elif day_num <= 20:
        time_of_month = "Middle (11-20)"
    else:
        time_of_month = "End (21-31)"

    return {
        f'{column_name}_Year': date_obj.year,
        f'{column_name}_Month': month_names[date_obj.month - 1],
        f'{column_name}_TimeOfMonth': time_of_month,
        f'{column_name}_Day': day_names[date_obj.weekday()],
    }
def add_recurring_customer_flag(df, userid_column='Userid'):
    """Add an ``IsRecurringCustomer`` column (1/0) to ``df``.

    A row is flagged 1 when its user id occurs on more than one row of
    ``df``; rows with a missing user id are flagged 0.

    Args:
        df: DataFrame containing ``userid_column``.  It is modified in place.
        userid_column: Name of the column holding the user id.

    Returns:
        The same DataFrame, with the new column added.
    """
    # Rows per user; value_counts() leaves out missing ids.
    rows_per_user = df[userid_column].value_counts()

    # Vectorised lookup: ids absent from the counts (missing ids) map to NaN,
    # which is treated as zero occurrences.
    df['IsRecurringCustomer'] = (
        df[userid_column].map(rows_per_user).fillna(0).gt(1).astype(int)
    )

    # Report distinct users, not rows: summing the flag column would count
    # every transaction of a recurring user.
    recurring_users = int((rows_per_user > 1).sum())
    print(f" 🔄 Added 'IsRecurringCustomer' flag: {recurring_users} recurring customers out of {df[userid_column].nunique()} unique users")
    return df
# Contact-preference flags that are merged into a single 'ContactMethod' column.
_CONTACT_COLUMNS = ('ContactByEmail', 'ContactBySMS', 'ContactByMail', 'ContactByPhone')

# Date columns that are split into components, with the prefix of the new columns.
_DATE_PREFIXES = {
    'TransactionDate': 'Transaction',
    'CreateDate': 'Create',
    'PromotionStartDate': 'PromotionStart',
}

# Ordered transformation plan as (action, column) pairs.  The order fixes both
# the sequence of log lines and the order in which derived columns are added.
# 'ParentSiteId' and 'SiteType' are listed once only: a second drop of an
# already-dropped column would be a no-op.
_TRANSFORM_STEPS = (
    ('recurring', 'Userid'),
    ('drop', 'StoreId'),
    ('drop', 'Store'),
    ('drop', 'ParentSiteId'),
    ('drop', 'SiteType'),
    ('keep', 'Gender'),
    ('age', 'DOB'),
    ('keep', 'RegistrationDate'),
    ('drop', 'FirstLoginDate'),
    ('drop', 'LastLoginDate'),
    ('contact', None),
    ('drop', 'ContactStatus'),
    ('drop', 'TermsConsent'),
    ('drop', 'CommunityName'),
    ('drop', 'CountryId'),
    ('keep', 'Country'),
    ('drop', 'StateCode'),
    ('keep', 'StateName'),
    ('drop', 'City'),
    ('drop', 'PostalCode'),
    ('drop', 'Title'),
    ('drop', 'Salutation'),
    ('keep', 'R'),
    ('keep', 'F'),
    ('keep', 'M'),
    ('keep', 'RFM'),
    ('keep', 'Tier'),
    ('date', 'TransactionDate'),
    ('date', 'CreateDate'),
    ('drop', 'MemberId'),
    ('drop', 'SiteId'),
    ('clean', 'SiteName'),
    ('keep', 'Quantity'),
    ('keep', 'Amount'),
    ('drop', 'RewardType'),
    ('keep', 'Points'),
    ('drop', 'trxDetailId'),
    ('drop', 'TrxId'),
    ('drop', 'TransactionStatusId'),
    ('keep', 'TransactionStatusName'),
    ('drop', 'TransactionTypeId'),
    ('keep', 'TransactionTypeName'),
    ('drop', 'Reportable'),
    ('keep', 'TransactionItemCode'),
    ('keep', 'AnalysisCode1'),
    ('keep', 'AnalysisCode2'),
    ('keep', 'AnalysisCode3'),
    ('keep', 'AnalysisCode4'),
    ('clean', 'Brand'),
    ('keep', 'AnalysisCode6'),
    ('keep', 'AnalysisCode7'),
    ('keep', 'AnalysisCode8'),
    ('keep', 'Price'),
    ('keep', 'AnalysisCode10'),
    ('keep', 'InvalidReason'),
    ('drop', 'Description'),
    ('drop', 'PromotionId'),
    ('keep', 'PromotionName'),
    ('date', 'PromotionStartDate'),
    ('drop', 'PromotionEndDate'),
    ('drop', 'PromotionOfferTypeId'),
    ('drop', 'PromotionOfferTypeName'),
    ('drop', 'PromotionSiteId'),
    ('drop', 'PromotionSite'),
    ('drop', 'QualifyingProductQuantity'),
)


def _split_date_column(df, column, prefix):
    """Replace ``column`` with its four ``<prefix>_*`` date-component columns."""
    components = df[column].apply(lambda value: extract_date_components(value, prefix))
    component_names = [f'{prefix}_{part}' for part in ('Year', 'Month', 'TimeOfMonth', 'Day')]
    # Reuse df's own index so the new columns line up row-for-row even when
    # df does not carry a default 0..n-1 index; naming the columns keeps the
    # schema stable for an empty frame.
    components_df = pd.DataFrame(components.tolist(), index=df.index, columns=component_names)
    return pd.concat([df, components_df], axis=1).drop(columns=[column])


def transform_dataframe(df):
    """Apply the full column-level transformation plan to ``df``.

    Every step is skipped when its column is absent.  The steps, in order:
    add the recurring-customer flag, drop unneeded columns, convert ``DOB``
    to ``Age``, merge the ``ContactBy*`` flags into ``ContactMethod``, split
    ``TransactionDate``/``CreateDate``/``PromotionStartDate`` into four
    component columns each, and clean ``SiteName`` and ``Brand``.  Progress
    is printed for each step that runs.

    Args:
        df: Source DataFrame.  It is not modified; the work is done on a copy.

    Returns:
        The transformed DataFrame.
    """
    print("\n 🔄 Applying transformations...")

    # Copy first so the caller's frame (possibly a slice of a larger frame)
    # is never mutated by the column assignments below.
    df = df.copy()

    # Resolved here rather than at import time so this block has no
    # definition-order requirement on the cleaner functions.
    cleaners = {'SiteName': clean_site_name, 'Brand': clean_brand}

    for action, column in _TRANSFORM_STEPS:
        if action == 'contact':
            present = [col for col in _CONTACT_COLUMNS if col in df.columns]
            if present:
                df['ContactMethod'] = df.apply(merge_contact_methods, axis=1)
                df = df.drop(columns=present)
                print(f" ✅ Merged {len(present)} contact columns into 'ContactMethod'")
            continue

        if column not in df.columns:
            if action == 'recurring':
                print(" ⚠️ 'Userid' column not found")
            continue

        if action == 'recurring':
            print(" ✅ Keeping 'Userid' and adding recurring customer flag")
            df = add_recurring_customer_flag(df, column)
        elif action == 'drop':
            df = df.drop(columns=[column])
            print(f" 🗑️ Dropped '{column}'")
        elif action == 'keep':
            print(f" ✅ Keeping '{column}'")
        elif action == 'age':
            df['Age'] = df[column].apply(calculate_age_from_dob)
            df = df.drop(columns=[column])
            print(" ✅ Converted 'DOB' to 'Age' (1900-01-01 → Unknown)")
        elif action == 'date':
            prefix = _DATE_PREFIXES[column]
            df = _split_date_column(df, column, prefix)
            print(f" ✅ Converted '{column}' into 4 columns ({prefix}_Year, {prefix}_Month, {prefix}_TimeOfMonth, {prefix}_Day)")
        elif action == 'clean':
            df[column] = df[column].apply(cleaners[column])
            print(f" ✅ Kept and cleaned '{column}'")

    print("\n ✅ All transformations completed!")
    return df
def read_and_process_file(file_path, max_rows=5000):
    """Read an Excel file, transform it, and render the result as CSV bytes.

    Args:
        file_path: Path of the Excel workbook to read.
        max_rows: Maximum number of leading rows to keep.

    Returns:
        A tuple ``(csv_content, modified_file_name, df)`` where
        ``csv_content`` is the UTF-8 encoded CSV, ``modified_file_name`` is
        ``<original stem>_transformed_<row count>_rows.csv`` and ``df`` is the
        transformed DataFrame.  ``(None, None, None)`` is returned when the
        file is missing or any step fails; the error is printed.
    """
    try:
        print(f" 📖 Reading file: {file_path}")
        df = pd.read_excel(file_path)
        print(f" 📊 Original columns: {list(df.columns)}")
        print(f" 📏 Original shape: {df.shape}")

        # Limit to the first max_rows rows.
        original_row_count = len(df)
        if original_row_count > max_rows:
            # .copy() detaches the slice from the full frame so later column
            # assignments act on an independent DataFrame.
            df = df.head(max_rows).copy()
            print(f" ✂️ Limited dataset to first {max_rows} rows (from {original_row_count} total rows)")
        else:
            print(f" Dataset has {len(df)} rows (within the {max_rows} row limit)")

        df = transform_dataframe(df)

        # Final pass: make every object (string-like) column UTF-8 safe.
        print("\n 🧹 Final sanitization of text data...")
        for col in df.columns:
            if df[col].dtype == 'object':
                df[col] = df[col].apply(sanitize_text)

        # to_csv() without a target returns the CSV text; encode it once here.
        csv_content = df.to_csv(index=False).encode('utf-8')

        # Derive the output name from the input file's stem.
        original_file_name = os.path.basename(file_path)
        name = os.path.splitext(original_file_name)[0]
        modified_file_name = f"{name}_transformed_{len(df)}_rows.csv"

        print(f"\n ✅ Successfully processed file: {modified_file_name}")
        print(f" 📊 Final columns: {list(df.columns)}")
        print(f" 📏 Final shape: {df.shape}")
        print(f" 📄 CSV file size: {len(csv_content)} bytes")
        return csv_content, modified_file_name, df
    except FileNotFoundError:
        print(f"❌ Error: File '{file_path}' not found!")
        return None, None, None
    except Exception as e:
        # Pipeline boundary: report the failure with a traceback and signal
        # it to the caller through the (None, None, None) result.
        print(f"❌ Error processing file: {e}")
        import traceback
        traceback.print_exc()
        return None, None, None
def encode_file_to_base64(file_content):
    """Encode file content as a base64 ASCII string.

    Args:
        file_content: The content as ``bytes`` (or another bytes-like
            object).  A ``str`` is accepted and encoded as UTF-8 first.

    Returns:
        The base64 representation as a ``str``.

    Raises:
        TypeError: If ``file_content`` is neither a string nor bytes-like.
    """
    if isinstance(file_content, str):
        file_content = file_content.encode('utf-8')
    # Base64 accepts every byte value, so the payload is encoded as-is;
    # dropping bytes >= 128 would silently corrupt UTF-8 content.
    return base64.b64encode(file_content).decode('ascii')
# Endpoint used when the caller does not supply one.
_DEFAULT_API_URL = "https://problab-api-0004c00ee319.hosted.ghaymah.systems/process_dataset"


def send_to_api(file_name, base64_data, api_url=_DEFAULT_API_URL, timeout=60):
    """POST the base64-encoded file to the dataset-processing API.

    Args:
        file_name: File name reported to the API.
        base64_data: Base64 text of the CSV file.
        api_url: Endpoint to post to; defaults to the hosted service.
        timeout: Request timeout in seconds.

    Returns:
        The ``requests.Response`` (for any HTTP status), or ``None`` when the
        request itself failed (connection error, timeout, ...).
    """
    payload = {
        "event": {
            "data": {
                "new": {
                    "id": "snipp_transformed",
                    "file_data": base64_data,
                    "file_name": file_name,
                    "hasHeader": True,
                    "delimiter": ","
                }
            }
        }
    }
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'Data-Transformer/1.0',
        'Accept': 'application/json'
    }

    print(f"\n🔄 Sending transformed file '{file_name}' to API...")
    print(f"📊 Base64 data size: {len(base64_data)} characters")
    try:
        response = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        # Transport-level failure: report it and let the caller carry on.
        print(f"❌ Error occurred while sending to API: {e}")
        return None

    if response.status_code == 200:
        print("✅ File sent successfully!")
        print(f"📋 Response status: {response.status_code}")
    else:
        print(f"❌ Failed to send file. Status code: {response.status_code}")
        # Cap the echoed body so a large error page does not flood the log.
        print(f"📋 Response: {response.text[:500]}")
    return response
def save_clean_dataset(df, file_name):
    """Save the transformed dataset locally as CSV and as Excel.

    Both files are written to the current working directory with a
    ``transformed_`` prefix.

    Args:
        df: DataFrame to save.
        file_name: Base file name, normally ending in ``.csv``.

    Returns:
        The name of the CSV file that was written.
    """
    csv_file = f"transformed_{file_name}"
    df.to_csv(csv_file, index=False, encoding='utf-8')
    print(f"\n💾 Transformed dataset saved: {csv_file}")

    # Swap only the final extension.  When the name does not end in .csv,
    # append .xlsx so the workbook never overwrites the CSV just written.
    stem, extension = os.path.splitext(csv_file)
    if extension.lower() == '.csv':
        excel_file = f"{stem}.xlsx"
    else:
        excel_file = f"{csv_file}.xlsx"
    df.to_excel(excel_file, index=False)
    print(f"💾 Excel version saved: {excel_file}")
    return csv_file
def main(excel_file_path=None):
    """Run the whole pipeline: transform, encode, upload, save, summarise.

    Args:
        excel_file_path: Path of the Excel workbook to process.  When
            omitted, the built-in sample path is used.
    """
    print("=" * 80)
    print("🚢 Ship Performance Dataset - Complete Transformation & Upload")
    print("=" * 80)

    if excel_file_path is None:
        # Default location of the sample workbook.
        excel_file_path = "C:/Users/Mikes/OneDrive/Pictures/MENA_BUSINESS_DATA/Transformation Schiff Sample File for Predictive analysis.xlsx"

    # Process and transform the file
    print("\n1⃣ Reading and transforming Excel file...")
    file_content, modified_file_name, df = read_and_process_file(excel_file_path, max_rows=5000)
    if file_content is None:
        print("\n❌ Process failed. Please check if the file exists.")
        return

    # Encode to base64
    print("\n2⃣ Encoding transformed file to base64...")
    base64_data = encode_file_to_base64(file_content)
    print(f" ✅ Encoding complete ({len(base64_data)} characters)")

    # Send to API
    print("\n3⃣ Sending transformed data to API...")
    response = send_to_api(modified_file_name, base64_data)

    # Save locally (done whether or not the upload succeeded)
    save_clean_dataset(df, modified_file_name)

    # Save transformation summary
    summary_file = f'transformation_summary_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
    summary_lines = [
        "TRANSFORMATION SUMMARY\n",
        "=" * 50 + "\n\n",
        f"Original file: {excel_file_path}\n",
        f"Rows processed: {len(df)}\n",
        f"Final columns: {len(df.columns)}\n\n",
        "Final columns list:\n",
    ]
    summary_lines.extend(f" - {col}\n" for col in df.columns)
    # Explicit UTF-8: the platform default encoding may be unable to
    # represent non-ASCII characters in the path or the column names.
    with open(summary_file, 'w', encoding='utf-8') as f:
        f.writelines(summary_lines)
    print(f"\n📄 Transformation summary saved: {summary_file}")

    print("\n" + "=" * 80)
    if response is not None and response.status_code == 200:
        print("🎉 All transformations completed and file uploaded successfully! إن شاء الله")
        print(f"{len(df)} rows processed")
        print(f"{len(df.columns)} columns in final dataset")
        print(" ✅ Recurring customer flag added")
        print(" ✅ DOB converted to Age")
        print(" ✅ Contact methods merged")
        print(" ✅ Date columns split into components")
        print(" ✅ SiteName and Brand cleaned")
    else:
        print("⚠️ Process completed but API upload may have failed.")
        print(" 💡 Transformed file saved locally for inspection.")
    print("=" * 80)
if __name__ == "__main__":
    # Script entry point: run the pipeline only on direct execution,
    # not when this module is imported.
    main()