/utils
This is where all common functions are placed.
__init__.py
crypter.py
generateKey
def generateKey(self):
return Fernet(Fernet.generate_key())
encrypt
def encrypt(self, key, message):
return key.encrypt(message.encode())
decrypt
def decrypt(self, key, encryptedMessage):
return key.decrypt(encryptedMessage).decode()
email.py
class Email(ABC):
"""Class to send emails"""
@abstractmethod
def send(self):
pass
class EmailScheduler(Email):
"""Class to send all emails related to scheduler test cases"""
context = ssl.create_default_context()
testResult = {}
emailSubject = ""
emailBody = None
num_of_successes = None
num_of_fails = None
def __init__(self, sender_email, sender_password, receiver_email, receiver_name) -> None:
self.__senderEmail = sender_email
self.__senderPassword = sender_password
self.__receiverEmail = receiver_email
self.__receiverName = receiver_name
calculateFailsSuccesses
def calculateFailsSuccesses(self):
self.num_of_successes = len(list(filter(lambda x: self.testResult[x][-1] == "PASSED", self.testResult)))
self.num_of_fails = len(self.testResult) - self.num_of_successes
generateEmailSubject
def generateEmailSubject(self):
self.calculateFailsSuccesses()
if self.num_of_successes > 1 and self.num_of_fails > 1:
self.emailSubject = (
f"{self.emailSubject} - There were {self.num_of_successes} successes and {self.num_of_fails} failures"
)
elif self.num_of_successes > 1 and self.num_of_fails <= 1:
self.emailSubject = (
f"{self.emailSubject} - There were {self.num_of_successes} successes and {self.num_of_fails} failure"
)
elif self.num_of_successes <= 1 and self.num_of_fails > 1:
self.emailSubject = (
f"{self.emailSubject} - There were {self.num_of_successes} success and {self.num_of_fails} failures"
)
else:
self.emailSubject = (
f"{self.emailSubject} - There were {self.num_of_successes} success and {self.num_of_fails} failure"
)
generateEmailBody
def generateEmailBody(self, receiverName):
self.emailBody = f"""
Hi {receiverName}-san,
<br><br>
I hope this email finds you well. Attached table is the Selenium Test Automation Report for Daily Scheduler, providing an overview of the test results and performance for our software application.
<br><br>
Below is a summary table highlighting the key metrics from the Selenium test suite:
{
build_table(
pd.DataFrame.from_dict(self.testResult),
"blue_dark"
)
}
Thank you for your attention to this matter.
<br><br>
Best regards,<br>
QA-Team
"""
send
def send(self):
if len(self.__receiverName) == len(self.__receiverEmail):
for receiverName, receiverEmail in zip(self.__receiverName, self.__receiverEmail):
# create an email object
lib = EmailMessage()
# generate the email body and email subject
self.generateEmailSubject()
self.generateEmailBody(receiverName)
# set up email
lib["From"] = self.__senderEmail
lib["To"] = receiverEmail
lib["Subject"] = self.emailSubject
lib.add_alternative(self.emailBody, subtype="html")
# send email
with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=self.context) as smtp:
smtp.login(self.__senderEmail, self.__senderPassword)
smtp.send_message(lib)
smtp.close()
else:
raise Exception("number of receiverName does not fit with the number of receiverEmail")
faker.py
class FakerGenerator:
"""Generating fake values"""
def __init__(self) -> None:
self.faker = Faker()
generate_name
def generate_name(self):
return self.faker.name()
generate_title
def generate_title(self):
return self.faker.company()
generate_sentence
def generate_sentence(self, num_of_sentences: int = 1):
return self.faker.paragraph(nb_sentences=num_of_sentences)
file_opener.py
class FileOpener:
"""To import, open, and read file"""
openCSV
@staticmethod
def openCSV(path, withHeader=False):
dataList = []
reader = csv.reader(open(path, "r"))
if withHeader:
next(reader)
for row in reader:
dataList.append(row)
return dataList
formatter.py
class Formatter:
"""Converting values into desired format result"""
convert_query_result
def convert_query_result(
self, query_result, rounding_columns=None, rounding_option=None, toList=False
):
result = []
# converting result to both decimal and datetime if it's possible
for i in range(len(query_result)):
for col in query_result[i]:
query_result[i][col] = self.convert_decimal(query_result[i][col])
query_result[i][col] = self.convert_datetime(query_result[i][col])
result.append(query_result[i])
# rounding number in result
if rounding_columns:
result = self.rounding(result, rounding_columns, rounding_option)
# converting result into a single dimension list
if toList:
listResult = []
for row in result:
listResult += list(row.values())
return listResult
return result
convert_decimal
def convert_decimal(self, value):
"""Converts inputted value into decimal format if it is possible"""
if isinstance(value, Decimal):
return float(value)
return value
convert_datetime
def convert_datetime(self, value):
"""Converts inputted value into desired date format if it is possible"""
if isinstance(value, datetime.datetime):
return value.strftime("%Y-%m-%d %H:%M:%S %Z")
return value
convert_number
def convert_number(self, strNumber):
"""Converts inputted string into number if it's possible"""
if len(strNumber):
checkedStrNumber = strNumber.replace(".", "", 1)
checkedStrNumber = (
checkedStrNumber.replace("-", "", 1)
if checkedStrNumber[0] == "-"
else checkedStrNumber
)
if checkedStrNumber.isdigit():
strNumber = (
round(float(strNumber), 1) if "." in strNumber else int(strNumber)
)
return strNumber
rounding
def rounding(self, query_result, column_names, option):
"""Iterats over query result and rounding all values in certain columns"""
for row in query_result:
for col in row:
if col in column_names:
row[col] = round(row[col], option)
return query_result
rgba_string_to_hex
def rgba_string_to_hex(self, rgba_string):
"""Converts RGBA string (mostly from Selenium) into hex code"""
rgb_values = re.findall(r"\d+", rgba_string)
r, g, b = map(int, rgb_values[:3])
return "#{:02x}{:02x}{:02x}".format(r, g, b)
convert_period
def convert_period(self, period: str) -> list[list]:
"""
Converts period into separated date
Args:
period (str): the period string
Returns:
list[list]: the separated date
Example:
>>> convert_period("2023/06/15 - 2023/06/06")
[[2023, 6, 15], [2023, 6, 6]]
"""
return [
[int(_) if _ != "" else _ for _ in date.split("/")]
for date in period.split(" - ")
]
re_sub
def re_sub(self, pattern, string):
return re.sub(pattern, "", string).strip()
gsheet.py
class GSheet:
"""Class to interact with Google Sheet"""
def __init__(self, spreadsheetName) -> None:
self.__sa = gspread.service_account()
self.__ss = self.__sa.open(spreadsheetName)
@property
def sa(self):
return self.__sa
@property
def ss(self):
return self.__ss
class GSheetStateSXT(GSheet):
"""Class to interact with Google Sheet corresponds to the SPREADSHEET_NAME"""
scenarioResult = {}
def __init__(
self,
spreadsheetName,
folderId,
testedFilesOnly=True,
executeJSON=False,
) -> None:
super().__init__(spreadsheetName)
self.curDate = dt.now().strftime("%Y/%m/%d %H:%M:%S")
self.automationName = "Selenium"
self.newSpreadsheetName = f"Automation - Release {self.curDate}"
self.__folderId = folderId
self.__newSs = None
self.testedFilesOnly = testedFilesOnly
self.executeJSON = executeJSON
self.json_path = "track.json"
create_a_copy_of_worksheet_into_new_gsheet_file_and_update_the_values
def create_a_copy_of_worksheet_into_new_gsheet_file_and_update_the_values(
self, worksheetName, namedRange, values
):
try: # assuming that the gsheet has already a worksheet with paramater name
wks = self.__newSs.worksheet(worksheetName)
except: # assuming that the gsheet does not have any worksheet the same with the parameter
oldWks = self.ss.worksheet(worksheetName)
wks = self.__newSs.worksheet(oldWks.copy_to(self.__newSs.id)["title"])
wks.update_title(worksheetName)
wks.update(namedRange, values, value_input_option="USER_ENTERED")
create_a_copy_of_gsheet_file
def create_a_copy_of_gsheet_file(self):
self.sa.copy(
file_id=self.ss.id, title=self.newSpreadsheetName, copy_permissions=True
)
self.__newSs = self.sa.open(self.newSpreadsheetName)
if self.testedFilesOnly:
deleteRequests = []
initialSheets = ["Cover", "Use Cases", "ToC", "Queries", "variables"]
for wks in self.__newSs.worksheets():
if wks.title not in initialSheets:
deleteRequests.append({"deleteSheet": {"sheetId": wks.id}})
self.__newSs.batch_update({"requests": deleteRequests})
get_values_by_named_range
def get_values_by_named_range(self, worksheetName, namedRange):
wks = self.ss.worksheet(worksheetName)
return wks.get(namedRange)
upload_the_gsheet_file_to_folder
def upload_the_gsheet_file_to_folder(self):
# Move the newly created spreadsheet to the desired folder
drive_service = build("drive", "v3", credentials=self.sa.auth)
drive_service.files().update(
fileId=self.__newSs.id, addParents=self.__folderId, fields="id,parents"
).execute()
save_data_to_json
def save_data_to_json(self):
# Write data to the JSON file
with open(self.json_path, "w") as json_file:
json.dump(
self.scenarioResult, json_file, indent=4
) # Use indent for pretty formatting
get_json
def get_json(self):
# Read data from the JSON file
with open(self.json_path, "r") as json_file:
return json.load(json_file)
update_all_values
def update_all_values(self, useJSON=False):
# create a new file (the duplicate of the target file)
self.create_a_copy_of_gsheet_file()
data = self.get_json()
if not useJSON:
self.save_data_to_json()
data = self.scenarioResult
for worksheetName in data:
for namedRange in data[worksheetName]:
values = [
[
self.curDate,
self.automationName,
internalCheckResult,
externalCheckResult,
testerNote,
]
for internalCheckResult, externalCheckResult, testerNote in data[
worksheetName
][namedRange]
]
self.create_a_copy_of_worksheet_into_new_gsheet_file_and_update_the_values(
worksheetName, namedRange.replace("Data", "Form"), values
)
# remove sheet1, which is the default sheet that is created when creating a new gsheet file
if self.__newSs.sheet1.title == "Sheet1":
self.__newSs.del_worksheet(self.__newSs.sheet1)
self.upload_the_gsheet_file_to_folder()
update_worksheet_colors
def update_worksheet_colors(self, useJSON=False):
data = self.scenarioResult
if useJSON:
data = self.get_json()
for wksName in data:
wksId = self.__newSs.worksheet(wksName).id
noFail = True
for nr in data[wksName]:
if len(list(filter(lambda x: x[0] == "FAILED", data[wksName][nr]))):
noFail = False
break
if noFail:
requestsBatch = [
{
"updateSheetProperties": {
"properties": {
"sheetId": wksId,
"tabColor": {
"red": 0.0, # Specify the color values in RGB format (from 0.0 to 1.0)
"green": 1.0,
"blue": 0.0,
},
},
"fields": "tabColor",
}
}
]
else:
requestsBatch = [
{
"updateSheetProperties": {
"properties": {
"sheetId": wksId,
"tabColor": {
"red": 1.0, # Specify the color values in RGB format (from 0.0 to 1.0)
"green": 0.0,
"blue": 0.0,
},
},
"fields": "tabColor",
}
}
]
# Send the batchUpdate request
self.__newSs.batch_update({"requests": requestsBatch})
logger.py
class RootFilter(logging.Filter):
def __init__(self, name: str) -> None:
super().__init__(name)
self.name = name
def filter(self, record: logging.LogRecord) -> bool:
return record.name.startswith(self.name)
class Logger:
"""Logging messages for a specific system or application component"""
def __init__(self) -> None:
# instantiate logging components
self.logger = logging.getLogger("root")
self.file_handler = logging.FileHandler("automation.log", mode="w")
# self.console_handler = logging.StreamHandler()
self.formatter = logging.Formatter(
"%(asctime)s | %(name)s | %(levelname)s | %(message)s"
)
# set up
self.setup()
setup
def setup(self):
# set level of the logger
self.logger.setLevel(logging.ERROR)
# set level of the handler
self.file_handler.setLevel(logging.ERROR)
# self.console_handler.setLevel(logging.ERROR)
# install formatter into the handlers
self.file_handler.setFormatter(self.formatter)
# self.console_handler.setFormatter(self.formatter)
# add filters
self.logger.addFilter(RootFilter(name=self.logger.name))
self.file_handler.addFilter(RootFilter(name=self.logger.name))
# self.console_handler.addFilter(RootFilter(name="root"))
# add the handlers to the logger
self.logger.addHandler(self.file_handler)
# self.logger.addHandler(self.console_handler)
shutdown
def shutdown(self):
logging.shutdown()
response_handler.py
class ResponseHandler:
"""To get response of calls (making use selenium-wire)"""
get_response
def get_response(self, driver: webdriver, prefix=""):
data = []
for request in driver.requests:
if request.response:
if request.url.startswith(prefix):
response = request.response
body = decode(
response.body,
response.headers.get("Content-Encoding", "identity"),
)
decoded_body = body.decode("utf-8")
json_data = json.loads(decoded_body)
data.append(json_data)
return data
wrapper.py
class Wrapper:
"""Making use functools\wraps"""
exception_handling_returns_None
@classmethod
def exception_handling_returns_None(cls, func):
"""
to let a test case returns a None value instead of raises an exception/error
"""
decoratorClassName = cls.__name__
decoratorMethodName = sys._getframe().f_code.co_name
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logging.getLogger(
f"root.{__name__}.{decoratorClassName}.{decoratorMethodName}"
).error(f"error:\n{str(e)}")
return None
return wrapper
exception_handling_raises_error
@classmethod
def exception_handling_raises_error(cls, func):
"""
to handle the error by tracking, but keeps raises the error
"""
decoratorClassName = cls.__name__
decoratorMethodName = sys._getframe().f_code.co_name
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logging.getLogger(
f"root.{__name__}.{decoratorClassName}.{decoratorMethodName}"
).error(f"error:\n{str(e)}")
raise Exception(str(e))
return wrapper
result_receiving
@classmethod
def result_receiving(cls, func):
"""
to track the result of test cases, so instead of directly raising error, it lets to write down the error first, e.g. email, report, and summary
"""
decoratorClassName = cls.__name__
decoratorMethodName = sys._getframe().f_code.co_name
@wraps(func)
def wrapper(self, *args, **kwargs):
funcName = str(func.__name__).replace("_", " ").title()
try:
isFail = False
errorMessage = None
try:
func(self, *args, **kwargs)
self.email.testResult[funcName].append("PASSED")
except Exception as e:
if str(e).replace("'", "") != funcName:
logging.getLogger(
f"root.{__name__}.{decoratorClassName}.{decoratorMethodName}"
).error(
f"class: {self.__class__.__name__}, method: {func.__name__}\n{str(e)}"
)
isFail = True
errorMessage = str(e)
self.email.testResult[funcName].append("FAILED")
except:
if not isFail:
self.email.testResult[funcName] = ["PASSED"]
else:
self.email.testResult[funcName] = ["FAILED"]
print(f"\n\nCurrent results:\n{self.email.testResult}")
if isFail:
raise Exception(f"There is an error in {funcName}: {errorMessage}")
return wrapper
unpagshe
@classmethod
def unpagshe(cls, worksheetName, named_range, needExternalCheck=False):
"""
to retrieve and unpack the data from gsheet
"""
decoratorClassName = cls.__name__
decoratorMethodName = sys._getframe().f_code.co_name
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
data = self.gsheet.get_values_by_named_range(worksheetName, named_range)
result = []
isFail = False
emptyFormats = [
"",
"-",
"<blank>",
"<empty>",
"blank",
"empty",
"inactive",
"uncheck",
]
anyFormats = ["anything", "dc", "Any", "any"]
for row in data:
# preprocess data
row = [None if (col in emptyFormats) else col for col in row]
row = [
FakerGenerator().generate_sentence()
if (col in anyFormats)
else col
for col in row
]
try:
func(self, *row, *args, **kwargs)
result.append(
["PASSED", "PASSED" if needExternalCheck else "", ""]
)
except Exception as e:
logging.getLogger(
f"root.{__name__}.{decoratorClassName}.{decoratorMethodName}"
).error(
f"class: {self.__class__.__name__}, method: {func.__name__}\n{str(e)}"
)
# raise Exception(str(e))
result.append(
[
"FAILED",
"FAILED" if needExternalCheck else "",
f"'{str(e)}'",
]
)
if not isFail:
isFail = True
try:
self.gsheet.scenarioResult[worksheetName][named_range] = result
except:
self.gsheet.scenarioResult[worksheetName] = {named_range: result}
if isFail:
raise Exception("an error occured")
return wrapper
return decorator
login_exeception_handling
@classmethod
def login_exeception_handling(cls, func):
"""to catch the error when login"""
decoratorClassName = cls.__name__
decoratorMethodName = sys._getframe().f_code.co_name
@wraps(func)
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
logging.getLogger(
f"root.{__name__}.{decoratorClassName}.{decoratorMethodName}"
).error(f"login error:\n{str(e)}")
raise Exception(str(e))
return wrapper
role_checking
@classmethod
def role_checking(cls, func_role):
"""
to check the role inputted (from command) before executing any testcase (not used/deprecated)
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self.role == func_role:
func(self, *args, **kwargs)
else:
print(f"Role doesn't match, skipping '{func.__name__}' execution")
return
return wrapper
return decorator