Source code for kabbes_s3synchrony.Platforms.s3
import kabbes_s3synchrony
import py_starter as ps
import aws_connections
import dir_ops as do
import os
class Platform( kabbes_s3synchrony.BasePlatform ):
    """S3 flavour of the synchrony platform.

    Binds the generic platform machinery to the S3 directory/path classes
    from ``aws_connections`` and sets up the remote locations used for the
    data directory and its utility files.
    """

    # Taken from this module's own file path; per the original note this is "s3".
    # NOTE(review): relies on dir_ops ``Path.root`` semantics - confirm.
    NAME = do.Path( os.path.abspath( __file__ ) ).root #s3

    # Remote directory / path types associated with this platform.
    DIR_CLASS = aws_connections.s3.S3Dir
    DIRS_CLASS = aws_connections.s3.S3Dirs
    PATH_CLASS = aws_connections.s3.S3Path
    PATHS_CLASS = aws_connections.s3.S3Paths

    def __init__(self, *args, **kwargs ):
        """Initialise the base platform, then derive the remote S3 locations.

        All positional and keyword arguments are forwarded unchanged to
        ``kabbes_s3synchrony.BasePlatform.__init__``.

        Reads ``self.cfg``, ``self.Connection``, ``self.remote_connection``,
        ``self.UTIL_DIR``, ``self._remote_versions_lPath`` and
        ``self._remote_delete_lPath``, none of which are set in this class.
        NOTE(review): presumably provided by the base initialiser - confirm.
        """

        kabbes_s3synchrony.BasePlatform.__init__( self, *args, **kwargs )

        # Root of the remote data: bucket from the platform config, directory
        # from the connection config.
        remote_root = aws_connections.s3.S3Dir(
            bucket = self.cfg['aws_bkt'],
            path = self.Connection.cfg['remote_data_dir'],
            conn = self.remote_connection,
        )
        self.data_rDir = remote_root

        # Utility directory below the data root, plus its 'deleted' subdirectory.
        util_dir = remote_root.join_Dir( path = self.UTIL_DIR ) #S3Dir
        self._util_rDir = util_dir
        self._util_deleted_rDir = util_dir.join_Dir( path = 'deleted' ) #S3Dir

        # Remote versions/delete files reuse the filenames of their local
        # counterparts and sit inside the utility directory.
        versions_name = self._remote_versions_lPath.filename
        self._remote_versions_rPath = util_dir.join_Path( path = versions_name )

        delete_name = self._remote_delete_lPath.filename
        self._remote_delete_rPath = util_dir.join_Path( path = delete_name )

    def _get_remote_connection( self ):
        """Create the AWS client and store it on ``self.remote_connection``.

        The client is configured with ``NAME`` as the boto3 client name and
        the raw credentials dictionary from ``self.cfg['credentials']`` as
        its connection kwargs. Nothing is returned.
        """

        client_settings = {
            "boto3_client": self.NAME,
            "connection.kwargs": self.cfg['credentials'].get_raw_dict()
        }
        self.remote_connection = aws_connections.Client( dict = client_settings )