Created in response to the #ACloudGuruChallenge posted by Forrest Brazeal.

Modern data infrastructure has become exciting, with limitless possibilities. Today we explore a use case where we pull, transform and visualize covid-19 data from several sources in a serverless, extremely quick-to-deploy fashion, using Python as our tool of choice.

Goal: a job that downloads covid-19 case counts, deaths and recoveries for every given day and stores them in a database for later access and reporting.

Compute: Python application on AWS Lambda.

AWS Lambda is a serverless platform for running small snippets of code in the cloud. In this section we will go over the code used to achieve the end goal. We pull data from this link and this Johns Hopkins dataset.

```python
import io
import json
import os

import boto3
import pandas as pd
import requests

s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
sns_client = boto3.client('sns')


def download_file(url, file_name):
    '''Downloads data from a file and saves it to s3 as an object'''
    body = requests.get(url).content
    bucket = os.environ['BUCKET_NAME']  # env var name is an assumption
    s3_path = "data/" + file_name
    s3_resource.Bucket(bucket).put_object(Key=s3_path, Body=body)


def load_data(key):
    '''Loads data from s3 into a pandas Dataframe'''
    obj = s3_client.get_object(Bucket=os.environ['BUCKET_NAME'], Key=key)
    csv_file = obj['Body'].read()
    df = pd.read_csv(io.BytesIO(csv_file), low_memory=False)
    return df


def send_db(df):
    '''Sends to the database only the rows with a date higher than the
    current latest in the db'''
    latest = pd.to_datetime(get_latest_date())
    data_to_send = df[pd.to_datetime(df['date']) > latest]  # date column name assumed
    data_to_send.apply(lambda x: load_to_db(x), axis=1)
    return len(data_to_send)


def post_to_sns(message):
    '''Sends the output of the job to an SNS topic which fans out
    the result to any number of interested message consumers'''
    response = sns_client.publish(
        TopicArn=os.environ['TOPIC_ARN'],  # topic ARN env var name is an assumption
        # with MessageStructure='json', the payload must be JSON with a 'default' key
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )
    return response
```

Storage: PostgreSQL, with credentials kept in AWS Secrets Manager.

```python
import json

import boto3
import psycopg2
from psycopg2.errors import UndefinedTable

secrets_client = boto3.client('secretsmanager')
secret = secrets_client.get_secret_value(SecretId='postgres-credentials')['SecretString']
secrets_dict = json.loads(secret)


def connect():
    '''Instantiates a connection to the db'''
    conn = psycopg2.connect(
        host=secrets_dict['host'],  # key names within the secret are assumptions
        database='postgres',
        user=secrets_dict['username'],
        password=secrets_dict['password']
    )
    return conn


def instantiate_db():
    '''Creates the covid19 table with a seed record if it does not exist'''
    conn = connect()
    cursor = conn.cursor()
    try:
        cursor.execute('SELECT * FROM covid19 LIMIT 1')
        conn.close()
        return
    except UndefinedTable:
        print('creating table')
        conn.rollback()  # the failed SELECT aborts the transaction; clear it before DDL
        cursor.execute('''
            CREATE TABLE covid19 (date DATE, cases INT, deaths INT, recovered INT);
            INSERT INTO covid19 (date, cases, deaths, recovered)
            VALUES ('2019-12-31', 0, 0, 0)
        ''')  # column types and seed date are reconstructions; the seed predates all source data
        conn.commit()
        conn.close()


def get_latest_date():
    '''Returns the date of the current latest record in the db'''
    conn = connect()
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM covid19 ORDER BY date DESC LIMIT 1')
    results = cursor.fetchone()
    conn.close()
    return results[0]  # first column is the date


def load_to_db(row):
    '''Insert new rows to the db'''
    conn = connect()
    cursor = conn.cursor()
    cursor.execute(
        'INSERT INTO covid19 (date, cases, deaths, recovered) VALUES (%s, %s, %s, %s)',
        (row['date'], row['cases'], row['deaths'], row['recovered'])  # row keys assumed to match the columns
    )
    conn.commit()
    conn.close()
```

First, we have the function that connects to the database. We return the connection object, as it is essential for any database operation; this function is a dependency for all other functions in the module.

Second, we have a function for creating our table. This function will first attempt to query for the existence of the table and return a random record from it. If it fails to do so due to UndefinedTable, it will create the table and insert a dummy record that dates back to before the data in our sources. This is important for the get_latest_date() function to work properly, because if it retrieves this dummy record, initial loading is performed.

Lastly, we have the actual function that loads a record into the database. This function expects a dataframe row as input. The justification behind this approach is that dataframe operations that involve iteration are slow, according to this Stackoverflow question.
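The incremental-load pattern described above, keeping only rows newer than the latest stored date and then applying a row-wise loader, can be sketched in isolation. This is a toy example, not the post's code: the DataFrame contents are made up, and `collect_row` is a stub standing in for the real database insert.

```python
import pandas as pd

# Toy frame standing in for the covid CSV (illustrative values only)
df = pd.DataFrame({
    'date': ['2020-03-01', '2020-03-02', '2020-03-03'],
    'cases': [10, 20, 30],
    'deaths': [0, 1, 2],
    'recovered': [5, 6, 7],
})

loaded = []  # stub target standing in for the database


def collect_row(row):
    '''Stub loader: records the row instead of INSERTing it'''
    loaded.append((row['date'], row['cases']))


latest = pd.to_datetime('2020-03-01')  # pretend this came from the database
new_rows = df[pd.to_datetime(df['date']) > latest]  # vectorized date filter
new_rows.apply(collect_row, axis=1)  # row-wise apply instead of an explicit loop

print(len(loaded))  # → 2
```

The filter itself is fully vectorized; only the per-row load uses `apply`, which keeps the slow part confined to the unavoidable one-insert-per-row step.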
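One detail of the SNS publish worth spelling out: when `MessageStructure='json'` is used, the `Message` argument must be a JSON string whose object contains at least a `"default"` key, with optional per-protocol overrides (such as `"email"`). A minimal payload builder, where the job-summary fields and the `build_sns_message` helper are illustrative, not from the original post:

```python
import json


def build_sns_message(summary):
    '''Builds the Message body SNS expects with MessageStructure='json':
    a JSON object whose 'default' key carries the fallback payload'''
    return json.dumps({
        # delivered to any subscription protocol without its own override
        'default': json.dumps(summary),
        # optional per-protocol override, e.g. a friendlier email body
        'email': f"Covid job loaded {summary['rows_loaded']} new rows",
    })


message = build_sns_message({'rows_loaded': 2, 'status': 'ok'})
print(json.loads(message)['email'])  # → Covid job loaded 2 new rows
```

Subscribers on protocols without an override (SQS, Lambda, HTTP) receive the `default` string, so keeping it machine-readable JSON lets downstream consumers parse the job result.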