Generally speaking, enterprise programming languages like Java or C++ are robust, defined by their frameworks, their scale, and the variety of paradigms they support. Remember Perl? Companies used it heavily in the 2000s, but it is practically unheard of these days; it has gone the way of COBOL. Of course, there are still projects out there running even Fortran, so forget about any revenue-generating project ever getting rid of its mainstay language. To that end, if you land a project that has few remaining experts but steady revenue, that is a good way to work on it full time or to consult for it.
I have often dabbled in Python, a language used heavily in schools as well as at big firms. There are plenty of hooks into the various clouds and other modules that form the modern tech stack.
One interesting use of Python is integration with cloud services such as Azure or AWS, since the Python code is typically far shorter than the equivalent in Ruby or a C variant.
A lot of this is provided by the cloud provider, which smartly encapsulates existing open source projects with a thin layer so that you can pass parameters essentially the same way you would if the apps were hosted on premises:
credentials = service_account.Credentials.from_service_account_file(
    credentials_json
)
job_transport = (
    job_controller_grpc_transport.JobControllerGrpcTransport(
        address='{}-dataproc.googleapis.com:443'.format(self.region),
        credentials=credentials))
This is one way to get the transport for a Dataproc job client.
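From that transport, a minimal sketch (assuming the older google-cloud-dataproc GAPIC surface shown in the imports below) builds the job client used as self.jobs in the snippets that follow:

# Construct the client on top of the gRPC transport
self.jobs = JobControllerClient(job_transport)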
To submit the job, you essentially build a job description dictionary and call submit_job:
job_details = {
    'placement': {
        'cluster_name': self.cluster_name
    },
    'spark_job': {
        'main_jar_file_uri': spark_job_jar,
        'jar_file_uris': other_jars,
        'args': args,
        'properties': {
            'spark.executor.cores': '4',
            'spark.executor.memory': '1g'
        }
    }
}
result = self.jobs.submit_job(self.project_id, self.region, job=job_details)
job_id = result.reference.job_id
return job_id
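The returned job_id can then be polled until the job reaches a terminal state. A hedged sketch against the same client (the state names come from the Dataproc JobStatus enum; wait_for_job and poll_seconds are illustrative names, not part of the library):

import time

def wait_for_job(jobs_client, project_id, region, job_id, poll_seconds=10):
    """Polls a Dataproc job until it reaches a terminal state."""
    terminal_states = {'DONE', 'ERROR', 'CANCELLED'}
    while True:
        job = jobs_client.get_job(project_id, region, job_id)
        # Resolve the numeric enum value to its name, e.g. 'RUNNING'
        state = job.status.State.Name(job.status.state)
        if state in terminal_states:
            return state
        time.sleep(poll_seconds)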
The Spark submit job is handled by cloud provider libraries such as:
from google.oauth2 import service_account
from google.cloud.dataproc_v1.gapic.transports import (
    job_controller_grpc_transport)
from google.cloud.dataproc_v1 import JobControllerClient
A simple garbage collector client can then clean up folders on a regular basis:
from datetime import datetime, timedelta

def garbage_collect_stale_folders(credential_filename, days_until_stale):
    """Simple garbage collection process to remove stale folders.
    Gets all folders for the current credentialed user,
    then deletes those greater than a given number of days old.
    """
    sheets = SheetsClient(credential_filename)
    mime_type = 'application/vnd.google-apps.folder'
    user_email = sheets.get_current_user()
    owned_folders = sheets.get_owned_files(user_email, mime_type)
    print("{0} folders owned by user {1}".format(len(owned_folders), user_email))
    stale_folders = []
    now = datetime.utcnow()
    date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
    date_limit = timedelta(days=days_until_stale)
    for folder in owned_folders:
        # Compare creation dates to the garbage collection limit
        folder_created_dt = datetime.strptime(folder.get('createdTime'), date_format)
        if now - folder_created_dt > date_limit:
            stale_folders.append(folder)
    print("{0} stale folders (>{1} days old) found for garbage collecting.".format(
        len(stale_folders), days_until_stale))
    deleted_count = 0
    for folder in stale_folders:
        id_to_delete = folder.get('id')
        print('Deleting ID {0}'.format(id_to_delete))
        if sheets.delete_file(id_to_delete):
            deleted_count += 1
    print("{0} folders deleted.".format(deleted_count))
    remaining_folders = sheets.get_owned_files(user_email, mime_type)
    print("{0} remaining folders owned by user {1}".format(
        len(remaining_folders), user_email))
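Wiring it up is a one-liner; a usage sketch with a hypothetical credential path and a 30-day retention window:

# Delete every folder owned by the credentialed user older than 30 days
garbage_collect_stale_folders('/path/to/service-account.json', 30)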
Cloud Storage can be read or written just as easily:
gcs = StorageClient(gcp_project, credential_filename)
local_dir_actual = gcs.download_dir_from_gcs(output_gcs_path, local_dir)
# Get local files
data_files = glob.glob('{0}/*.csv'.format(os.path.abspath(local_dir_actual)))
Email notifications are generated automatically when shared folders are created.
sheets = SheetsClient(credential_filename)
temp_merged_filename = '{0}/merged.csv'.format(local_dir)
folder_link = sheets.load_data_files_to_drive(
    data_files, user_email, 'ContactSearchResults', temp_merged_filename)
A few utilities can be written as wrappers on top of the existing APIs that GCP exposes:
"""Google Cloud Storage utilities."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from google.cloud import storage
def _check_valid_gcs_path(gcs_path):
    """Simple GCS path validation."""
    if not gcs_path.startswith('gs://'):
        raise ValueError('GCS path must start with gs://')

def _split_gcs_path(gcs_path):
    """Splits a GCS path into bucket and key_prefix (dirname)."""
    _check_valid_gcs_path(gcs_path)
    path_parts = gcs_path[5:].split('/', 1)
    bucket_name = path_parts[0]
    if len(path_parts) == 1:
        key_prefix = ''
    elif path_parts[1].endswith('/'):
        key_prefix = path_parts[1]
    else:
        key_prefix = path_parts[1] + '/'
    return bucket_name, key_prefix
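# Example with hypothetical paths: _split_gcs_path('gs://my-bucket/data/raw')
# returns ('my-bucket', 'data/raw/'), while _split_gcs_path('gs://my-bucket')
# returns ('my-bucket', '').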
class StorageClient:
    """Simple Storage Client wrapper for automating common GCS tasks in Python."""

    def __init__(self, project=None, credentials=None):
        self.client = self._get_client(project, credentials)

    def _get_client(self, project, credentials=None):
        """Gets credentials for Cloud Storage client.
        If credentials are not provided, GOOGLE_APPLICATION_CREDENTIALS must be set.
        Args:
            project: GCP project id
            credentials: path to service account json
        Returns:
            GCS Client
        """
        if credentials:
            return storage.Client.from_service_account_json(credentials)
        else:
            return storage.Client(project=project)

    def download_file_from_gcs(self, gcs_path, local_dir):
        """Downloads a file from GCS."""
        _check_valid_gcs_path(gcs_path)
        bucket_name, object_name = gcs_path[5:].split('/', 1)
        bucket = self.client.get_bucket(bucket_name)
        blob = bucket.get_blob(object_name)
        if blob:
            local_path = os.path.join(local_dir, object_name.split('/')[-1])
            blob.download_to_filename(local_path)
        else:
            raise ValueError('{0} not found: Bucket {1}, object {2}'.format(
                gcs_path, bucket_name, object_name))
        return local_path

    def download_dir_from_gcs(self, gcs_path, local_dir):
        """Downloads a directory from GCS.
        This is a utility that flattens the GCS directory as well.
        """
        _check_valid_gcs_path(gcs_path)
        bucket_name, dir_name = gcs_path[5:].split('/', 1)
        bucket = self.client.get_bucket(bucket_name)
        # Get blobs in the requested directory
        for blob in bucket.list_blobs(prefix=dir_name):
            if not blob.name.endswith('/'):
                # Note that this flattens GCS directories on download
                local_path = os.path.join(local_dir, blob.name.split('/')[-1])
                blob.download_to_filename(local_path)
        return local_dir

    def upload_file_to_gcs(self, gcs_base_path, filename, saved_name=None):
        """Upload file to GCS bucket at provided path."""
        bucket_name, key_prefix = _split_gcs_path(gcs_base_path)
        bucket = self.client.get_bucket(bucket_name)
        if saved_name:
            object_name = key_prefix + saved_name
        else:
            object_name = key_prefix + os.path.basename(filename)
        blob = bucket.blob(object_name)
        blob.upload_from_filename(filename)
        return blob.path
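A short usage sketch of the wrapper above (project, bucket, and paths are hypothetical):

gcs = StorageClient(project='my-project', credentials='/path/to/service-account.json')
# Push a local CSV up to gs://my-bucket/exports/
gcs.upload_file_to_gcs('gs://my-bucket/exports', '/tmp/results.csv')
# Pull the whole directory back down, flattened, into an existing local dir
gcs.download_dir_from_gcs('gs://my-bucket/exports/', '/tmp/exports')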
A Python utility for Sheets:
"""Google Sheets Python Utilities."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import csv
import httplib2
from uuid import uuid4
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from googleapiclient.http import MediaFileUpload
SCOPES = ['https://www.googleapis.com/auth/drive',
          'https://www.googleapis.com/auth/spreadsheets']
MAX_SHEETS_CELLS = 5000000
DEFAULT_SPREADSHEET_TITLE = "DefaultResults"
DEFAULT_ROW_COUNT = 1000
DEFAULT_COLUMN_COUNT = 30
DEFAULT_FOLDER_NAME = "ResultsFolder"
# Data file utilities
def estimate_total_data_size(data_files):
    """Estimates the amount of data stored in a list of data files.
    Assumes each file is approximately equal length w/ 10% buffer.
    Returns tuple of (total_row_count, column_count)
    # TODO: Should return a row / column count to allow for cell estimates in google sheets!
    """
    total_row_count = 0
    for f in data_files:
        total_row_count += get_file_len(f)
    column_count = None
    current_file_index = 0
    while not column_count and current_file_index < len(data_files):
        try:
            column_count = get_file_column_count(data_files[current_file_index])
        except Exception:
            # If there's any file read error, try the next file
            current_file_index += 1
    return total_row_count, column_count

def get_file_len(filename):
    """Returns the number of lines in a file (0 if empty or unreadable)."""
    try:
        with open(filename, 'r') as f:
            count = 0
            for count, _ in enumerate(f, 1):
                pass
            return count
    except Exception:
        return 0

def get_file_column_count(filename):
    """Return the number of columns in a .csv file."""
    with open(filename, 'r') as f:
        csv_reader = get_csv_reader(f)
        header = next(csv_reader)
        return len(header)

def get_header_and_data_rows_from_csv(filename):
    """Returns (header, data_rows) from a CSV file, or (None, None) on error."""
    try:
        with open(filename, 'r') as f:
            csv_reader = get_csv_reader(f)
            header = next(csv_reader)
            data_rows = [row for row in csv_reader]
            return header, data_rows
    except Exception:
        return None, None

def get_csv_reader(f):
    """Simple wrapper function to ensure consistent csv reader options."""
    return csv.reader(
        f, delimiter=',', quotechar='"', escapechar='\\', skipinitialspace=True)
class SheetsClient:
    """A simple client for manipulating data in Google Sheets & Drive."""

    def __init__(self, credential_filename=None):
        if credential_filename:
            creds = self._get_creds_from_filename(credential_filename)
        else:
            creds = self._get_creds_from_application_default()
        self.sheets = self._get_sheets_service_from_client_creds(creds)
        self.drive = self._get_drive_service_from_client_creds(creds)

    def _get_creds_from_application_default(self, scopes=None):
        if not scopes:
            scopes = SCOPES
        unscoped_credentials = GoogleCredentials.get_application_default()
        creds = unscoped_credentials.create_scoped(scopes)
        http = creds.authorize(httplib2.Http())
        creds.refresh(http)
        print(creds.scopes)
        return creds

    def _get_creds_from_filename(self, credential_filename, scopes=None):
        if not scopes:
            scopes = SCOPES
        unscoped_credentials = GoogleCredentials.from_stream(credential_filename)
        creds = unscoped_credentials.create_scoped(scopes)
        http = creds.authorize(httplib2.Http())
        creds.refresh(http)
        print(creds.scopes)
        return creds

    def _get_sheets_service_from_client_creds(self, creds):
        service = build('sheets', 'v4', credentials=creds, cache_discovery=False)
        return service

    def _get_drive_service_from_client_creds(self, creds):
        service = build('drive', 'v3', credentials=creds, cache_discovery=False)
        return service
    def load_data_files_to_drive(self, data_files, user_email, spreadsheet_title=None,
                                 merged_filename=None, folder_id=None):
        print("Loading data files to Google Drive")
        required_sheet_rows, column_count = estimate_total_data_size(data_files)
        # Google Sheets has a default column count of 26.
        # We take this into account when determining if our data will fit in Sheets.
        estimated_cell_count = max(required_sheet_rows * column_count,
                                   required_sheet_rows * 26)
        print(estimated_cell_count)
        if not folder_id:
            folder_id = self.create_folder(DEFAULT_FOLDER_NAME)
        print(folder_id)
        self.grant_file_permissions_to_user(folder_id, user_email)
        # Step 1: Create a Google Sheet if possible
        if estimated_cell_count < MAX_SHEETS_CELLS:
            spreadsheet_id = self.create_spreadsheet(
                spreadsheet_title, required_sheet_rows, column_count)
            did_move = self.move_file_to_folder(spreadsheet_id, folder_id)
            # Get SheetId for formatting
            # TODO: Support multiple sheets, and be more robust around SheetId collection...
            spreadsheet_details = self.sheets.spreadsheets().get(
                spreadsheetId=spreadsheet_id).execute()
            new_sheet_id = spreadsheet_details['sheets'][0]['properties']['sheetId']
            print("Attempting data write to new spreadsheet_id")
            total_rows_written = self.write_multiple_files_to_sheet(data_files, spreadsheet_id)
            self.format_sheet(spreadsheet_id, new_sheet_id)
            print('Wrote {0} rows of data to Sheet {1}'.format(total_rows_written, new_sheet_id))
        else:
            print("Estimated data cell count {0} exceeds maximum Google Sheets allowed cell count of {1}".format(
                estimated_cell_count, MAX_SHEETS_CELLS))
        # Step 2: Always upload a single CSV file
        print("Merging and uploading single CSV file.")
        if not merged_filename:
            tmp_uuid = str(uuid4())
            merged_filename = '/tmp/{0}.csv'.format(tmp_uuid)
        file_id = self.merge_and_upload_csv(data_files, folder_id, merged_filename)
        print("Uploaded CSV file as {0}".format(file_id))
        folder_link = 'https://drive.google.com/drive/folders/{0}'.format(folder_id)
        return folder_link
    def create_folder(self, folder_name=None):
        if not folder_name:
            folder_name = DEFAULT_FOLDER_NAME
        # A Google Drive folder is just a file w/ a special MIME type
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        new_file = self.drive.files().create(body=file_metadata,
                                             fields='id').execute()
        return new_file.get('id')

    def delete_file(self, file_id):
        """Permanently delete a file from Google Drive.
        Files deleted using this method skip the trash.
        Args:
            file_id: ID of the file to delete.
        Returns:
            boolean for whether the file was deleted.
        """
        try:
            self.drive.files().delete(fileId=file_id).execute()
            return True
        except Exception as e:
            print(e)
            return False
    def get_current_user(self):
        """Returns the email address of the current authenticated user."""
        response = self.drive.about().get(fields='user(emailAddress)').execute()
        user_email = response['user']['emailAddress']
        if user_email:
            return user_email

    def get_owned_files(self, user_email, mime_type):
        """Get all files of a given type owned by a user.
        Args:
            user_email: email address of the owner to filter on.
            mime_type: mimeType of the files to be found.
        """
        page_token = None
        files = []
        while True:
            response = self.drive.files().list(
                # Filter on both MIME type and owner; without the owner clause
                # this would list every file of that type visible to the caller.
                q="mimeType='{0}' and '{1}' in owners".format(mime_type, user_email),
                spaces='drive',
                fields='nextPageToken, files(id, name, createdTime)',
                pageToken=page_token).execute()
            for file in response.get('files', []):
                files.append(file)
            page_token = response.get('nextPageToken', None)
            if page_token is None:
                break
        return files
    def grant_file_permissions_to_user(self, file_id, user_email):
        def callback(request_id, response, exception):
            if exception:
                print("Permission grant to {0} failed".format(user_email))
            else:
                print("Permission granted to {0}".format(user_email))

        batch = self.drive.new_batch_http_request(callback=callback)
        user_permission = {
            'type': 'user',
            'role': 'writer',
            'emailAddress': user_email
        }
        batch.add(self.drive.permissions().create(
            fileId=file_id,
            body=user_permission,
            fields='id'
        ))
        batch.execute()
    def merge_and_upload_csv(self, data_files, folder_id, merged_filename):
        output_filename = self.merge_data_files_to_single_csv(data_files, merged_filename)
        print("Will write merged file to {0}".format(output_filename))
        file_id = self.upload_csv_to_drive(output_filename, folder_id)
        return file_id

    def merge_data_files_to_single_csv(self, data_files, output_filename):
        # Find a valid header to write (in case early files are empty)
        header = None
        current_file_index = 0
        while not header and current_file_index < len(data_files):
            try:
                with open(data_files[current_file_index], 'r') as fin:
                    header = fin.readline()
            except IOError:
                pass
            current_file_index += 1
        with open(output_filename, 'w') as fout:
            # Write header
            fout.write(header)
            # Attempt to write valid data from all files
            for file in data_files:
                print("Merging {0}".format(file))
                try:
                    with open(file, 'r') as fin:
                        # Skip header
                        fin.readline()
                        for line in fin:
                            fout.write(line)
                except IOError:
                    print("Could not read {0}...skipping".format(file))
        return output_filename
    def upload_csv_to_drive(self, local_filename, folder_id=None):
        # TODO: Remove mimeType hardcodes
        file_metadata = {'name': 'results.csv'}
        media = MediaFileUpload(local_filename, mimetype='text/csv')
        file = self.drive.files().create(body=file_metadata,
                                         media_body=media,
                                         fields='id').execute()
        file_id = file.get('id')
        if folder_id:
            self.move_file_to_folder(file_id, folder_id)
        return file_id
    def create_spreadsheet(self, title=None, row_count=None, column_count=None):
        """Creates a new Google Sheets Spreadsheet and returns its ID."""
        if not title:
            title = DEFAULT_SPREADSHEET_TITLE
        if not row_count:
            row_count = DEFAULT_ROW_COUNT
        if not column_count:
            column_count = DEFAULT_COLUMN_COUNT
        spreadsheet_body = {
            'properties': {
                'title': title
            },
            'sheets': [
                {
                    'properties': {
                        'title': 'Sheet1',
                        'index': 1,
                        'gridProperties': {
                            'rowCount': row_count,
                            'columnCount': column_count,
                            'frozenRowCount': 1
                        }
                    }
                }
            ]
        }
        spreadsheet = self.sheets.spreadsheets().create(
            body=spreadsheet_body,
            fields='spreadsheetId').execute()
        spreadsheet_id = spreadsheet.get('spreadsheetId')
        print("New spreadsheet created: {0}".format(spreadsheet_id))
        return spreadsheet_id
    def move_file_to_folder(self, file_id, folder_id):
        """Moves a file to a new folder.
        This removes it from any current folders.
        """
        # Retrieve any existing parent folders
        file = self.drive.files().get(fileId=file_id, fields='parents').execute()
        maybe_parents = file.get('parents')
        if maybe_parents and folder_id in maybe_parents:
            print("File is already in desired folder.")
            return True
        if maybe_parents:
            current_parents = ','.join(maybe_parents)
        else:
            current_parents = None
        # Move the file to the new folder
        file = self.drive.files().update(fileId=file_id,
                                         addParents=folder_id,
                                         removeParents=current_parents,
                                         fields='id, parents').execute()
        if folder_id in file.get('parents'):
            return True
        else:
            return False
    def write_multiple_files_to_sheet(self, data_files, spreadsheet_id):
        header_written = False
        current_row = 1
        for filename in data_files:
            print(filename)
            # Not all files may have valid data...skip those that don't
            header, data_rows = get_header_and_data_rows_from_csv(filename)
            if header and data_rows:
                print(len(data_rows))
                # Skip files that may not contain data.
                if len(data_rows) > 0:
                    print("Writing {0} rows".format(len(data_rows)))
                    # Write header only once
                    if not header_written:
                        current_row += self.write_data_to_sheet([header], spreadsheet_id, 'A1')
                        header_written = True
                    current_row += self.write_data_to_sheet(
                        data_rows, spreadsheet_id, 'A' + str(current_row))
                else:
                    print("No data rows in file...skipping")
            else:
                print("File empty...skipping")
        return current_row - 1
    def write_data_to_sheet(self, data_rows, spreadsheet_id, sheet_range=None):
        if not sheet_range:
            sheet_range = 'Sheet1'
        body = {
            'values': data_rows
        }
        result = self.sheets.spreadsheets().values().update(
            spreadsheetId=spreadsheet_id, range=sheet_range,
            valueInputOption='RAW', body=body).execute()
        return result.get('updatedRows')
    def format_sheet(self, spreadsheet_id, sheet_id):
        requests = []
        # Format header row
        requests.append({
            'repeatCell': {
                'range': {
                    'sheetId': sheet_id,
                    'startRowIndex': 0,
                    'endRowIndex': 1
                },
                'cell': {
                    'userEnteredFormat': {
                        'textFormat': {
                            'fontSize': 11,
                            'bold': True
                        }
                    }
                },
                'fields': 'userEnteredFormat(textFormat)'
            }
        })
        # Autosize columns based on data
        requests.append({
            "autoResizeDimensions": {
                "dimensions": {
                    "sheetId": sheet_id,
                    "dimension": "COLUMNS"
                }
            }
        })
        body = {
            'requests': requests
        }
        response = self.sheets.spreadsheets().batchUpdate(
            spreadsheetId=spreadsheet_id,
            body=body).execute()
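Putting the Sheets utility to work is similarly short. A sketch with hypothetical file names, mirroring the earlier calls:

sheets = SheetsClient('/path/to/service-account.json')
user_email = sheets.get_current_user()
folder_link = sheets.load_data_files_to_drive(
    ['/tmp/part-0001.csv', '/tmp/part-0002.csv'],
    user_email,
    spreadsheet_title='ContactSearchResults',
    merged_filename='/tmp/merged.csv')
print(folder_link)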
In the main application there is an easy way to expose Python resources over REST:
api.add_resource(Export, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>', '/')
This lives in a file called api.py, which returns the app.
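A minimal sketch of that api.py, assuming Flask-RESTful (which the add_resource calls imply); the Export and GetTaskStatus resource classes and their module are hypothetical names defined elsewhere in the app:

from flask import Flask
from flask_restful import Api

from resources import Export, GetTaskStatus  # hypothetical module layout

def create_app():
    """Builds the Flask app and registers the REST resources."""
    app = Flask(__name__)
    api = Api(app)
    api.add_resource(Export, '/<path:path>', '/')
    api.add_resource(GetTaskStatus, '/status/<task_id>')
    return app

app = create_app()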
The "main" process would be:
try:
    folder_link = export_to_sheets(
        query_input=json.dumps(query_input),
        user_email=user_email,
        spark_job_jar=current_app.config['SPARK_JOB_JAR'],
        other_spark_jars=current_app.config['OTHER_SPARK_JARS'],
        dataproc_cluster=current_app.config['DATAPROC_CLUSTER'],
        scylla_hostname=current_app.config['SCYLLA_HOSTNAME'],
        scylla_keyspace=current_app.config['SCYLLA_KEYSPACE'],
        credential_filename=current_app.config['CREDENTIAL_FILENAME'],
        partitions=current_app.config['DEFAULT_PARTITIONS'])
    print("Completed")
    status_response = {
        'status': 'Complete',
        'folderLink': folder_link
    }
except Exception as e:
    print("Failed: {0}".format(e))
    status_response = {
        'status': 'Error'
    }
return make_response(jsonify(status_response), 200)
The same feature would almost certainly be a large Spring Boot powered app with a few hundred lines of code in Java, Groovy, or another JVM language.