Source code for kabbes_s3synchrony.BasePlatform
import hashlib
import datetime as dt
import kabbes_s3synchrony
import py_starter as ps
import dir_ops as do
import pandas as pd
from parent_class import ParentClass
import functools
import os
def data_function( method ):
@functools.wraps( method )
def wrapper( self, Paths_inst ):
successful_Paths = self.PATHS_CLASS()
for Path_inst in Paths_inst:
if method( self, Path_inst ):
successful_Paths._add( Path_inst )
return successful_Paths
return wrapper
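# Hedged usage sketch (not from the original source): a method wrapped with
# @data_function accepts a Paths collection instead of a single Path and returns
# the collection of Paths for which the single-Path method returned truthy, e.g.:
#
#   successful_Paths = platform._upload_to_remote( some_Paths )  # hypothetical objects
#   print( successful_Paths.export_strings() )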
class BasePlatform( ParentClass ):
NAME = do.Path( os.path.abspath( __file__ ) ).root #s3
DIR_CLASS = do.Dir
DIRS_CLASS = do.Dirs
PATH_CLASS = do.Path
PATHS_CLASS = do.Paths
_file_colname = "File"
_editor_colname = "Edited By"
_time_colname = "Time Edited"
_hash_colname = "Checksum"
columns = [_file_colname, _editor_colname, _time_colname, _hash_colname]
dttm_format = "%Y-%m-%d %H:%M:%S"
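    # Illustrative row layout of the versions/deleted CSVs built from `columns`
    # and `dttm_format` above (values are examples only):
    #
    #   File,Edited By,Time Edited,Checksum
    #   subdir/data.csv,alice,2023-01-15 09:30:00,5d41402abc4b2a76b9719d911017c592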
def __init__(self, Connection, **kwargs ):
"""Initialize necessary instance variables."""
ParentClass.__init__( self )
###
self.UTIL_DIR = '.' + self.NAME.upper() #.S3
### set cfg
self.Connection = Connection
self.cfg = self.Connection.cfg[ self.Connection.platform_node_name ]
###
self.data_lDir = do.Dir( self.Connection.cfg.parent['cwd.Dir'].join( self.Connection.cfg['local_data_rel_dir'] ) )
#lDir is a local Dir, rDir is a remote dir
self._util_lDir = do.Dir( self.data_lDir.join( self.UTIL_DIR ) )
self._remote_versions_lPath = do.Path( self._util_lDir.join( 'versions_remote.csv' ) )
self._local_versions_lPath = do.Path( self._util_lDir.join( 'versions_local.csv' ) )
self._remote_delete_lPath = do.Path( self._util_lDir.join( 'deleted_remote.csv' ) )
self._local_delete_lPath = do.Path( self._util_lDir.join( 'deleted_local.csv' ) )
self._tmp_lDir = do.Dir( self._util_lDir.join('tmp') )
self._logs_lDir = do.Dir( self._util_lDir.join('logs') )
self._ignore_lPath = do.Path( self._util_lDir.join( 'ignore_remote.txt' ) )
self._ignore = []
self._reset_approved = False
### These should be defined by the Child Platform
self.data_rDir = None
self._util_rDir = None
self._util_deleted_rDir = None
self._remote_versions_rPath = None
self._remote_delete_rPath = None
### Get remote connection
self._get_remote_connection()
    def _get_remote_connection( self ):
        """Set self.remote_connection; Child Platforms override this to attach a real client."""
        self.remote_connection = None
    def run( self ):
self.intro_message()
self.establish_connection()
if self.Connection.cfg['reset']:
if self.reset_confirm():
self.reset_local()
self.reset_remote()
else:
self.synchronize()
self.close_message()
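    # Hedged usage sketch (not from the original source): `conn` stands for a
    # Connection-like object carrying the cfg mapping this class reads; a concrete
    # Child Platform would be instantiated (see the subclass sketch at the end of
    # this file).
    #
    #   platform = SomeChildPlatform( conn )  # hypothetical subclass name
    #   platform.run()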
    def intro_message(self):
"""Print an introductory message to signal program start."""
print()
print("###########################")
print("# Data Sync #")
print("###########################")
print()
    def close_message(self):
"""Print a closing message to signal program end."""
print()
print("###########################")
print("# Done Syncing #")
print("###########################")
    def establish_connection(self):
"""Form a connection to the remote repository."""
# Create local data dir
if not self.data_lDir.exists():
self.data_lDir.create( override = True )
            print('Directory not present, creating empty directory at:')
            print(self.data_lDir)
# Create platform util local dir
if not self._util_lDir.exists():
self._util_lDir.create( override = True )
# Check if there is a .S3 subfolder in this S3 bucket/prefix
if not self._util_rDir.exists():
print("This S3 prefix has not been initialized for S3 Synchrony - Initializing prefix and uploading to S3...")
self._initialize_util_rDir()
print("Done")
has_local_lPaths = self._local_versions_lPath.exists() and self._local_delete_lPath.exists()
has_remote_lPaths = self._remote_versions_lPath.exists() and self._remote_delete_lPath.exists()
if(not has_local_lPaths or not has_remote_lPaths):
print( "Your data folder has not been initialized for S3 Synchrony - Downloading from S3..." )
self._util_rDir.download( Destination = self._util_lDir, override = True )
empty = pd.DataFrame( columns=self.columns )
empty.to_csv( self._local_delete_lPath.path, index=False)
empty.to_csv( self._local_versions_lPath.path , index=False)
if not self._tmp_lDir.exists():
self._tmp_lDir.create( override = True )
if not self._logs_lDir.exists():
self._logs_lDir.create( override = True )
if not self._ignore_lPath.exists():
self._ignore_lPath.create( override = True )
self._ignore = self._ignore_lPath.read().strip().split( '\n' ) #list of lines
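    # For reference, after establish_connection the local util layout built above
    # looks like this (paths are illustrative; UTIL_DIR is '.' + NAME.upper()):
    #
    #   <data_lDir>/.S3/versions_remote.csv
    #   <data_lDir>/.S3/versions_local.csv
    #   <data_lDir>/.S3/deleted_remote.csv
    #   <data_lDir>/.S3/deleted_local.csv
    #   <data_lDir>/.S3/tmp/
    #   <data_lDir>/.S3/logs/
    #   <data_lDir>/.S3/ignore_remote.txt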
def _initialize_util_rDir( self ):
"""Check for all necessary files on the S3 prefix for synchronization."""
randhex = self._get_randomized_dirname()
download_lDir = self._tmp_lDir.join_Dir( path = randhex )
self._tmp_lDir.create( override = True )
download_lDir.create( override = True )
self.data_rDir.download( Destination = download_lDir, override = True )
# Upload versions
df_versions = self._compute_directory( download_lDir, False )
temp_remote_versions_lPath = download_lDir.join_Path( path = self._remote_versions_lPath.filename )
df_versions.to_csv( temp_remote_versions_lPath.path, index = False )
self._remote_versions_rPath.upload( Destination = temp_remote_versions_lPath, override = True )
# Upload deleted
df_empty = pd.DataFrame( columns=self.columns )
temp_remote_delete_lPath = download_lDir.join_Path( path = self._remote_delete_lPath.filename )
df_empty.to_csv( temp_remote_delete_lPath.path , index=False)
self._remote_delete_rPath.upload( Destination = temp_remote_delete_lPath, override = True )
    def synchronize(self):
        """Prompt the user to synchronize all local files with remote files."""
self._remote_versions_rPath.download( Destination = self._remote_versions_lPath, override = True, overwrite = True )
self._remote_delete_rPath.download( Destination = self._remote_delete_lPath, override = True, overwrite = True )
self._push_deleted_remote()
self._pull_deleted_local()
self._push_new_remote()
self._pull_new_local()
self._push_modified_remote()
self._pull_modified_local()
self._revert_modified_remote()
self._revert_modified_local()
self._remote_versions_rPath.upload( Destination = self._remote_versions_lPath, override = True )
self._remote_delete_rPath.upload( Destination = self._remote_delete_lPath, override = True )
# Save a snapshot of our current files into versionsLocal for next time
self._compute_directory( self.data_lDir ).to_csv( self._local_versions_lPath.path, index=False )
def _push_deleted_remote(self):
"""Remove remote files that were deleted locally."""
mine, other, mod_mine, mod_other = self._compute_dfs( self.data_lDir )
# Load in what files we had last time, and what files we have deleted in the past
oldmine = pd.read_csv(self._local_versions_lPath.path)
deletedlocal = pd.read_csv(self._local_delete_lPath.path)
# Combine a list of files we have deleted and files we have had in the past, remove any duplicates
deletedlocal = pd.concat([oldmine, deletedlocal])
deletedlocal = deletedlocal.drop_duplicates(
[self._file_colname], keep="last")
# From previous files + deleted files select only the ones that AREN'T in our local system but ARE on AWS
deletedlocal = deletedlocal[~deletedlocal[self._file_colname].isin(
mine[self._file_colname])]
deletedlocal = other[other[self._file_colname].isin(
deletedlocal[self._file_colname])]
deletedlocal.to_csv(self._local_delete_lPath.path, index=False)
if(len(deletedlocal) > 0):
print("UPLOAD: Would you like to delete these files on S3 that were deleted locally?:")
print("('file name' / 'Date last modified on S3')\n")
to_delete_Paths = self.PATHS_CLASS()
index = 1
for i, row in deletedlocal.iterrows():
rPath = self.data_rDir.join_Path( path = row[ self._file_colname ] )
to_delete_Paths._add( rPath )
print(index, row[self._file_colname], '\t', row[self._time_colname], '\t by', row[self._editor_colname])
index += 1
selected_deleted_Paths = self._apply_selected_indices( self._delete_from_remote, to_delete_Paths )
deleted_rel_paths = selected_deleted_Paths.get_rels( self.data_rDir ).export_strings()
deleteds3 = pd.read_csv( self._remote_delete_lPath.path )
newdeleted = other.loc[other[self._file_colname].isin(deleted_rel_paths)]
deleteds3 = pd.concat([deleteds3, newdeleted])
            deleteds3.to_csv( self._remote_delete_lPath.path, index=False )
# Replace any removed file names with N/A and then drop if they have been deleted
other[self._file_colname] = other[self._file_colname].where(
~other[self._file_colname].isin( deleted_rel_paths ))
other = other.dropna()
other.to_csv(self._remote_versions_lPath.path, index=False)
print("Done.\n")
def _pull_deleted_local(self):
"""Remove files from local system that were deleted on S3."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
# Load in files deleted from S3, and select only those that ARE on our local system and AREN'T on AWS
deleted_remote = pd.read_csv(self._remote_delete_lPath.path)
deleted_remote = deleted_remote[deleted_remote[self._file_colname].isin(
mine[self._file_colname])]
deleted_remote = deleted_remote[~deleted_remote[self._file_colname].isin(
other[self._file_colname])]
if(len(deleted_remote) > 0):
print("DOWNLOAD: Would you like to delete these files from your computer that were deleted on S3?:")
print("('file name' / 'Date last modified locally')\n")
to_delete_Paths = do.Paths()
index = 1
for i, row in deleted_remote.iterrows():
lPath = self.data_lDir.join_Path( path = row[self._file_colname] )
to_delete_Paths._add( lPath )
mask = row[self._file_colname] == mine[self._file_colname]
                print(index, row[self._file_colname], '\t', mine.loc[mask][self._time_colname].iloc[0])
index += 1
selected_deleted_Paths = self._apply_selected_indices(self._delete_from_local, to_delete_Paths)
print('Done.\n')
def _push_new_remote(self):
"""Upload files to S3 that were created locally."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
# Find files that are in our directory but not AWS, and load in files deleted from AWS
new_local = mine.loc[~mine[self._file_colname].isin(
other[self._file_colname])]
deletedfiles = pd.read_csv(self._remote_delete_lPath.path)[
self._file_colname].values.tolist()
if(len(new_local) > 0):
print("UPLOAD: Would you like to upload these new files to S3 that were created locally?:")
print("('file name' / 'Date last modified Locally')\n")
to_add_Paths = do.Paths()
index = 1
for i, row in new_local.iterrows():
lPath = self.data_lDir.join_Path( path = row[self._file_colname] )
to_add_Paths._add( lPath )
print(index, row[self._file_colname], '\t', row[self._time_colname], end='\t')
if(row[self._file_colname] in deletedfiles):
print("*DELETED ON S3", end='')
print()
index += 1
selected_added_Paths = self._apply_selected_indices(self._upload_to_remote, to_add_Paths)
added_rel_paths = selected_added_Paths.get_rels( self.data_lDir ).export_strings()
added_to_s3 = mine.loc[mine[self._file_colname].isin(added_rel_paths)]
newversions = pd.concat([other, added_to_s3])
newversions.to_csv(self._remote_versions_lPath.path, index=False)
print("Done.\n")
def _pull_new_local(self):
"""Download files from S3 that were created recently."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
# Find files that are on S3 but not our local system and read in files we have deleted locally
news3 = other.loc[~other[self._file_colname].isin(
mine[self._file_colname])]
deletedfiles = pd.read_csv(self._local_delete_lPath.path)[
self._file_colname].values.tolist()
if(len(news3) > 0):
print("DOWNLOAD: Would you like to download these new files that were created on S3?:")
print("('file name' / 'Date last modified on S3')\n")
to_download_Paths = self.PATHS_CLASS()
index = 1
for i, row in news3.iterrows():
rPath = self.data_rDir.join_Path( path = row[self._file_colname] )
to_download_Paths._add( rPath )
print(index, row[self._file_colname], '\t', row[self._time_colname],
"\t by", row[self._editor_colname], end='\t')
if(row[self._file_colname] in deletedfiles):
print("*DELETED LOCALLY", end='')
print()
index += 1
selected_downloaded_Paths = self._apply_selected_indices(self._download_from_remote, to_download_Paths)
print("Done.\n")
def _push_modified_remote(self):
"""Update files on S3 with modifications that were made locally more recently."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
if(len(mod_mine) > 0):
print("UPLOAD: Would you like to update these files on S3 with your local changes?:")
print("('file name' / 'Date last modified locally' / 'Date last modified on S3')\n")
self._push_sequence(mod_mine, mine, other)
def _pull_modified_local(self):
"""Update local files with modifications that were made on S3 more recently."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
if(len(mod_other) > 0):
print("DOWNLOAD: Would you like to update these local files with the changes from S3?:")
print("('file name' / 'Date last modified locally' / 'Date last modified on S3')\n")
self._pull_sequence(mod_other, other)
def _revert_modified_remote(self):
"""Revert remote files with modifications that were made locally less recently."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
if(len(mod_other) > 0):
print( "UPLOAD: Would you like to revert these files on S3 back to your local versions?:")
print( "('file name' / 'Date last modified locally' / 'Date last modified on S3')\n")
self._push_sequence(mod_other, mine, other)
def _revert_modified_local(self):
"""Revert local files with modifications that were made on S3 less recently."""
mine, other, mod_mine, mod_other = self._compute_dfs(self.data_lDir)
if(len(mod_mine) > 0):
print("DOWNLOAD: Would you like to revert these local files back to the versions on S3?:")
print("('file name' / 'Date last modified locally' / 'Date last modified on S3')\n")
self._pull_sequence(mod_mine, other)
def _compute_dfs(self, lDir ):
"""Return a list of dfs containing all the information for smart_sync."""
mine = self._compute_directory( lDir )
other = pd.read_csv( self._remote_versions_lPath.path )
mine = self._filter_ignore( mine )
other = self._filter_ignore( other )
inboth = mine[ mine[self._file_colname].isin( other[self._file_colname]) ]
mod_mine = [] # Files more recently modified Locally
mod_other = [] # Files more recently modified on S3
for file in inboth[self._file_colname]:
mycs = inboth.loc[inboth[self._file_colname] == file][self._hash_colname].iloc[0]
othercs = other.loc[other[self._file_colname] == file][self._hash_colname].iloc[0]
            if(mycs != othercs): # users have pushed conflicting file checksums
datemine = dt.datetime.strptime(
mine.loc[mine[self._file_colname] == file][self._time_colname].iloc[0], self.dttm_format)
dateother = dt.datetime.strptime(
other.loc[other[self._file_colname] == file][self._time_colname].iloc[0], self.dttm_format)
if(datemine > dateother):
mod_mine.append([file, datemine, dateother])
else:
mod_other.append([file, datemine, dateother])
return (mine, other, mod_mine, mod_other)
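    # mine/other are DataFrames following the class-level `columns` schema;
    # mod_mine and mod_other are lists of [rel_path, local_mtime, remote_mtime]
    # entries for checksum conflicts, split by which side was modified last.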
def _compute_directory(self, lDir, ignore_util=True):
"""Create a dataframe describing all files in a local directory."""
df = pd.DataFrame(columns=self.columns)
folders_to_skip = [ self.UTIL_DIR ]
if not ignore_util:
folders_to_skip = []
Paths_inst = lDir.walk_contents_Paths( block_dirs=True, block_paths=False, folders_to_skip = folders_to_skip )
for Path_inst in Paths_inst:
df_new = pd.DataFrame( columns = self.columns )
df_new[ self._file_colname ] = [Path_inst.get_rel( lDir ).path ]
df_new[ self._time_colname ] = Path_inst.get_mtime().strftime( self.dttm_format )
df_new[ self._hash_colname ] = self._hash( Path_inst.path )
df_new[ self._editor_colname ] = self.Connection.cfg['_name']
df = pd.concat([df, df_new], ignore_index=True)
return df
def _filter_ignore(self, df ):
"""Remove all files that should be ignored as requested by the user."""
return df.loc[ ~df[self._file_colname].isin( self._ignore ) ]
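    # The ignore list is read from ignore_remote.txt in establish_connection:
    # one relative file path per line, matched against the "File" column.
    # Illustrative contents (paths are examples only):
    #
    #   subdir/secrets.csv
    #   scratch/notes.txt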
@data_function
def _upload_to_remote( self, lPath ):
rel_lPath = lPath.get_rel( self.data_lDir )
rPath = self.data_rDir.join_Path( Path = rel_lPath )
return rPath.upload( Destination = lPath, override = True, print_off = True )
@data_function
def _download_from_remote(self, rPath):
rel_rPath = rPath.get_rel( self.data_rDir )
lPath = self.data_lDir.join_Path( Path = rel_rPath )
return rPath.download( Destination = lPath, override = True, print_off = True )
@data_function
def _delete_from_remote(self, rPath):
rel_rPath = rPath.get_rel( self.data_rDir )
deleted_rPath = self._util_deleted_rDir.join_Path( Path = rel_rPath )
# make a copy of the deleted file into the deleted folder in the util section
if rPath.copy( Destination = deleted_rPath, override = True ):
return rPath.remove( override = True, print_off = True )
return False
@data_function
def _delete_from_local(self, lPath):
return lPath.remove( override = True, print_off = True )
def _apply_selected_indices(self, data_function, Paths_inst):
"""Prompt the user to select certain files to perform a synchronization function on."""
indices = ps.get_user_selection_for_list_items( Paths_inst.export_strings(), print_off=False )
all_indices = list(range(len(Paths_inst)))
inds_not_selected = [ ind for ind in all_indices if ind not in indices ]
Paths_inst._remove_inds( inds_not_selected )
successful_Paths = data_function( Paths_inst )
return successful_Paths
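    # Selection is delegated to py_starter's get_user_selection_for_list_items;
    # the indices the user picks are kept, every other Path is removed from
    # Paths_inst, and data_function then runs only on the survivors.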
def _push_sequence(self, listfiles, mine, other):
"""User-prompted uploading of files from a dataframe."""
to_push_Paths = do.Paths()
index = 1
for file in listfiles:
lPath = self.data_lDir.join_Path( path = file[0] )
to_push_Paths._add( lPath )
print(index, file[0], '\t', file[1], '\t', file[2], "\t by",
other.loc[other[self._file_colname] == file[0]][self._editor_colname].iloc[0])
index += 1
selected_push_Paths = self._apply_selected_indices( self._upload_to_remote, to_push_Paths )
push_rel_paths = selected_push_Paths.get_rels( self.data_lDir ).export_strings()
updatedins3 = mine.loc[mine[self._file_colname].isin(push_rel_paths)]
newversions = pd.concat([other, updatedins3])
newversions = newversions.drop_duplicates(
[self._file_colname], keep="last").sort_index()
newversions.to_csv(self._remote_versions_lPath.path, index=False)
print("Done.\n")
def _pull_sequence(self, listfiles, other):
"""User-prompted downloading of files from a dataframe."""
to_pull_Paths = self.PATHS_CLASS()
index = 1
for file in listfiles:
rPath = self.data_rDir.join_Path( path = file[0] )
to_pull_Paths._add( rPath )
print(index, file[0], '\t', file[1], '\t', file[2], "\t by",
other.loc[other[self._file_colname] == file[0]][self._editor_colname].iloc[0])
index += 1
selected_pull_Paths = self._apply_selected_indices(self._download_from_remote, to_pull_Paths)
print("Done.\n")
    def reset_confirm(self) -> bool:
        """Prompt the user to confirm whether a reset can occur."""
        print('Are you sure you would like to reset the remote data directory?')
        print('This will not change any of your file contents, but will delete the entire')
        confirm = input( str(self.UTIL_DIR) + ' folder on your local computer and on your remote prefix: ' + str(self.data_rDir) + ' (y/n): ')
        if confirm.lower() != 'y':
            print("Reset aborted.")
            self._reset_approved = False
        else:
            self._reset_approved = True
        return self._reset_approved
    def reset_local(self):
"""Remove all modifications made locally by synchronization."""
if self._reset_approved:
self._util_lDir.remove( override = True )
else:
print("Cannot reset local -- user has not approved.")
    def reset_remote(self):
"""Remove all modifications made to the remote repo by synchronization."""
if self._reset_approved:
self._util_rDir.remove( override = True )
else:
print("Cannot reset remote -- user has not approved.")
def _get_randomized_dirname(self):
"""Generate a random string to be used as a file name."""
return hashlib.md5(bytes(str(dt.datetime.now()), "utf-8")).hexdigest()
    def _hash(self, filepath):
        """Return an MD5 checksum of a file's contents."""
        with open(filepath, "rb") as file:
            data = file.read()
        return hashlib.md5(data).hexdigest()
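# A minimal sketch (not from the original source) of what a Child Platform is
# expected to supply, per the attributes left as None in BasePlatform.__init__.
# `MyRemoteDir` / `MyRemotePath` and the 'remote_data_dir' cfg key are
# hypothetical stand-ins for a platform's dir_ops-style remote classes.
#
#   class MyPlatform( BasePlatform ):
#
#       DIR_CLASS = MyRemoteDir    # hypothetical remote Dir class
#       PATH_CLASS = MyRemotePath  # hypothetical remote Path class
#
#       def __init__( self, Connection, **kwargs ):
#           BasePlatform.__init__( self, Connection, **kwargs )
#           self.data_rDir = MyRemoteDir( self.cfg['remote_data_dir'] )
#           self._util_rDir = MyRemoteDir( self.data_rDir.join( self.UTIL_DIR ) )
#           self._util_deleted_rDir = MyRemoteDir( self._util_rDir.join( 'deleted' ) )
#           self._remote_versions_rPath = MyRemotePath( self._util_rDir.join( 'versions_remote.csv' ) )
#           self._remote_delete_rPath = MyRemotePath( self._util_rDir.join( 'deleted_remote.csv' ) )
#
#       def _get_remote_connection( self ):
#           self.remote_connection = ...  # platform-specific client object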